/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.scheduler.supervisor.thread;

import cn.ponfee.scheduler.common.base.exception.CheckedThrowing;
import cn.ponfee.scheduler.common.date.Dates;
import cn.ponfee.scheduler.common.lock.DoInLocked;
import cn.ponfee.scheduler.core.base.AbstractHeartbeatThread;
import cn.ponfee.scheduler.core.enums.CollisionStrategy;
import cn.ponfee.scheduler.core.enums.JobState;
import cn.ponfee.scheduler.core.enums.MisfireStrategy;
import cn.ponfee.scheduler.core.enums.Operations;
import cn.ponfee.scheduler.core.enums.RunState;
import cn.ponfee.scheduler.core.enums.RunType;
import cn.ponfee.scheduler.core.exception.JobException;
import cn.ponfee.scheduler.core.model.SchedJob;
import cn.ponfee.scheduler.core.model.SchedTask;
import cn.ponfee.scheduler.core.model.SchedTrack;
import cn.ponfee.scheduler.supervisor.manager.SchedulerJobManager;
import cn.ponfee.scheduler.supervisor.util.TriggerTimeUtils;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.dao.DuplicateKeyException;

public class ScanTriggeringJobThread
extends AbstractHeartbeatThread {
    private static final int SCAN_COLLISION_INTERVAL_SECONDS = 60;
    private final DoInLocked doInLocked;
    private final SchedulerJobManager schedulerJobManager;
    private final long afterMilliseconds;

    public ScanTriggeringJobThread(long heartbeatPeriodMs0, DoInLocked doInLocked, SchedulerJobManager schedulerJobManager) {
        super(heartbeatPeriodMs0);
        this.doInLocked = doInLocked;
        this.schedulerJobManager = schedulerJobManager;
        this.afterMilliseconds = this.heartbeatPeriodMs << 2;
    }

    protected boolean heartbeat() {
        if (this.schedulerJobManager.hasNotFoundWorkers()) {
            this.log.warn("Not found available worker.");
            return true;
        }
        Boolean result = (Boolean)this.doInLocked.apply(() -> {
            Date now = new Date();
            long maxNextTriggerTime = now.getTime() + this.afterMilliseconds;
            List<SchedJob> jobs = this.schedulerJobManager.findBeTriggering(maxNextTriggerTime, 200);
            if (jobs == null || jobs.isEmpty()) {
                return true;
            }
            for (SchedJob job : jobs) {
                this.processJob(job, now, maxNextTriggerTime);
            }
            return false;
        });
        return result == null || result != false;
    }

    private void processJob(SchedJob job, Date now, long maxNextTriggerTime) {
        try {
            job.setNextTriggerTime(this.recomputeNextTriggerTime(job, now));
            if (job.getNextTriggerTime() == null) {
                job.setRemark("Stop recompute reason: has not next trigger time");
                this.log.info(job.getRemark() + ": " + job);
                this.schedulerJobManager.stopJob(job);
                return;
            }
            if (job.getNextTriggerTime() > maxNextTriggerTime) {
                this.schedulerJobManager.updateNextTriggerTime(job);
                return;
            }
            if (this.schedulerJobManager.hasNotFoundWorkers(job.getJobGroup())) {
                this.updateNextScanTime(job, now, 15);
                this.log.warn("Scan job not found available group '{}' workers.", (Object)job.getJobGroup());
                return;
            }
            if (this.checkBlockCollisionTrigger(job, now)) {
                return;
            }
            SchedTrack track = SchedTrack.create((long)this.schedulerJobManager.generateId(), (long)job.getJobId(), (RunType)RunType.SCHEDULE, (long)job.getNextTriggerTime(), (int)0, (Date)now);
            List<SchedTask> tasks = this.schedulerJobManager.splitTasks(job, track.getTrackId(), now);
            ScanTriggeringJobThread.refreshNextTriggerTime(job, job.getNextTriggerTime(), now);
            if (this.schedulerJobManager.updateAndSave(job, track, tasks)) {
                this.schedulerJobManager.dispatch(job, track, tasks);
            }
        }
        catch (DuplicateKeyException e) {
            if (this.schedulerJobManager.updateNextTriggerTime(job)) {
                this.log.info("Conflict trigger time: {} | {}", (Object)job, (Object)e.getMessage());
            } else {
                this.log.error("Conflict trigger time: {} | {}", (Object)job, (Object)e.getMessage());
            }
        }
        catch (JobException | IllegalArgumentException e) {
            this.log.error(e.getMessage() + ": " + job, e);
            job.setRemark("Stop reason: " + e.getMessage());
            job.setNextTriggerTime(null);
            this.schedulerJobManager.stopJob(job);
        }
        catch (Exception e) {
            this.log.error("Process handle job occur error: " + job, (Throwable)e);
        }
    }

    private Long recomputeNextTriggerTime(SchedJob job, Date now) {
        if (now.getTime() <= job.getNextTriggerTime() + this.afterMilliseconds) {
            return job.getNextTriggerTime();
        }
        return TriggerTimeUtils.computeNextTriggerTime(job, now);
    }

    private boolean checkBlockCollisionTrigger(SchedJob job, Date now) {
        Long lastTriggerTime;
        CollisionStrategy collisionStrategy = CollisionStrategy.of((Integer)job.getCollisionStrategy());
        if (CollisionStrategy.CONCURRENT == collisionStrategy || (lastTriggerTime = job.getLastTriggerTime()) == null) {
            return false;
        }
        SchedTrack lastTrack = this.schedulerJobManager.getByTriggerTime(job.getJobId(), lastTriggerTime, RunType.SCHEDULE.value());
        if (lastTrack == null) {
            return false;
        }
        long trackId = lastTrack.getTrackId();
        RunState runState = RunState.of((Integer)lastTrack.getRunState());
        switch (runState) {
            case FINISHED: {
                return false;
            }
            case WAITING: 
            case PAUSED: {
                return this.checkBlockCollisionTrigger(job, Collections.singletonList(trackId), collisionStrategy, now);
            }
            case RUNNING: {
                List<SchedTask> tasks = this.schedulerJobManager.findMediumTaskByTrackId(trackId);
                if (this.schedulerJobManager.hasAliveExecuting(tasks)) {
                    return this.checkBlockCollisionTrigger(job, Collections.singletonList(trackId), collisionStrategy, now);
                }
                this.log.info("Collision, all worker dead, terminate the sched track: {}", (Object)trackId);
                this.schedulerJobManager.cancelTrack(trackId, Operations.COLLISION_CANCEL);
                return false;
            }
            case CANCELED: {
                List<SchedTrack> list = this.schedulerJobManager.findUnterminatedRetry(trackId);
                if (CollectionUtils.isEmpty(list)) {
                    return false;
                }
                List<Long> trackIds = list.stream().map(SchedTrack::getTrackId).collect(Collectors.toList());
                return this.checkBlockCollisionTrigger(job, trackIds, collisionStrategy, now);
            }
        }
        throw new UnsupportedOperationException("Unsupported run state: " + runState.name());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean checkBlockCollisionTrigger(SchedJob job, List<Long> trackIds, CollisionStrategy collisionStrategy, Date now) {
        switch (collisionStrategy) {
            case DISCARD: {
                Integer misfireStrategy = job.getMisfireStrategy();
                try {
                    job.setMisfireStrategy(Integer.valueOf(MisfireStrategy.DISCARD.value()));
                    job.setNextTriggerTime(TriggerTimeUtils.computeNextTriggerTime(job, now));
                }
                finally {
                    job.setMisfireStrategy(misfireStrategy);
                }
                if (job.getNextTriggerTime() == null) {
                    job.setRemark("Disable collision reason: has not next trigger time.");
                    job.setJobState(Integer.valueOf(JobState.DISABLE.value()));
                }
                this.schedulerJobManager.updateNextTriggerTime(job);
                return true;
            }
            case SERIAL: {
                this.updateNextScanTime(job, now, 60);
                return true;
            }
            case OVERRIDE: {
                trackIds.forEach(trackId -> CheckedThrowing.supplier(() -> this.schedulerJobManager.cancelTrack((long)trackId, Operations.COLLISION_CANCEL)));
                return false;
            }
        }
        throw new UnsupportedOperationException("Unsupported collision strategy: " + collisionStrategy.name());
    }

    private void updateNextScanTime(SchedJob job, Date now, int delayedSeconds) {
        Date nextScanTime = Dates.plusSeconds((Date)now, (int)delayedSeconds);
        this.schedulerJobManager.updateNextScanTime(job.getJobId(), nextScanTime, job.getVersion());
    }

    private static void refreshNextTriggerTime(SchedJob job, Long lastTriggerTime, Date now) {
        job.setLastTriggerTime(lastTriggerTime);
        job.setNextTriggerTime(TriggerTimeUtils.computeNextTriggerTime(job, now));
        if (job.getNextTriggerTime() == null) {
            job.setRemark("Disable refresh reason: has not next trigger time");
            job.setJobState(Integer.valueOf(JobState.DISABLE.value()));
        }
    }
}

