/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.supervisor.thread;

import cn.ponfee.disjob.common.base.SingletonClassConstraint;
import cn.ponfee.disjob.common.collect.Collects;
import cn.ponfee.disjob.common.concurrent.AbstractHeartbeatThread;
import cn.ponfee.disjob.common.lock.DoInLocked;
import cn.ponfee.disjob.core.enums.ExecuteState;
import cn.ponfee.disjob.core.enums.RunState;
import cn.ponfee.disjob.core.model.SchedInstance;
import cn.ponfee.disjob.core.model.SchedJob;
import cn.ponfee.disjob.core.model.SchedTask;
import cn.ponfee.disjob.supervisor.component.DistributedJobManager;
import cn.ponfee.disjob.supervisor.component.DistributedJobQuerier;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import org.apache.commons.collections4.CollectionUtils;

public class WaitingInstanceScanner
extends AbstractHeartbeatThread {
    private final DoInLocked doInLocked;
    private final DistributedJobManager jobManager;
    private final DistributedJobQuerier jobQuerier;
    private final long beforeMilliseconds;

    public WaitingInstanceScanner(long heartbeatPeriodMilliseconds, DoInLocked doInLocked, DistributedJobManager jobManager, DistributedJobQuerier jobQuerier) {
        super(heartbeatPeriodMilliseconds);
        SingletonClassConstraint.constrain((Object)((Object)this));
        this.doInLocked = doInLocked;
        this.jobManager = jobManager;
        this.jobQuerier = jobQuerier;
        this.beforeMilliseconds = this.heartbeatPeriodMs << 3;
    }

    protected boolean heartbeat() {
        if (this.jobManager.hasNotDiscoveredWorkers()) {
            this.log.warn("Not discovered worker.");
            return true;
        }
        Boolean result = (Boolean)this.doInLocked.action(this::process);
        return result != null && result != false;
    }

    private boolean process() {
        Date expireTime = new Date(System.currentTimeMillis() - this.beforeMilliseconds);
        List<SchedInstance> instances = this.jobQuerier.findExpireWaitingInstance(expireTime, 200);
        if (CollectionUtils.isEmpty(instances)) {
            return true;
        }
        for (SchedInstance instance : instances) {
            this.processEach(instance);
        }
        return instances.size() < 200;
    }

    private void processEach(SchedInstance instance) {
        if (!this.jobManager.renewInstanceUpdateTime(instance, new Date())) {
            return;
        }
        List<SchedTask> tasks = this.jobQuerier.findBaseInstanceTasks(instance.getInstanceId());
        List waitingTasks = Collects.filter(tasks, e -> ExecuteState.WAITING.equals(e.getExecuteState()));
        if (CollectionUtils.isNotEmpty((Collection)waitingTasks)) {
            List redispatchingTasks = Collects.filter((List)waitingTasks, e -> this.jobManager.isDeadWorker(e.getWorker()));
            if (CollectionUtils.isEmpty((Collection)redispatchingTasks)) {
                return;
            }
            SchedJob schedJob = this.jobQuerier.getJob(instance.getJobId());
            if (schedJob == null) {
                this.log.error("Scanned waiting state instance not found job: {}", (Object)instance.getJobId());
                return;
            }
            if (this.jobManager.hasNotDiscoveredWorkers(schedJob.getGroup())) {
                this.log.error("Scanned waiting state instance not discovered worker: {}, {}", (Object)instance.getInstanceId(), (Object)schedJob.getGroup());
                return;
            }
            this.log.info("Scanned waiting state instance re-dispatch task: {}", (Object)instance.getInstanceId());
            this.jobManager.dispatch(schedJob, instance, redispatchingTasks);
        } else {
            if (tasks.stream().allMatch(e -> ExecuteState.of((Integer)e.getExecuteState()).isTerminal())) {
                SchedInstance reloadInstance = this.jobQuerier.getInstance(instance.getInstanceId());
                if (reloadInstance == null) {
                    this.log.error("Scanned waiting state instance not exists: {}", (Object)instance.getInstanceId());
                    return;
                }
                if (RunState.of((Integer)reloadInstance.getRunState()).isTerminal()) {
                    return;
                }
            }
            this.log.info("Scanned waiting state instance was dead: {}", (Object)instance.getInstanceId());
            this.jobManager.purgeInstance(instance);
        }
    }
}

