/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.supervisor.scanner;

import cn.ponfee.disjob.common.base.SingletonClassConstraint;
import cn.ponfee.disjob.common.collect.Collects;
import cn.ponfee.disjob.common.concurrent.AbstractHeartbeatThread;
import cn.ponfee.disjob.common.concurrent.PeriodExecutor;
import cn.ponfee.disjob.common.lock.LockTemplate;
import cn.ponfee.disjob.supervisor.component.JobManager;
import cn.ponfee.disjob.supervisor.component.JobQuerier;
import cn.ponfee.disjob.supervisor.component.WorkerClient;
import cn.ponfee.disjob.supervisor.configuration.SupervisorProperties;
import cn.ponfee.disjob.supervisor.model.SchedInstance;
import cn.ponfee.disjob.supervisor.model.SchedJob;
import cn.ponfee.disjob.supervisor.model.SchedTask;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import org.apache.commons.collections4.CollectionUtils;

public class RunningInstanceScanner
extends AbstractHeartbeatThread {
    private final int scanBatchSize;
    private final JobManager jobManager;
    private final JobQuerier jobQuerier;
    private final WorkerClient workerClient;
    private final LockTemplate lockTemplate;
    private final long beforeMilliseconds;
    private final PeriodExecutor logPrinter = new PeriodExecutor(30000L, () -> this.log.warn("Not discovered any worker."));

    public RunningInstanceScanner(SupervisorProperties conf, JobManager jobManager, JobQuerier jobQuerier, WorkerClient workerClient, LockTemplate lockTemplate) {
        super(conf.getScanRunningInstancePeriodMs());
        SingletonClassConstraint.constrain((Object)((Object)this));
        this.scanBatchSize = conf.getScanBatchSize();
        this.jobManager = jobManager;
        this.jobQuerier = jobQuerier;
        this.workerClient = workerClient;
        this.lockTemplate = lockTemplate;
        this.beforeMilliseconds = this.heartbeatPeriodMs * 12L;
    }

    protected boolean heartbeat() {
        if (!this.workerClient.hasAliveWorker()) {
            this.logPrinter.execute();
            return true;
        }
        Boolean result = (Boolean)this.lockTemplate.execute(this::process);
        return result != null && result != false;
    }

    private boolean process() {
        Date expireTime = new Date(System.currentTimeMillis() - this.beforeMilliseconds);
        List<SchedInstance> instances = this.jobQuerier.findExpireRunningInstance(expireTime, this.scanBatchSize);
        if (CollectionUtils.isEmpty(instances)) {
            return true;
        }
        for (SchedInstance instance : instances) {
            this.processEach(instance);
        }
        return instances.size() < this.scanBatchSize;
    }

    private void processEach(SchedInstance instance) {
        if (!this.jobManager.updateInstanceNextScanTime(instance, new Date())) {
            return;
        }
        List<SchedTask> tasks = this.jobQuerier.findBaseInstanceTasks(instance.getInstanceId());
        List waitingTasks = Collects.filter(tasks, SchedTask::isWaiting);
        if (CollectionUtils.isNotEmpty((Collection)waitingTasks)) {
            this.processHasWaitingTask(instance, waitingTasks);
        } else if (tasks.stream().allMatch(SchedTask::isTerminal)) {
            this.processAllTerminatedTask(instance);
        } else {
            this.processHasExecutingTask(instance, tasks);
        }
    }

    private void processHasWaitingTask(SchedInstance instance, List<SchedTask> waitingTasks) {
        List redispatchingTasks = Collects.filter(waitingTasks, this.workerClient::shouldRedispatch);
        if (CollectionUtils.isEmpty((Collection)redispatchingTasks)) {
            return;
        }
        SchedJob job = this.jobQuerier.getJob(instance.getJobId());
        if (job == null) {
            this.log.error("Scanned running state instance not found job: {}", (Object)instance.getJobId());
            return;
        }
        if (!this.workerClient.hasAliveWorker(job.getGroup())) {
            this.log.error("Scanned running state instance none alive worker: {}, {}", (Object)instance.getInstanceId(), (Object)job.getGroup());
            return;
        }
        this.jobManager.redispatch(job, instance, redispatchingTasks);
        this.log.info("Scanned running state instance re-dispatch task: {}", (Object)instance.getInstanceId());
    }

    private void processAllTerminatedTask(SchedInstance instance) {
        SchedInstance reloadInstance = this.jobQuerier.getInstance(instance.getInstanceId());
        if (reloadInstance == null) {
            this.log.error("Scanned running state instance not exists: {}", (Object)instance.getInstanceId());
            return;
        }
        if (reloadInstance.isTerminal()) {
            return;
        }
        boolean purged = this.jobManager.purgeInstance(instance);
        this.log.info("Purge scanned running instance task all terminated: {}, {}", (Object)instance.getInstanceId(), (Object)purged);
    }

    private void processHasExecutingTask(SchedInstance instance, List<SchedTask> tasks) {
        if (this.workerClient.hasAliveTask(tasks)) {
            return;
        }
        boolean purged = this.jobManager.purgeInstance(instance);
        this.log.info("Purge scanned running instance was dead: {}, {}", (Object)instance.getInstanceId(), (Object)purged);
    }
}

