/*
 * Decompiled with CFR 0.152.
 */
package cn.boboweike.carrot.server;

import cn.boboweike.carrot.SevereCarrotException;
import cn.boboweike.carrot.server.BackgroundTaskServer;
import cn.boboweike.carrot.server.concurrent.ConcurrentTaskModificationResolver;
import cn.boboweike.carrot.server.concurrent.UnresolvableConcurrentTaskModificationException;
import cn.boboweike.carrot.server.dashboard.DashboardNotificationManager;
import cn.boboweike.carrot.server.strategy.WorkDistributionStrategy;
import cn.boboweike.carrot.storage.BackgroundTaskServerStatus;
import cn.boboweike.carrot.storage.ConcurrentTaskModificationException;
import cn.boboweike.carrot.storage.PageRequest;
import cn.boboweike.carrot.storage.PartitionedStorageProvider;
import cn.boboweike.carrot.tasks.RecurringTask;
import cn.boboweike.carrot.tasks.Task;
import cn.boboweike.carrot.tasks.filters.TaskFilterUtils;
import cn.boboweike.carrot.tasks.states.StateName;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskZooKeeper
implements Runnable {
    static final Logger LOGGER = LoggerFactory.getLogger(TaskZooKeeper.class);
    private final BackgroundTaskServer backgroundTaskServer;
    private final PartitionedStorageProvider storageProvider;
    private final List<RecurringTask> recurringTasks;
    private final DashboardNotificationManager dashboardNotificationManager;
    private final TaskFilterUtils taskFilterUtils;
    private final WorkDistributionStrategy workDistributionStrategy;
    private final ConcurrentTaskModificationResolver concurrentTaskModificationResolver;
    private final Map<Task, Thread> currentlyProcessedTasks;
    private final AtomicInteger exceptionCount;
    private final ReentrantLock reentrantLock;
    private final AtomicInteger occupiedWorkers;
    private final Duration durationPollIntervalTimeBox;
    private Instant runStartTime;

    public TaskZooKeeper(BackgroundTaskServer backgroundTaskServer) {
        this.backgroundTaskServer = backgroundTaskServer;
        this.storageProvider = backgroundTaskServer.getStorageProvider();
        this.recurringTasks = new ArrayList<RecurringTask>();
        this.workDistributionStrategy = backgroundTaskServer.getWorkDistributionStrategy();
        this.dashboardNotificationManager = backgroundTaskServer.getDashboardNotificationManager();
        this.taskFilterUtils = new TaskFilterUtils(backgroundTaskServer.getTaskFilters());
        this.concurrentTaskModificationResolver = this.createConcurrentTaskModificationResolver();
        this.currentlyProcessedTasks = new ConcurrentHashMap<Task, Thread>();
        this.durationPollIntervalTimeBox = Duration.ofSeconds((long)((double)this.backgroundTaskServerStatus().getPollIntervalInSeconds() - (double)this.backgroundTaskServerStatus().getPollIntervalInSeconds() * 0.05));
        this.reentrantLock = new ReentrantLock();
        this.exceptionCount = new AtomicInteger();
        this.occupiedWorkers = new AtomicInteger();
    }

    @Override
    public void run() {
        try {
            this.runStartTime = Instant.now();
            if (this.backgroundTaskServer.isUnAnnounced()) {
                return;
            }
            this.updateTasksThatAreBeingProcessed();
            this.runRoutineTasks();
            this.onboardNewWorkIfPossible();
        }
        catch (Exception e) {
            this.dashboardNotificationManager.handle(e);
            this.exceptionCount.getAndIncrement();
            LOGGER.warn("Carrot encountered a problematic exception. Please create a bug report (if possible, provide the code to reproduce this and the stacktrace) - Processing will continue, exceptionCount = {}", (Object)this.exceptionCount, (Object)e);
        }
    }

    void updateTasksThatAreBeingProcessed() {
        LOGGER.debug("Updating currently processed tasks... ");
        this.processTaskList(new ArrayList<Task>(this.currentlyProcessedTasks.keySet()), this::updateCurrentlyProcessingTask);
    }

    void runRoutineTasks() {
        this.checkForRecurringTasks();
        this.checkForScheduledTasks();
        this.checkForOrphanedTasks();
        this.checkForSucceededTasksThanCanGoToDeletedState();
        this.checkForTasksThatCanBeDeleted();
    }

    boolean canOnboardNewWork() {
        return this.backgroundTaskServerStatus().isRunning() && this.workDistributionStrategy.canOnboardNewWork();
    }

    void checkForRecurringTasks() {
        Integer partition = this.getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Looking for recurring tasks... ");
        List<RecurringTask> recurringTasks = this.getRecurringTasks(partition);
        this.processRecurringTasks(recurringTasks);
    }

    void checkForScheduledTasks() {
        Integer partition = this.getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Looking for scheduled tasks... ");
        Supplier<List<Task>> scheduledTasksSupplier = () -> this.storageProvider.getScheduledTasksByPartition(Instant.now().plusSeconds(this.backgroundTaskServerStatus().getPollIntervalInSeconds()), PageRequest.ascOnUpdatedAt(1000), partition);
        this.processTaskList(scheduledTasksSupplier, Task::enqueue);
    }

    void checkForOrphanedTasks() {
        Integer partition = this.getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Looking for orphan tasks... ");
        Instant updatedBefore = this.runStartTime.minus(Duration.ofSeconds(this.backgroundTaskServer.getServerStatus().getPollIntervalInSeconds()).multipliedBy(4L));
        Supplier<List<Task>> orphanedTasksSupplier = () -> this.storageProvider.getTasksByPartition(StateName.PROCESSING, updatedBefore, PageRequest.ascOnUpdatedAt(1000), partition);
        this.processTaskList(orphanedTasksSupplier, (Task task) -> task.failed("Orphaned task", new IllegalThreadStateException("Task was too long in PROCESSING state without being updated.")));
    }

    void checkForSucceededTasksThanCanGoToDeletedState() {
        Integer partition = this.getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Looking for succeeded tasks that can go to the deleted state... ");
        AtomicInteger succeededTasksCounter = new AtomicInteger();
        Instant updatedBefore = Instant.now().minus(this.backgroundTaskServer.getServerStatus().getDeleteSucceededTasksAfter());
        Supplier<List<Task>> succeededTasksSupplier = () -> this.storageProvider.getTasksByPartition(StateName.SUCCEEDED, updatedBefore, PageRequest.ascOnUpdatedAt(1000), partition);
        this.processTaskList(succeededTasksSupplier, (Task task) -> {
            succeededTasksCounter.incrementAndGet();
            task.delete("Carrot maintenance - deleting succeeded task");
        });
        if (succeededTasksCounter.get() > 0) {
            this.storageProvider.publishTotalAmountOfSucceededTasks(succeededTasksCounter.get());
        }
    }

    void checkForTasksThatCanBeDeleted() {
        Integer partition = this.getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Looking for deleted tasks that can be deleted permanently... ");
        this.storageProvider.deleteTasksPermanentlyByPartition(StateName.DELETED, Instant.now().minus(this.backgroundTaskServer.getServerStatus().getPermanentlyDeleteDeletedTasksAfter()), partition);
    }

    private Integer getPartition() {
        Integer partition = this.backgroundTaskServer.getPartition();
        if (partition == null) {
            return BackgroundTaskServer.NO_PARTITION;
        }
        return partition;
    }

    void onboardNewWorkIfPossible() {
        if (this.pollIntervalInSecondsTimeBoxIsAboutToPass()) {
            return;
        }
        if (this.canOnboardNewWork()) {
            this.checkForEnqueuedTasks();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void checkForEnqueuedTasks() {
        Integer partition = this.getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        try {
            if (this.reentrantLock.tryLock()) {
                LOGGER.debug("Looking for enqueued tasks... ");
                PageRequest workPageRequest = this.workDistributionStrategy.getWorkPageRequest();
                if (workPageRequest.getLimit() > 0) {
                    List<Task> enqueuedTasks = this.storageProvider.getTasksByPartition(StateName.ENQUEUED, workPageRequest, partition);
                    enqueuedTasks.forEach(this.backgroundTaskServer::processTask);
                }
            }
        }
        finally {
            if (this.reentrantLock.isHeldByCurrentThread()) {
                this.reentrantLock.unlock();
            }
        }
    }

    void processRecurringTasks(List<RecurringTask> recurringTasks) {
        Integer partition = this.getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Found {} recurring tasks", (Object)recurringTasks.size());
        List<Task> tasksToSchedule = recurringTasks.stream().filter(rt -> this.mustSchedule((RecurringTask)rt, partition)).map(RecurringTask::toScheduledTask).collect(Collectors.toList());
        if (!tasksToSchedule.isEmpty()) {
            this.storageProvider.saveByPartition(tasksToSchedule, partition);
        }
    }

    boolean mustSchedule(RecurringTask recurringTask, Integer partition) {
        return recurringTask.getNextRun().isBefore(Instant.now().plus(this.durationPollIntervalTimeBox).plusSeconds(1L)) && !this.storageProvider.recurringTaskExistsByPartition(recurringTask.getId(), partition, StateName.SCHEDULED, StateName.ENQUEUED, StateName.PROCESSING);
    }

    void processTaskList(Supplier<List<Task>> taskListSupplier, Consumer<Task> taskConsumer) {
        List<Task> tasks = this.getTasksToProcess(taskListSupplier);
        while (!tasks.isEmpty()) {
            this.processTaskList(tasks, taskConsumer);
            tasks = this.getTasksToProcess(taskListSupplier);
        }
    }

    void processTaskList(List<Task> tasks, Consumer<Task> taskConsumer) {
        Integer partition = this.getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        if (!tasks.isEmpty()) {
            try {
                tasks.forEach(taskConsumer);
                this.taskFilterUtils.runOnStateElectionFilter(tasks);
                this.storageProvider.saveByPartition(tasks, partition);
                this.taskFilterUtils.runOnStateAppliedFilters(tasks);
            }
            catch (ConcurrentTaskModificationException concurrentTaskModificationException) {
                try {
                    this.concurrentTaskModificationResolver.resolve(concurrentTaskModificationException);
                }
                catch (UnresolvableConcurrentTaskModificationException unresolvableConcurrentTaskModificationException) {
                    throw new SevereCarrotException("Could not resolve ConcurrentTaskModificationException", unresolvableConcurrentTaskModificationException);
                }
            }
        }
    }

    BackgroundTaskServerStatus backgroundTaskServerStatus() {
        return this.backgroundTaskServer.getServerStatus();
    }

    public void startProcessing(Task task, Thread thread) {
        this.currentlyProcessedTasks.put(task, thread);
    }

    public void stopProcessing(Task task) {
        this.currentlyProcessedTasks.remove(task);
    }

    public Thread getThreadProcessingTask(Task task) {
        return this.currentlyProcessedTasks.get(task);
    }

    public int getOccupiedWorkerCount() {
        return this.occupiedWorkers.get();
    }

    public void notifyThreadOccupied() {
        this.occupiedWorkers.incrementAndGet();
    }

    public void notifyThreadIdle() {
        this.occupiedWorkers.decrementAndGet();
        if (this.workDistributionStrategy.canOnboardNewWork()) {
            this.checkForEnqueuedTasks();
        }
    }

    private List<Task> getTasksToProcess(Supplier<List<Task>> taskListSupplier) {
        if (this.pollIntervalInSecondsTimeBoxIsAboutToPass()) {
            return Collections.emptyList();
        }
        return taskListSupplier.get();
    }

    private void updateCurrentlyProcessingTask(Task task) {
        try {
            task.updateProcessing();
        }
        catch (ClassCastException classCastException) {
            // empty catch block
        }
    }

    private boolean pollIntervalInSecondsTimeBoxIsAboutToPass() {
        boolean runTimeBoxIsPassed;
        Duration durationRunTime = Duration.between(this.runStartTime, Instant.now());
        boolean bl = runTimeBoxIsPassed = durationRunTime.compareTo(this.durationPollIntervalTimeBox) >= 0;
        if (runTimeBoxIsPassed) {
            LOGGER.debug("Carrot is passing the poll interval in seconds timebox because of too many tasks.");
        }
        return runTimeBoxIsPassed;
    }

    private List<RecurringTask> getRecurringTasks(Integer partition) {
        if ((long)this.recurringTasks.size() != this.storageProvider.countRecurringTasksByPartition(partition)) {
            this.recurringTasks.clear();
            this.recurringTasks.addAll(this.storageProvider.getRecurringTasksByPartition(partition));
        }
        return this.recurringTasks;
    }

    ConcurrentTaskModificationResolver createConcurrentTaskModificationResolver() {
        return this.backgroundTaskServer.getConfiguration().concurrentTaskModificationPolicy.toConcurrentTaskModificationResolver(this.storageProvider, this);
    }
}

