/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.scheduler.supervisor.thread;

import cn.ponfee.scheduler.common.date.Dates;
import cn.ponfee.scheduler.common.lock.DoInLocked;
import cn.ponfee.scheduler.core.base.AbstractHeartbeatThread;
import cn.ponfee.scheduler.core.enums.ExecuteState;
import cn.ponfee.scheduler.core.model.SchedJob;
import cn.ponfee.scheduler.core.model.SchedTask;
import cn.ponfee.scheduler.core.model.SchedTrack;
import cn.ponfee.scheduler.supervisor.manager.SchedulerJobManager;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;

public class ScanWaitingTrackThread
extends AbstractHeartbeatThread {
    private final DoInLocked doInLocked;
    private final SchedulerJobManager schedulerJobManager;
    private final long beforeMilliseconds;

    public ScanWaitingTrackThread(long heartbeatPeriodMs0, DoInLocked doInLocked, SchedulerJobManager schedulerJobManager) {
        super(heartbeatPeriodMs0);
        this.doInLocked = doInLocked;
        this.schedulerJobManager = schedulerJobManager;
        this.beforeMilliseconds = this.heartbeatPeriodMs << 3;
    }

    protected boolean heartbeat() {
        if (this.schedulerJobManager.hasNotFoundWorkers()) {
            this.log.warn("Not found available worker.");
            return true;
        }
        Boolean result = (Boolean)this.doInLocked.apply(this::process);
        return result == null || result != false;
    }

    private boolean process() {
        Date now = new Date();
        Date expireTime = new Date(now.getTime() - this.beforeMilliseconds);
        List<SchedTrack> tracks = this.schedulerJobManager.findExpireWaiting(expireTime, 200);
        if (CollectionUtils.isEmpty(tracks)) {
            return true;
        }
        for (SchedTrack track : tracks) {
            this.processEach(track, now);
        }
        return tracks.size() < 200;
    }

    private void processEach(SchedTrack track, Date now) {
        List<SchedTask> tasks = this.schedulerJobManager.findMediumTaskByTrackId(track.getTrackId());
        if (tasks.stream().allMatch(t -> ExecuteState.of((Integer)t.getExecuteState()).isTerminal())) {
            if (this.schedulerJobManager.renewUpdateTime(track, now)) {
                this.log.info("All task terminal, terminate the sched track: {}", (Object)track.getTrackId());
                this.schedulerJobManager.terminate(track.getTrackId());
            }
            return;
        }
        List<SchedTask> waitingTasks = tasks.stream().filter(e -> ExecuteState.WAITING.equals(e.getExecuteState())).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(waitingTasks)) {
            this.log.info("Not has waiting tasks: {}", (Object)track);
            return;
        }
        List<SchedTask> dispatchingTasks = this.schedulerJobManager.filterDispatchingTask(waitingTasks);
        if (CollectionUtils.isEmpty(dispatchingTasks)) {
            this.schedulerJobManager.renewUpdateTime(track, now);
            return;
        }
        SchedJob job = this.schedulerJobManager.getJob(track.getJobId());
        if (job == null) {
            this.log.error("Job not exists: {}, {}", (Object)track, tasks);
            this.schedulerJobManager.updateState(ExecuteState.DATA_INCONSISTENT, tasks, track);
            return;
        }
        if (this.schedulerJobManager.hasNotFoundWorkers(job.getJobGroup())) {
            this.schedulerJobManager.renewUpdateTime(track, now);
            this.log.warn("Scan track not found available group '{}' workers.", (Object)job.getJobGroup());
            return;
        }
        if (this.schedulerJobManager.renewUpdateTime(track, now)) {
            this.log.info("Redispatch sched track: {} | {}", (Object)track, (Object)Dates.format((Date)now));
            this.schedulerJobManager.dispatch(job, track, dispatchingTasks);
        }
    }
}

