/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.supervisor.scanner;

import cn.ponfee.disjob.common.base.SingletonClassConstraint;
import cn.ponfee.disjob.common.concurrent.AbstractHeartbeatThread;
import cn.ponfee.disjob.common.concurrent.MultithreadExecutors;
import cn.ponfee.disjob.common.concurrent.NamedThreadFactory;
import cn.ponfee.disjob.common.concurrent.PeriodExecutor;
import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.common.date.Dates;
import cn.ponfee.disjob.common.lock.LockTemplate;
import cn.ponfee.disjob.core.enums.CollidedStrategy;
import cn.ponfee.disjob.core.enums.JobState;
import cn.ponfee.disjob.core.enums.MisfireStrategy;
import cn.ponfee.disjob.core.enums.Operation;
import cn.ponfee.disjob.core.enums.RunState;
import cn.ponfee.disjob.core.enums.RunType;
import cn.ponfee.disjob.supervisor.base.TriggerTimes;
import cn.ponfee.disjob.supervisor.component.JobManager;
import cn.ponfee.disjob.supervisor.component.JobQuerier;
import cn.ponfee.disjob.supervisor.component.WorkerClient;
import cn.ponfee.disjob.supervisor.configuration.SupervisorProperties;
import cn.ponfee.disjob.supervisor.model.SchedInstance;
import cn.ponfee.disjob.supervisor.model.SchedJob;
import com.google.common.math.IntMath;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.dao.DuplicateKeyException;

