/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.supervisor.thread;

import cn.ponfee.disjob.common.concurrent.NamedThreadFactory;
import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.common.date.Dates;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.common.lock.DoInLocked;
import cn.ponfee.disjob.core.base.AbstractHeartbeatThread;
import cn.ponfee.disjob.core.enums.CollidedStrategy;
import cn.ponfee.disjob.core.enums.JobState;
import cn.ponfee.disjob.core.enums.MisfireStrategy;
import cn.ponfee.disjob.core.enums.Operations;
import cn.ponfee.disjob.core.enums.RunState;
import cn.ponfee.disjob.core.enums.RunType;
import cn.ponfee.disjob.core.model.SchedInstance;
import cn.ponfee.disjob.core.model.SchedJob;
import cn.ponfee.disjob.core.model.SchedTask;
import cn.ponfee.disjob.supervisor.instance.TriggerInstanceCreator;
import cn.ponfee.disjob.supervisor.service.DistributedJobManager;
import cn.ponfee.disjob.supervisor.service.DistributedJobQuerier;
import cn.ponfee.disjob.supervisor.util.TriggerTimeUtils;
import com.google.common.math.IntMath;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.dao.DuplicateKeyException;

public class TriggeringJobScanner
extends AbstractHeartbeatThread {
    private static final int SCAN_COLLIDED_INTERVAL_SECONDS = 60;
    private static final int REMARK_MAX_LENGTH = 255;
    private static final int FAILED_SCAN_COUNT_THRESHOLD = 5;
    private final DoInLocked doInLocked;
    private final DistributedJobManager jobManager;
    private final DistributedJobQuerier jobQuerier;
    private final long afterMilliseconds;
    private final ExecutorService processJobExecutor;

    public TriggeringJobScanner(long heartbeatPeriodMilliseconds, int processJobMaximumPoolSize, DoInLocked doInLocked, DistributedJobManager jobManager, DistributedJobQuerier jobQuerier) {
        super(heartbeatPeriodMilliseconds);
        this.doInLocked = doInLocked;
        this.jobManager = jobManager;
        this.jobQuerier = jobQuerier;
        this.afterMilliseconds = this.heartbeatPeriodMs * 3L;
        this.processJobExecutor = ThreadPoolExecutors.builder().corePoolSize(1).maximumPoolSize(Math.max(1, processJobMaximumPoolSize)).workQueue(new SynchronousQueue()).keepAliveTimeSeconds(300L).rejectedHandler(ThreadPoolExecutors.CALLER_RUNS).threadFactory((ThreadFactory)NamedThreadFactory.builder().prefix("triggering_job_scanner").priority(Integer.valueOf(10)).build()).prestartCoreThreadType(ThreadPoolExecutors.PrestartCoreThreadType.ONE).build();
    }

    protected boolean heartbeat() {
        if (this.jobManager.hasNotDiscoveredWorkers()) {
            this.log.warn("Not discovered worker.");
            return true;
        }
        Boolean result = (Boolean)this.doInLocked.action(() -> {
            Date now = new Date();
            long maxNextTriggerTime = now.getTime() + this.afterMilliseconds;
            List<SchedJob> jobs = this.jobQuerier.findBeTriggeringJob(maxNextTriggerTime, 200);
            if (CollectionUtils.isEmpty(jobs)) {
                return true;
            }
            jobs.stream().map(job -> CompletableFuture.runAsync(() -> this.processJob((SchedJob)job, now, maxNextTriggerTime), this.processJobExecutor)).collect(Collectors.toList()).forEach(CompletableFuture::join);
            return jobs.size() < 200;
        });
        return result != null && result != false;
    }

    public void close() {
        super.close();
        Throwables.ThrowingSupplier.caught(() -> ThreadPoolExecutors.shutdown((ExecutorService)this.processJobExecutor, (int)3));
    }

    private void processJob(SchedJob job, Date now, long maxNextTriggerTime) {
        try {
            job.setNextTriggerTime(this.recomputeNextTriggerTime(job, now));
            if (job.getNextTriggerTime() == null) {
                String reason = "Recompute has not next trigger time";
                job.setRemark(reason);
                this.log.info("{} | {}", (Object)reason, (Object)job);
                this.jobManager.stopJob(job);
                return;
            }
            if (job.getNextTriggerTime() > maxNextTriggerTime) {
                this.jobManager.updateJobNextTriggerTime(job);
                return;
            }
            if (this.jobManager.hasNotDiscoveredWorkers(job.getJobGroup())) {
                this.updateNextScanTime(job, now, 15);
                this.log.warn("Scan job not discovered worker: {} | {}", (Object)job.getJobId(), (Object)job.getJobGroup());
                return;
            }
            if (this.checkBlockCollidedTrigger(job, now)) {
                return;
            }
            long triggerTime = job.getNextTriggerTime();
            TriggeringJobScanner.refreshNextTriggerTime(job, triggerTime, now);
            TriggerInstanceCreator<?> creator = TriggerInstanceCreator.of(job.getJobType(), this.jobManager);
            creator.createWithSaveAndDispatch(job, RunType.SCHEDULE, triggerTime);
        }
        catch (DuplicateKeyException e) {
            if (this.jobManager.updateJobNextTriggerTime(job)) {
                this.log.info("Conflict trigger time: {} | {}", (Object)job, (Object)e.getMessage());
            } else {
                this.log.error("Conflict trigger time: {} | {}", (Object)job, (Object)e.getMessage());
            }
        }
        catch (IllegalArgumentException e) {
            this.log.error("Scan trigger job failed: " + job, (Throwable)e);
            job.setRemark(StringUtils.truncate((String)("Scan process failed: " + e.getMessage()), (int)255));
            job.setNextTriggerTime(null);
            this.jobManager.stopJob(job);
        }
        catch (Throwable t) {
            this.log.error("Scan trigger job error: " + job, t);
            if (job.getFailedScanCount() >= 5) {
                job.setRemark(StringUtils.truncate((String)("Scan over failed: " + t.getMessage()), (int)255));
                job.setNextTriggerTime(null);
                this.jobManager.stopJob(job);
            }
            int failedScanCount = job.incrementAndGetFailedScanCount();
            this.updateNextScanTime(job, now, IntMath.pow((int)failedScanCount, (int)2) * 5);
        }
    }

    private Long recomputeNextTriggerTime(SchedJob job, Date now) {
        if (now.getTime() <= job.getNextTriggerTime() + this.afterMilliseconds) {
            return job.getNextTriggerTime();
        }
        return TriggerTimeUtils.computeNextTriggerTime(job, now);
    }

    private boolean checkBlockCollidedTrigger(SchedJob job, Date now) {
        Long lastTriggerTime;
        CollidedStrategy collidedStrategy = CollidedStrategy.of((Integer)job.getCollidedStrategy());
        if (CollidedStrategy.CONCURRENT == collidedStrategy || (lastTriggerTime = job.getLastTriggerTime()) == null) {
            return false;
        }
        SchedInstance lastInstance = this.jobQuerier.getInstance(job.getJobId(), lastTriggerTime, RunType.SCHEDULE.value());
        if (lastInstance == null) {
            return false;
        }
        long instanceId = lastInstance.getInstanceId();
        RunState runState = RunState.of((Integer)lastInstance.getRunState());
        switch (runState) {
            case FINISHED: {
                return false;
            }
            case WAITING: 
            case PAUSED: {
                return this.checkBlockCollidedTrigger(job, Collections.singletonList(lastInstance), collidedStrategy, now);
            }
            case RUNNING: {
                List<SchedTask> tasks = this.jobQuerier.findBaseInstanceTasks(instanceId);
                if (this.jobManager.hasAliveExecuting(tasks)) {
                    return this.checkBlockCollidedTrigger(job, Collections.singletonList(lastInstance), collidedStrategy, now);
                }
                this.log.info("All worker dead, terminate collided sched instance: {}", (Object)instanceId);
                this.jobManager.cancelInstance(instanceId, lastInstance.getWnstanceId(), Operations.COLLIDED_CANCEL);
                return false;
            }
            case CANCELED: {
                List<SchedInstance> list = this.jobQuerier.findUnterminatedRetryInstance(instanceId);
                if (CollectionUtils.isEmpty(list)) {
                    return false;
                }
                return this.checkBlockCollidedTrigger(job, list, collidedStrategy, now);
            }
        }
        throw new UnsupportedOperationException("Unsupported run state: " + runState.name());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean checkBlockCollidedTrigger(SchedJob job, List<SchedInstance> instances, CollidedStrategy collidedStrategy, Date now) {
        switch (collidedStrategy) {
            case DISCARD: {
                Integer misfireStrategy = job.getMisfireStrategy();
                try {
                    job.setMisfireStrategy(Integer.valueOf(MisfireStrategy.DISCARD.value()));
                    job.setNextTriggerTime(TriggerTimeUtils.computeNextTriggerTime(job, now));
                }
                finally {
                    job.setMisfireStrategy(misfireStrategy);
                }
                if (job.getNextTriggerTime() == null) {
                    job.setRemark("Disable collided reason: has not next trigger time.");
                    job.setJobState(Integer.valueOf(JobState.DISABLE.value()));
                }
                this.jobManager.updateJobNextTriggerTime(job);
                return true;
            }
            case SERIAL: {
                this.updateNextScanTime(job, now, 60);
                return true;
            }
            case OVERRIDE: {
                instances.forEach(e -> this.jobManager.cancelInstance(e.getInstanceId(), e.getWnstanceId(), Operations.COLLIDED_CANCEL));
                return false;
            }
        }
        throw new UnsupportedOperationException("Unsupported collided strategy: " + collidedStrategy.name());
    }

    private void updateNextScanTime(SchedJob job, Date now, int delayedSeconds) {
        job.setNextScanTime(Dates.plusSeconds((Date)now, (long)delayedSeconds));
        this.jobManager.updateJobNextScanTime(job);
    }

    private static void refreshNextTriggerTime(SchedJob job, Long lastTriggerTime, Date now) {
        job.setLastTriggerTime(lastTriggerTime);
        job.setNextTriggerTime(TriggerTimeUtils.computeNextTriggerTime(job, now));
        if (job.getNextTriggerTime() == null) {
            job.setRemark("Disable refresh reason: has not next trigger time");
            job.setJobState(Integer.valueOf(JobState.DISABLE.value()));
        }
    }
}

