/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.scheduler.supervisor;

import cn.ponfee.scheduler.common.date.Dates;
import cn.ponfee.scheduler.common.lock.DoInLocked;
import cn.ponfee.scheduler.core.base.AbstractHeartbeatThread;
import cn.ponfee.scheduler.core.enums.ExecuteState;
import cn.ponfee.scheduler.core.model.SchedJob;
import cn.ponfee.scheduler.core.model.SchedTask;
import cn.ponfee.scheduler.core.model.SchedTrack;
import cn.ponfee.scheduler.supervisor.manager.JobManager;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

public class ScanTrackHeartbeatThread
extends AbstractHeartbeatThread {
    private static final int QUERY_BATCH_SIZE = 200;
    private static final long EXPIRE_WAITING_MILLISECONDS = 60000L;
    private static final long EXPIRE_RUNNING_MILLISECONDS = 120000L;
    private final DoInLocked doInLocked;
    private final JobManager jobManager;
    private long nextScanExpireRunningTimeMillis = 0L;

    public ScanTrackHeartbeatThread(int heartbeatIntervalSeconds, DoInLocked doInLocked, JobManager jobManager) {
        super(heartbeatIntervalSeconds);
        this.doInLocked = doInLocked;
        this.jobManager = jobManager;
    }

    protected boolean heartbeat() {
        if (this.jobManager.hasNotFoundWorkers()) {
            return false;
        }
        Boolean result = (Boolean)this.doInLocked.apply(() -> {
            Date now = new Date();
            return this.processExpireWaiting(now) || this.processExpireRunning(now);
        });
        return result != null && result != false;
    }

    private boolean processExpireWaiting(Date now) {
        long expireTime = now.getTime() - 60000L;
        List<SchedTrack> tracks = this.jobManager.findExpireWaiting(expireTime, new Date(expireTime), 200);
        if (CollectionUtils.isEmpty(tracks)) {
            return false;
        }
        for (SchedTrack track : tracks) {
            this.processExpireWaiting(track, now);
        }
        return tracks.size() == 200;
    }

    private void processExpireWaiting(SchedTrack track, Date now) {
        List<SchedTask> tasks = this.jobManager.findTasks(track.getTrackId());
        if (tasks.stream().allMatch(t -> ExecuteState.of((Integer)t.getExecuteState()).isTerminal())) {
            if (this.jobManager.renewUpdateTime(track, now)) {
                this.log.info("All task terminal, terminate the sched track: {}", (Object)track.getTrackId());
                this.jobManager.terminate(track.getTrackId());
            }
            return;
        }
        List<SchedTask> waitingTasks = tasks.stream().filter(e -> ExecuteState.WAITING.equals(e.getExecuteState())).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(waitingTasks)) {
            this.log.info("Not has waiting tasks: {}", (Object)track);
            return;
        }
        SchedJob job = this.jobManager.getJob(track.getJobId());
        if (job == null) {
            this.log.error("Job not exists: {}, {}", (Object)track, tasks);
            this.jobManager.updateState(ExecuteState.DATA_INCONSISTENT, tasks, track);
            return;
        }
        if (this.jobManager.hasNotFoundWorkers(job.getJobGroup())) {
            this.jobManager.renewUpdateTime(track, now);
            this.log.warn("Scan track not found available group '{}' workers.", (Object)job.getJobGroup());
            return;
        }
        if (this.jobManager.renewUpdateTime(track, now)) {
            this.log.info("Redispatch sched track: {} - {}", (Object)track, (Object)Dates.format((Date)now));
            this.jobManager.dispatch(job, track, waitingTasks);
        }
    }

    private boolean processExpireRunning(Date now) {
        if (now.getTime() < this.nextScanExpireRunningTimeMillis) {
            return false;
        }
        long expireTime = now.getTime() - 120000L;
        List<SchedTrack> tracks = this.jobManager.findExpireRunning(expireTime, new Date(expireTime), 200);
        if (CollectionUtils.isEmpty(tracks) || tracks.size() < 200) {
            this.nextScanExpireRunningTimeMillis = now.getTime() + 120000L;
        }
        if (CollectionUtils.isEmpty(tracks)) {
            return false;
        }
        for (SchedTrack track : tracks) {
            this.processExpireRunning(track, now);
        }
        return tracks.size() == 200;
    }

    private void processExpireRunning(SchedTrack track, Date now) {
        List<SchedTask> tasks = this.jobManager.findByTrackId(track.getTrackId());
        List<SchedTask> waitingTasks = tasks.stream().filter(e -> ExecuteState.WAITING.equals(e.getExecuteState())).collect(Collectors.toList());
        if (CollectionUtils.isNotEmpty(waitingTasks)) {
            if (this.jobManager.renewUpdateTime(track, now)) {
                this.log.info("Redispatch sched track: {} - {}", (Object)track, (Object)Dates.format((Date)now));
                this.jobManager.dispatch(this.jobManager.getJob(track.getJobId()), track, waitingTasks);
            }
            return;
        }
        boolean hasAliveExecuting = tasks.stream().filter(e -> ExecuteState.EXECUTING.equals(e.getExecuteState())).map(SchedTask::getWorker).filter(StringUtils::isNotBlank).anyMatch(this.jobManager::isAliveWorker);
        if (hasAliveExecuting) {
            this.jobManager.renewUpdateTime(track, now);
            return;
        }
        this.log.info("Scan track, all worker dead, terminate the sched track: {}", (Object)track.getTrackId());
        this.jobManager.terminate(track.getTrackId());
    }
}