public class TriggeringJobScanner
extends AbstractHeartbeatThread {
    private static final int REMARK_MAX_LENGTH = 255;
    private final int scanBatchSize;
    private final int jobScanFailedCountThreshold;
    private final JobManager jobManager;
    private final JobQuerier jobQuerier;
    private final WorkerClient workerClient;
    private final LockTemplate lockTemplate;
    private final long afterMilliseconds;
    private final ExecutorService processJobExecutor;
    private final PeriodExecutor logPrinter = new PeriodExecutor(30000L, () -> this.log.warn("Not discovered any worker."));

    public TriggeringJobScanner(SupervisorProperties conf, JobManager jobManager, JobQuerier jobQuerier, WorkerClient workerClient, LockTemplate lockTemplate) {
        super(conf.getScanTriggeringJobPeriodMs());
        SingletonClassConstraint.constrain((Object)((Object)this));
        this.scanBatchSize = conf.getScanBatchSize();
        this.jobScanFailedCountThreshold = conf.getJobScanFailedCountThreshold();
        this.jobManager = jobManager;
        this.jobQuerier = jobQuerier;
        this.workerClient = workerClient;
        this.lockTemplate = lockTemplate;
        this.afterMilliseconds = this.heartbeatPeriodMs * 3L;
        this.processJobExecutor = ThreadPoolExecutors.builder().corePoolSize(1).maximumPoolSize(Math.max(1, conf.getMaximumProcessJobPoolSize())).workQueue(new SynchronousQueue()).keepAliveTimeSeconds(300L).rejectedHandler(ThreadPoolExecutors.CALLER_RUNS).threadFactory((ThreadFactory)NamedThreadFactory.builder().prefix("triggering_job_scanner").priority(Integer.valueOf(10)).daemon(true).uncaughtExceptionHandler(this.log).build()).build();
    }

    protected boolean heartbeat() {
        if (!this.workerClient.hasAliveWorker()) {
            this.logPrinter.execute();
            return true;
        }
        Boolean result = (Boolean)this.lockTemplate.execute(() -> {
            Date now = new Date();
            long maxNextTriggerTime = now.getTime() + this.afterMilliseconds;
            List<SchedJob> jobs = this.jobQuerier.findBeTriggeringJob(maxNextTriggerTime, this.scanBatchSize);
            if (CollectionUtils.isEmpty(jobs)) {
                return true;
            }
            MultithreadExecutors.run(jobs, job -> this.processJob((SchedJob)((Object)((Object)job)), now, maxNextTriggerTime), (Executor)this.processJobExecutor);
            return jobs.size() < this.scanBatchSize;
        });
        return result != null && result != false;
    }

    @PreDestroy
    public void close() {
        super.close();
        ThreadPoolExecutors.shutdown((ExecutorService)this.processJobExecutor, (int)1);
    }

    private void processJob(SchedJob job, Date now, long maxNextTriggerTime) {
        try {
            if (!this.workerClient.hasAliveWorker(job.getGroup())) {
                this.updateNextScanTime(job, now, 60000L);
                this.log.warn("Scan job none alive worker: {}, {}", (Object)job.getJobId(), (Object)job.getGroup());
                return;
            }
            job.setNextTriggerTime(this.reComputeNextTriggerTime(job, now));
            if (job.getNextTriggerTime() == null) {
                this.disableJob(job, "Recompute disabled, none next trigger time");
                return;
            }
            long triggerTime = job.getNextTriggerTime();
            if (job.getLastTriggerTime() != null && job.getLastTriggerTime() >= triggerTime) {
                this.disableJob(job, "Recompute disabled, invalid next trigger time: " + triggerTime);
                return;
            }
            if (triggerTime > maxNextTriggerTime) {
                this.jobManager.updateJobNextTriggerTime(job);
                return;
            }
            if (this.shouldBlockCollidedTrigger(job, now)) {
                return;
            }
            TriggeringJobScanner.refreshNextTriggerTime(job, triggerTime, now);
            this.jobManager.scheduleTriggerJob(job, triggerTime);
        }
        catch (DuplicateKeyException e) {
            boolean updated = this.jobManager.updateJobNextTriggerTime(job);
            this.log.info("Update conflict next trigger time: {}, {}, {}", new Object[]{updated, job, e.getMessage()});
        }
        catch (IllegalArgumentException e) {
            this.log.error("Scan trigger job failed: " + (Object)((Object)job), (Throwable)e);
            this.disableJob(job, StringUtils.truncate((String)("Scan process failed: " + e.getMessage()), (int)255));
        }
        catch (Throwable t) {
            this.log.error("Scan trigger job error: " + (Object)((Object)job), t);
            if (job.getScanFailedCount() >= this.jobScanFailedCountThreshold) {
                this.disableJob(job, StringUtils.truncate((String)("Scan over failed: " + t.getMessage()), (int)255));
            }
            int scanFailedCount = job.incrementAndGetScanFailedCount();
            this.updateNextScanTime(job, now, (long)IntMath.pow((int)scanFailedCount, (int)2) * 5000L);
        }
    }

    private void disableJob(SchedJob job, String reason) {
        job.setRemark(reason);
        this.jobManager.disableJob(job);
    }

    private Long reComputeNextTriggerTime(SchedJob job, Date now) {
        if (job.isFixedTriggerType()) {
            return job.obtainNextTriggerTime();
        }
        if (job.getNextTriggerTime() + this.afterMilliseconds >= now.getTime()) {
            return job.getNextTriggerTime();
        }
        return TriggerTimes.computeNextTriggerTime(job, now);
    }

    private static Long doComputeNextTriggerTime(SchedJob job, Date now) {
        if (job.isFixedTriggerType()) {
            return Long.MAX_VALUE;
        }
        return TriggerTimes.computeNextTriggerTime(job, now);
    }

    private boolean shouldBlockCollidedTrigger(SchedJob job, Date now) {
        CollidedStrategy collidedStrategy = CollidedStrategy.of((int)job.getCollidedStrategy());
        Long lastTriggerTime = job.getLastTriggerTime();
        if (CollidedStrategy.CONCURRENT == collidedStrategy || lastTriggerTime == null) {
            return false;
        }
        SchedInstance lastInstance = this.jobQuerier.getInstance(job.getJobId(), lastTriggerTime, RunType.SCHEDULE);
        if (lastInstance == null || lastInstance.isCompleted()) {
            return false;
        }
        if (!RunState.of((int)lastInstance.getRunState()).isTerminal()) {
            this.blockCollidedTrigger(job, lastInstance, collidedStrategy, now);
            return true;
        }
        if (lastInstance.isWorkflow() || !Boolean.TRUE.equals(lastInstance.getRetrying())) {
            return false;
        }
        SchedInstance retryingInstance = this.jobQuerier.getRetryingInstance(lastInstance.getInstanceId());
        this.blockCollidedTrigger(job, retryingInstance, collidedStrategy, now);
        return true;
    }

    private void blockCollidedTrigger(SchedJob job, SchedInstance instance, CollidedStrategy strategy, Date now) {
        if (job.isFixedTriggerType()) {
            this.log.warn("Fixed trigger type instance collided: {}", (Object)instance);
        }
        if (job.getNextTriggerTime() > now.getTime()) {
            this.updateNextScanTime(job, now, job.getNextTriggerTime() - now.getTime());
            return;
        }
        if (strategy == CollidedStrategy.DISCARD) {
            Integer misfireStrategy = job.getMisfireStrategy();
            job.setMisfireStrategy(MisfireStrategy.SKIP_ALL_LOST.value());
            job.setLastTriggerTime(job.getNextTriggerTime());
            job.setNextTriggerTime(TriggeringJobScanner.doComputeNextTriggerTime(job, now));
            job.setMisfireStrategy(misfireStrategy);
            if (job.getNextTriggerTime() == null || job.getNextTriggerTime() < now.getTime()) {
                this.disableJob(job, "Collide disabled, invalid next trigger time: " + job.getNextTriggerTime());
            } else {
                this.jobManager.updateJobNextTriggerTime(job);
                this.updateNextScanTime(job, now, job.getNextTriggerTime() - now.getTime());
            }
        } else if (strategy == CollidedStrategy.SEQUENTIAL) {
            this.updateNextScanTime(job, now, 30000L);
        } else if (strategy == CollidedStrategy.OVERRIDE) {
            if (instance != null) {
                this.jobManager.cancelInstance(instance.getInstanceId(), Operation.COLLIDED_CANCEL);
            }
            this.updateNextScanTime(job, now, 3000L);
        } else {
            throw new UnsupportedOperationException("Unsupported collided strategy: " + strategy);
        }
    }

    private void updateNextScanTime(SchedJob job, Date now, long delayMillis) {
        job.setNextScanTime(Dates.plusMillis((Date)now, (long)delayMillis));
        this.jobManager.updateJobNextScanTime(job);
    }

    private static void refreshNextTriggerTime(SchedJob job, Long lastTriggerTime, Date now) {
        job.setLastTriggerTime(lastTriggerTime);
        job.setNextTriggerTime(TriggeringJobScanner.doComputeNextTriggerTime(job, now));
        if (job.getNextTriggerTime() == null) {
            job.setRemark("Refresh disabled, not next trigger time");
            job.setJobState(JobState.DISABLED.value());
        }
    }
}

