/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.supervisor.component;

import cn.ponfee.disjob.alert.base.AlertInstanceEvent;
import cn.ponfee.disjob.common.base.IdGenerator;
import cn.ponfee.disjob.common.base.TriConsumer;
import cn.ponfee.disjob.common.collect.Collects;
import cn.ponfee.disjob.common.dag.DAGEdge;
import cn.ponfee.disjob.common.dag.DAGNode;
import cn.ponfee.disjob.common.date.Dates;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.common.model.BaseEntity;
import cn.ponfee.disjob.common.spring.TransactionUtils;
import cn.ponfee.disjob.common.tuple.Tuple2;
import cn.ponfee.disjob.common.tuple.Tuple3;
import cn.ponfee.disjob.common.util.Functions;
import cn.ponfee.disjob.common.util.Strings;
import cn.ponfee.disjob.core.base.CoreUtils;
import cn.ponfee.disjob.core.base.Worker;
import cn.ponfee.disjob.core.dag.PredecessorInstance;
import cn.ponfee.disjob.core.dto.supervisor.StartTaskParam;
import cn.ponfee.disjob.core.dto.supervisor.StartTaskResult;
import cn.ponfee.disjob.core.dto.supervisor.StopTaskParam;
import cn.ponfee.disjob.core.dto.worker.SplitJobParam;
import cn.ponfee.disjob.core.enums.ExecuteState;
import cn.ponfee.disjob.core.enums.JobState;
import cn.ponfee.disjob.core.enums.Operation;
import cn.ponfee.disjob.core.enums.RetryType;
import cn.ponfee.disjob.core.enums.RouteStrategy;
import cn.ponfee.disjob.core.enums.RunState;
import cn.ponfee.disjob.core.enums.RunType;
import cn.ponfee.disjob.core.enums.TriggerType;
import cn.ponfee.disjob.core.exception.JobException;
import cn.ponfee.disjob.dispatch.ExecuteTaskParam;
import cn.ponfee.disjob.dispatch.event.TaskDispatchFailedEvent;
import cn.ponfee.disjob.supervisor.base.ExecuteTaskParamBuilder;
import cn.ponfee.disjob.supervisor.base.ModelConverter;
import cn.ponfee.disjob.supervisor.base.TriggerTimes;
import cn.ponfee.disjob.supervisor.component.WorkerClient;
import cn.ponfee.disjob.supervisor.configuration.SupervisorProperties;
import cn.ponfee.disjob.supervisor.dag.WorkflowGraph;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedDependMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedInstanceMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedJobMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedTaskMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedWorkflowMapper;
import cn.ponfee.disjob.supervisor.exception.KeyExistsException;
import cn.ponfee.disjob.supervisor.instance.TriggerInstance;
import cn.ponfee.disjob.supervisor.model.SchedDepend;
import cn.ponfee.disjob.supervisor.model.SchedInstance;
import cn.ponfee.disjob.supervisor.model.SchedJob;
import cn.ponfee.disjob.supervisor.model.SchedTask;
import cn.ponfee.disjob.supervisor.model.SchedWorkflow;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

@Component
public class JobManager {
    private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
    private static final Comparator<Tuple2<Worker, Long>> WORKLOAD_COMPARATOR = Comparator.comparingLong(e -> (Long)e.b);
    private static final List<Integer> RS_TERMINABLE = ImmutableList.of((Object)RunState.WAITING.value(), (Object)RunState.RUNNING.value(), (Object)RunState.PAUSED.value());
    private static final List<Integer> RS_RUNNABLE = ImmutableList.of((Object)RunState.WAITING.value(), (Object)RunState.PAUSED.value());
    private static final List<Integer> RS_PAUSABLE = ImmutableList.of((Object)RunState.WAITING.value(), (Object)RunState.RUNNING.value());
    private static final List<Integer> RS_WAITING = ImmutableList.of((Object)RunState.WAITING.value());
    private static final List<Integer> RS_RUNNING = ImmutableList.of((Object)RunState.RUNNING.value());
    private static final List<Integer> RS_PAUSED = ImmutableList.of((Object)RunState.PAUSED.value());
    private static final List<Integer> ES_EXECUTABLE = ImmutableList.of((Object)ExecuteState.WAITING.value(), (Object)ExecuteState.PAUSED.value());
    private static final List<Integer> ES_PAUSABLE = ImmutableList.of((Object)ExecuteState.WAITING.value(), (Object)ExecuteState.EXECUTING.value());
    private static final List<Integer> ES_WAITING = ImmutableList.of((Object)ExecuteState.WAITING.value());
    private static final List<Integer> ES_EXECUTING = ImmutableList.of((Object)ExecuteState.EXECUTING.value());
    private static final List<Integer> ES_PAUSED = ImmutableList.of((Object)ExecuteState.PAUSED.value());
    private static final List<Integer> ES_COMPLETED = ImmutableList.of((Object)ExecuteState.COMPLETED.value());
    private final SupervisorProperties conf;
    private final IdGenerator idGenerator;
    private final SchedJobMapper jobMapper;
    private final SchedDependMapper dependMapper;
    private final SchedInstanceMapper instanceMapper;
    private final SchedWorkflowMapper workflowMapper;
    private final SchedTaskMapper taskMapper;
    private final WorkerClient workerClient;
    private final ApplicationEventPublisher eventPublisher;
    private final TransactionTemplate transactionTemplate;

    public JobManager(SupervisorProperties conf, IdGenerator idGenerator, SchedJobMapper jobMapper, SchedDependMapper dependMapper, SchedInstanceMapper instanceMapper, SchedWorkflowMapper workflowMapper, SchedTaskMapper taskMapper, WorkerClient workerClient, ApplicationEventPublisher eventPublisher, @Qualifier(value="disjobTransactionTemplate") TransactionTemplate txTemplate) {
        conf.check();
        this.conf = conf;
        this.idGenerator = idGenerator;
        this.jobMapper = jobMapper;
        this.dependMapper = dependMapper;
        this.instanceMapper = instanceMapper;
        this.workflowMapper = workflowMapper;
        this.taskMapper = taskMapper;
        this.workerClient = workerClient;
        this.eventPublisher = eventPublisher;
        this.transactionTemplate = txTemplate;
    }

    public long generateId() {
        return this.idGenerator.generateId();
    }

    public List<SchedTask> splitJob(String group, long instanceId, SplitJobParam param) throws JobException {
        return this.workerClient.splitJob(group, instanceId, param, this.idGenerator, this.conf.getMaximumSplitTaskSize());
    }

    public void redispatch(SchedJob job, SchedInstance instance, List<SchedTask> tasks) {
        this.dispatch(true, job, instance, tasks);
    }

    public void disableJob(SchedJob job) {
        this.jobMapper.disable(job);
    }

    public boolean updateJobNextTriggerTime(SchedJob job) {
        return TransactionUtils.isOneAffectedRow((int)this.jobMapper.updateNextTriggerTime(job));
    }

    public void updateJobNextScanTime(SchedJob job) {
        this.jobMapper.updateNextScanTime(job);
    }

    public boolean updateInstanceNextScanTime(SchedInstance inst, Date nextScanTime) {
        Assert.notNull((Object)nextScanTime, (String)"Instance next scan time cannot be null.");
        return TransactionUtils.isOneAffectedRow((int)this.instanceMapper.updateNextScanTime(inst.getInstanceId(), nextScanTime, inst.getVersion()));
    }

    public boolean savepoint(long taskId, String worker, String executeSnapshot) {
        CoreUtils.checkClobMaximumLength((String)executeSnapshot, (String)"Execute snapshot");
        return TransactionUtils.isOneAffectedRow((int)this.taskMapper.savepoint(taskId, worker, executeSnapshot));
    }

    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public Long addJob(SchedJob job) throws JobException {
        job.verifyForAdd(this.conf.getMaximumJobRetryCount());
        if (this.jobMapper.getJobId(job.getGroup(), job.getJobName()) != null) {
            throw new KeyExistsException("Exists job name: " + job.getJobName());
        }
        this.workerClient.verifyJob(job);
        job.setJobId(this.generateId());
        this.parseTriggerConfig(job);
        this.jobMapper.insert(job);
        return job.getJobId();
    }

    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public void updateJob(SchedJob job) throws JobException {
        Long jobId;
        job.verifyForUpdate(this.conf.getMaximumJobRetryCount());
        if (job.isNeedUpdateExecutor()) {
            this.workerClient.verifyJob(job);
        }
        if ((jobId = this.jobMapper.getJobId(job.getGroup(), job.getJobName())) != null && !jobId.equals(job.getJobId())) {
            throw new IllegalArgumentException("Exists job name: " + job.getJobName());
        }
        SchedJob dbJob = this.jobMapper.get(job.getJobId());
        Assert.notNull((Object)((Object)dbJob), () -> "Sched job id not found " + job.getJobId());
        Assert.isTrue((boolean)dbJob.getGroup().equals(job.getGroup()), (String)"Job group cannot be modify.");
        if (job.isNeedUpdateTrigger(dbJob.getTriggerType(), dbJob.getTriggerValue())) {
            this.dependMapper.deleteByChildJobId(job.getJobId());
            this.parseTriggerConfig(job);
        }
        TransactionUtils.assertOneAffectedRow((int)this.jobMapper.update(job), (String)"Update sched job fail or conflict.");
    }

    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public void deleteJob(String user, long jobId) {
        SchedJob job = this.jobMapper.get(jobId);
        Assert.notNull((Object)((Object)job), () -> "Job id not found: " + jobId);
        Assert.state((!job.isEnabled() ? 1 : 0) != 0, (String)"Please disable job before delete this job.");
        TransactionUtils.assertOneAffectedRow((int)this.jobMapper.softDelete(jobId, user), (String)"Delete sched job fail or conflict.");
        this.dependMapper.deleteByParentJobId(jobId);
        this.dependMapper.deleteByChildJobId(jobId);
    }

    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public void changeJobState(String user, long jobId, JobState toState) {
        if (TransactionUtils.isNotAffectedRow((int)this.jobMapper.updateState(jobId, user, toState.value(), 1 ^ toState.value()))) {
            throw new IllegalStateException("Change job state failed: " + jobId);
        }
        if (toState == JobState.ENABLED) {
            this.updateNextTriggerTime(this.jobMapper.get(jobId));
        }
    }

    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public void manualTriggerJob(long jobId) throws JobException {
        this.triggerJob(this.getRequiredJob(jobId), RunType.MANUAL, System.currentTimeMillis());
    }

    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public void scheduleTriggerJob(SchedJob job, long triggerTime) throws JobException {
        if (TransactionUtils.isOneAffectedRow((int)this.jobMapper.updateNextTriggerTime(job))) {
            this.triggerJob(job, RunType.SCHEDULE, triggerTime);
        } else {
            LOG.warn("Schedule trigger job unsuccessful: {}, {}", (Object)job.getJobId(), (Object)triggerTime);
        }
    }

    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public void updateTaskWorker(String worker, List<Long> taskIds) {
        if (CollectionUtils.isNotEmpty(taskIds)) {
            taskIds = taskIds.stream().distinct().sorted().collect(Collectors.toList());
            Collects.batchProcess(taskIds, ids -> this.taskMapper.batchUpdateWorker(worker, (List<Long>)ids), (int)100);
        }
    }

    @EventListener
    public void onTaskDispatchFailedEvent(TaskDispatchFailedEvent event) {
        this.transactionTemplate.executeWithoutResult(status -> {
            long taskId = event.getTaskId();
            if (!this.shouldTerminateDispatchFailedTask(taskId)) {
                return;
            }
            if (!this.taskMapper.terminate(taskId, null, ExecuteState.DISPATCH_FAILED, ExecuteState.WAITING, null, null)) {
                LOG.warn("Terminate dispatch failed task unsuccessful: {}", (Object)taskId);
            }
        });
    }

    public StartTaskResult startTask(StartTaskParam param) {
        param.check();
        return this.doInSynchronizedTransaction0(param.getInstanceId(), param.getWnstanceId(), lockInstanceId -> {
            String startRequestId = param.getStartRequestId();
            LOG.info("Task trace [{}] starting: {}, {}", new Object[]{param.getTaskId(), param.getWorker(), startRequestId});
            Date now = new Date();
            if (TransactionUtils.isNotAffectedRow((int)this.taskMapper.start(param.getTaskId(), param.getWorker(), startRequestId, now))) {
                if (!this.taskMapper.checkStartIdempotent(param.getTaskId(), param.getWorker(), startRequestId)) {
                    return StartTaskResult.failure((String)"Start task failure.");
                }
                LOG.info("Start task idempotent: {}, {}, {}", new Object[]{param.getTaskId(), param.getWorker(), startRequestId});
            }
            if (TransactionUtils.isNotAffectedRow((int)this.instanceMapper.start(param.getInstanceId(), now))) {
                SchedInstance instance = this.instanceMapper.get(param.getInstanceId());
                Assert.state((instance != null && instance.isRunning() ? 1 : 0) != 0, () -> "Start instance failure: " + (Object)((Object)instance));
            }
            return ModelConverter.toStartTaskResult(this.taskMapper.get(param.getTaskId()));
        });
    }

    public boolean stopTask(StopTaskParam param) {
        param.check();
        Operation ops = param.getOperation();
        LOG.info("Task trace [{}] stopping: {}, {}, {}", new Object[]{param.getTaskId(), ops, param.getToState(), param.getWorker()});
        return this.doInSynchronizedTransaction(param.getInstanceId(), param.getWnstanceId(), (SchedInstance instance) -> {
            Assert.isTrue((!instance.isWorkflowLead() ? 1 : 0) != 0, () -> "Stop task instance cannot be workflow lead: " + (Object)instance);
            if (instance.isTerminal()) {
                return false;
            }
            ExecuteState toState = param.getToState();
            Date executeEndTime = toState.isTerminal() ? new Date() : null;
            String errMsg = param.getErrorMsg();
            if (!this.taskMapper.terminate(param.getTaskId(), param.getWorker(), toState, ExecuteState.EXECUTING, executeEndTime, errMsg)) {
                LOG.warn("Conflict stop executing task: {}, {}", (Object)param.getTaskId(), (Object)toState);
                return false;
            }
            List<SchedTask> tasks = this.taskMapper.findBaseByInstanceId(param.getInstanceId());
            if (toState == ExecuteState.WAITING) {
                Assert.isTrue((ops == Operation.SHUTDOWN_RESUME ? 1 : 0) != 0, () -> "Operation must be SHUTDOWN_RESUME, but actual: " + ops);
                if (tasks.stream().allMatch(SchedTask::isWaiting)) {
                    boolean updated = this.instanceMapper.updateState(param.getInstanceId(), RunState.WAITING, RunState.RUNNING);
                    Assert.isTrue((boolean)updated, () -> "Shutdown resume instance state to WAITING failed: " + param.getInstanceId());
                }
                Date nextScanTime = new Date(System.currentTimeMillis() + this.conf.getShutdownTaskDelayResumeMs());
                if (TransactionUtils.isNotAffectedRow((int)this.instanceMapper.updateNextScanTime(instance.getInstanceId(), nextScanTime, instance.getVersion()))) {
                    LOG.warn("Resume task renew instance update time failed: {}", (Object)param.getTaskId());
                }
                return true;
            }
            Tuple2<RunState, Date> tuple = this.obtainRunState(tasks);
            if (tuple == null) {
                return true;
            }
            if (!((RunState)tuple.a).isTerminal()) {
                Assert.isTrue((tuple.a == RunState.PAUSED ? 1 : 0) != 0, () -> "Run state must be PAUSED, but actual: " + tuple.a);
                this.pauseInstance(instance.isWorkflow() ? this.instanceMapper.get(instance.getWnstanceId()) : instance);
                return true;
            }
            boolean updated = this.instanceMapper.terminate(param.getInstanceId(), (RunState)tuple.a, RS_TERMINABLE, (Date)tuple.b);
            Assert.state((boolean)updated, () -> "Stop task instance failed: " + param.getInstanceId() + ", " + tuple.a);
            instance.markTerminated((RunState)tuple.a, (Date)tuple.b);
            if (ops.isTrigger()) {
                this.processTerminatedInstance((SchedInstance)((Object)instance));
            } else if (instance.isWorkflowNode()) {
                Assert.isTrue((tuple.a == RunState.CANCELED ? 1 : 0) != 0, () -> "Invalid workflow non-trigger stop state: " + tuple.a);
                this.updateWorkflowNodeState((SchedInstance)((Object)instance), (RunState)tuple.a, RS_TERMINABLE);
                this.updateWorkflowLeadState(this.instanceMapper.get(instance.getWnstanceId()), (RunState)tuple.a, RS_RUNNABLE);
            } else {
                Assert.isTrue((tuple.a == RunState.CANCELED ? 1 : 0) != 0, () -> "Invalid general non-trigger stop state: " + tuple.a);
                this.afterTerminatedInstance((SchedInstance)((Object)instance));
            }
            return true;
        });
    }

    public void changeInstanceState(long instanceId, ExecuteState toExecuteState) {
        Assert.isTrue((toExecuteState != ExecuteState.EXECUTING ? 1 : 0) != 0, () -> "Force change state invalid target: " + toExecuteState);
        this.doInSynchronizedTransaction(instanceId, null, (SchedInstance instance) -> {
            Assert.isTrue((!instance.isWorkflow() ? 1 : 0) != 0, () -> "Force change state unsupported workflow: " + instanceId);
            RunState fromRunState = RunState.of((int)instance.getRunState());
            RunState toRunState = toExecuteState.runState();
            Assert.isTrue((fromRunState != RunState.RUNNING ? 1 : 0) != 0, (String)"Force change state current cannot be RUNNING.");
            Assert.isTrue((fromRunState != toRunState ? 1 : 0) != 0, () -> "Force change state current cannot equals target " + toRunState);
            boolean updated = this.instanceMapper.updateState(instanceId, toRunState, fromRunState);
            Assert.state((boolean)updated, () -> "Force change state failed: " + instanceId);
            int changedTaskRows = this.taskMapper.forceChangeState(instanceId, toExecuteState.value());
            if (toExecuteState == ExecuteState.WAITING) {
                Tuple3<SchedJob, SchedInstance, List<SchedTask>> tuple = this.buildDispatchParam(instanceId, changedTaskRows);
                this.dispatch(false, (SchedJob)((Object)((Object)tuple.a)), (SchedInstance)((Object)((Object)tuple.b)), (List)tuple.c);
            }
            LOG.info("Force change state success {}, {}", (Object)instanceId, (Object)toExecuteState);
        });
    }

    public void deleteInstance(long instanceId) {
        this.doInSynchronizedTransaction(instanceId, this.requireWnstanceIdIfWorkflow(instanceId), (SchedInstance instance) -> {
            Assert.isTrue((boolean)instance.isTerminal(), () -> "Deleting instance must be terminal: " + (Object)instance);
            if (instance.isWorkflow()) {
                Assert.isTrue((boolean)instance.isWorkflowLead(), () -> "Delete workflow instance must be lead: " + instanceId);
                List<SchedInstance> nodeInstances = this.instanceMapper.findWorkflowNode(instanceId);
                TransactionUtils.assertHasAffectedRow((int)this.instanceMapper.deleteByWnstanceId(instanceId), () -> "Delete workflow instance failed: " + instanceId);
                TransactionUtils.assertHasAffectedRow((int)this.workflowMapper.deleteByWnstanceId(instanceId), () -> "Delete workflow config failed: " + instanceId);
                for (SchedInstance nodeInstance : nodeInstances) {
                    int row = this.taskMapper.deleteByInstanceId(nodeInstance.getInstanceId());
                    TransactionUtils.assertHasAffectedRow((int)row, () -> "Delete workflow task failed: " + (Object)((Object)nodeInstance));
                }
            } else {
                Assert.isTrue((instance.getRetrying() == false ? 1 : 0) != 0, (String)"Cannot delete retrying original instance.");
                Assert.isTrue((!instance.isRunRetry() ? 1 : 0) != 0, (String)"Cannot delete run retry sub instance.");
                Set instanceIds = this.instanceMapper.findRunRetry(instanceId).stream().map(SchedInstance::getInstanceId).collect(Collectors.toSet());
                instanceIds.add(instanceId);
                for (Long id : instanceIds) {
                    TransactionUtils.assertOneAffectedRow((int)this.instanceMapper.deleteByInstanceId(id), () -> "Delete instance failed: " + id);
                    TransactionUtils.assertHasAffectedRow((int)this.taskMapper.deleteByInstanceId(id), () -> "Delete task failed: " + id);
                }
            }
            LOG.info("Delete sched instance success {}", (Object)instanceId);
        });
    }

    public boolean purgeInstance(SchedInstance inst) {
        Long instanceId = inst.getInstanceId();
        LOG.info("Purge instance: {}", (Object)instanceId);
        return this.doInSynchronizedTransaction((long)instanceId, inst.getWnstanceId(), (SchedInstance instance) -> {
            Assert.isTrue((!instance.isWorkflowLead() ? 1 : 0) != 0, () -> "Purge instance cannot be workflow lead: " + (Object)instance);
            if (!instance.isPausable()) {
                return false;
            }
            List<SchedTask> tasks = this.taskMapper.findBaseByInstanceId(instanceId);
            if (tasks.stream().anyMatch(SchedTask::isWaiting) || this.workerClient.hasAliveTask(tasks)) {
                LOG.warn("Purge instance failed, has waiting or alive executing task: {}", tasks);
                return false;
            }
            Tuple2 tuple = this.obtainRunState(tasks);
            if (tuple == null) {
                tuple = Tuple2.of((Object)RunState.CANCELED, (Object)new Date());
            }
            Assert.isTrue((boolean)((RunState)tuple.a).isTerminal(), () -> "Purge instance state must be terminal state: " + (Object)instance);
            if (!this.instanceMapper.terminate((long)instanceId, (RunState)tuple.a, RS_TERMINABLE, (Date)tuple.b)) {
                throw new IllegalStateException("Purge instance failed: " + (Object)instance + ", " + tuple.a);
            }
            tasks.stream().filter(SchedTask::isPausable).forEach(e -> {
                String worker = e.isExecuting() ? Strings.requireNonBlank((String)e.getWorker()) : null;
                ExecuteState fromState = ExecuteState.of((Integer)e.getExecuteState());
                this.taskMapper.terminate((long)e.getTaskId(), worker, ExecuteState.EXECUTE_ABORTED, fromState, new Date(), null);
            });
            instance.markTerminated((RunState)tuple.a, (Date)tuple.b);
            this.processTerminatedInstance((SchedInstance)((Object)instance));
            LOG.warn("Purge instance {} to state {}", (Object)instanceId, tuple.a);
            return true;
        });
    }

    public boolean pauseInstance(long instanceId) {
        LOG.info("Pause instance: {}", (Object)instanceId);
        return this.doInSynchronizedTransaction(instanceId, this.requireWnstanceIdIfWorkflow(instanceId), (SchedInstance instance) -> Functions.doIfTrue((boolean)instance.isPausable(), () -> this.pauseInstance((SchedInstance)((Object)instance))));
    }

    public boolean cancelInstance(long instanceId, Operation ops) {
        LOG.info("Cancel instance: {}, {}", (Object)instanceId, (Object)ops);
        Assert.isTrue((boolean)ops.toState().isFailure(), () -> "Cancel instance operation invalid: " + ops);
        return this.doInSynchronizedTransaction(instanceId, this.requireWnstanceIdIfWorkflow(instanceId), (SchedInstance instance) -> Functions.doIfTrue((!instance.isTerminal() ? 1 : 0) != 0, () -> this.cancelInstance((SchedInstance)((Object)instance), ops)));
    }

    public boolean resumeInstance(long instanceId) {
        LOG.info("Resume instance: {}", (Object)instanceId);
        return this.doInSynchronizedTransaction(instanceId, this.requireWnstanceIdIfWorkflow(instanceId), (SchedInstance instance) -> Functions.doIfTrue((boolean)instance.isPaused(), () -> this.resumeInstance((SchedInstance)((Object)instance))));
    }

    private void saveInstances(List<SchedInstance> instances, List<SchedWorkflow> workflows, List<SchedTask> tasks) {
        instances.forEach(SchedInstance::fillUniqueFlag);
        Collects.batchProcess(instances, this.instanceMapper::batchInsert, (int)100);
        Collects.batchProcess(workflows, this.workflowMapper::batchInsert, (int)100);
        Collects.batchProcess(tasks, this.taskMapper::batchInsert, (int)100);
    }

    private SchedJob getRequiredJob(long jobId) {
        return Objects.requireNonNull(this.jobMapper.get(jobId), () -> "Job not found: " + jobId);
    }

    private void triggerJob(SchedJob schedJob, RunType runType, long triggerTime) throws JobException {
        TriggerInstance ti = TriggerInstance.of(this, schedJob, null, runType, triggerTime);
        ti.save((TriConsumer<List<SchedInstance>, List<SchedWorkflow>, List<SchedTask>>)((TriConsumer)this::saveInstances));
        ti.dispatch((TriConsumer<SchedJob, SchedInstance, List<SchedTask>>)((TriConsumer)(job, instance, tasks) -> this.dispatch(false, (SchedJob)((Object)job), (SchedInstance)((Object)instance), (List<SchedTask>)tasks)));
    }

    private boolean shouldTerminateDispatchFailedTask(long taskId) {
        SchedTask task = this.taskMapper.get(taskId);
        if (!task.isWaiting()) {
            return false;
        }
        int currentDispatchFailedCount = task.getDispatchFailedCount();
        if (currentDispatchFailedCount >= this.conf.getTaskDispatchFailedCountThreshold()) {
            return true;
        }
        return TransactionUtils.isOneAffectedRow((int)this.taskMapper.incrementDispatchFailedCount(taskId, currentDispatchFailedCount)) && currentDispatchFailedCount + 1 == this.conf.getTaskDispatchFailedCountThreshold();
    }

    private void updateNextTriggerTime(SchedJob job) {
        if (TriggerType.of((int)job.getTriggerType()) == TriggerType.DEPEND) {
            return;
        }
        Long nextTriggerTime = TriggerTimes.updateNextTriggerTime(job);
        if (!nextTriggerTime.equals(job.getNextTriggerTime())) {
            job.setNextTriggerTime(nextTriggerTime);
            TransactionUtils.assertOneAffectedRow((int)this.jobMapper.updateNextTriggerTime(job), () -> "Update next trigger time failed: " + (Object)((Object)job));
        }
    }

    private void parseTriggerConfig(SchedJob job) {
        String triggerValue = CoreUtils.trimRequired((String)job.getTriggerValue(), (int)255, (String)"Trigger value");
        job.setTriggerValue(triggerValue);
        Long jobId = job.getJobId();
        if (TriggerType.of((int)job.getTriggerType()) == TriggerType.DEPEND) {
            List parentJobIds = Collects.split((String)triggerValue, Long::parseLong);
            Assert.notEmpty((Collection)parentJobIds, () -> "Invalid dependency parent job id config: " + triggerValue);
            Assert.isTrue((!parentJobIds.contains(jobId) ? 1 : 0) != 0, () -> "Cannot depends self: " + jobId + ", " + parentJobIds);
            Map parentJobMap = Collects.toMap(this.jobMapper.findByJobIds(parentJobIds), SchedJob::getJobId);
            for (Long parentJobId : parentJobIds) {
                SchedJob parentJob = (SchedJob)((Object)parentJobMap.get(parentJobId));
                Assert.notNull((Object)((Object)parentJob), () -> "Parent job id not found: " + parentJobId);
                String cGroup = job.getGroup();
                String pGroup = parentJob.getGroup();
                Assert.isTrue((boolean)cGroup.equals(pGroup), () -> "Inconsistent depend group: " + cGroup + ", " + pGroup);
            }
            this.checkCycleDepends(jobId, new HashSet<Long>(parentJobIds));
            List list = Collects.convert((Collection)parentJobIds, pid -> SchedDepend.of(pid, jobId));
            Collects.batchProcess((List)list, this.dependMapper::batchInsert, (int)100);
            job.setTriggerValue(Joiner.on((String)",").join((Iterable)parentJobIds));
            job.setNextTriggerTime(null);
        } else {
            job.setNextTriggerTime(TriggerTimes.updateNextTriggerTime(job));
        }
    }

    private void checkCycleDepends(Long jobId, Set<Long> parentJobIds) {
        Set<Long> outerDepends = parentJobIds;
        int i = 1;
        Map map;
        while (!MapUtils.isEmpty((Map)(map = Collects.toMap(this.dependMapper.findByChildJobIds(parentJobIds), SchedDepend::getParentJobId)))) {
            Assert.isTrue((!map.containsKey(jobId) ? 1 : 0) != 0, () -> "Depends job has cycle: " + map.get(jobId));
            Assert.isTrue((i < this.conf.getMaximumJobDependsDepth() ? 1 : 0) != 0, () -> "Exceed depends depth: " + outerDepends);
            parentJobIds = map.keySet();
            ++i;
        }
        return;
    }

    private void dispatch(boolean isRedispatch, SchedJob job, SchedInstance instance, List<SchedTask> tasks) {
        List<Tuple2<Worker, Long>> workload;
        ExecuteTaskParamBuilder builder = new ExecuteTaskParamBuilder(job, instance);
        RouteStrategy routeStrategy = RouteStrategy.of((int)job.getRouteStrategy());
        ArrayList<ExecuteTaskParam> list = new ArrayList<ExecuteTaskParam>(tasks.size());
        if (routeStrategy.isBroadcast()) {
            for (SchedTask task : tasks) {
                Worker worker = task.worker();
                if (!this.workerClient.isAliveWorker(worker)) {
                    this.taskMapper.terminate((long)task.getTaskId(), null, ExecuteState.BROADCAST_ABORTED, ExecuteState.WAITING, null, null);
                    continue;
                }
                list.add(builder.build(Operation.TRIGGER, task.getTaskId(), instance.getTriggerTime(), worker));
            }
        } else if (!isRedispatch || routeStrategy.isNotRoundRobin() || (workload = this.calculateWorkload(job, instance)).isEmpty()) {
            for (SchedTask task : tasks) {
                list.add(builder.build(Operation.TRIGGER, task.getTaskId(), instance.getTriggerTime(), null));
            }
        } else {
            for (SchedTask task : tasks) {
                workload.sort(WORKLOAD_COMPARATOR);
                Tuple2<Worker, Long> first = workload.get(0);
                list.add(builder.build(Operation.TRIGGER, task.getTaskId(), instance.getTriggerTime(), (Worker)first.a));
                Tuple2<Worker, Long> tuple2 = first;
                Long.valueOf((Long)tuple2.b + 1L);
                tuple2.b = tuple2.b;
            }
        }
        TransactionUtils.doAfterTransactionCommit(() -> this.workerClient.dispatch(job.getGroup(), list));
    }

    private List<Tuple2<Worker, Long>> calculateWorkload(SchedJob job, SchedInstance instance) {
        List<Worker> workers = this.workerClient.getAliveWorkers(job.getGroup());
        if (CollectionUtils.isEmpty(workers)) {
            LOG.error("Not found available worker for calculate workload: {}", (Object)job.getGroup());
            return Collections.emptyList();
        }
        List<SchedTask> pausableTasks = this.taskMapper.findBaseByInstanceIdAndStates(instance.getInstanceId(), ES_PAUSABLE);
        Map<String, Long> workerScoreMapping = pausableTasks.stream().filter(e -> StringUtils.isNotBlank((CharSequence)e.getWorker())).collect(Collectors.groupingBy(SchedTask::getWorker, Collectors.counting()));
        return Collects.convert(workers, e -> Tuple2.of((Object)e, (Object)workerScoreMapping.getOrDefault(e.serialize(), 0L)));
    }

    private Long requireWnstanceIdIfWorkflow(long instanceId) {
        Long wnstanceId = this.instanceMapper.getWnstanceId(instanceId);
        if (wnstanceId != null && instanceId != wnstanceId) {
            throw new IllegalArgumentException("Must be workflow wnstance id: " + wnstanceId + ", " + instanceId);
        }
        return wnstanceId;
    }

    private void doInSynchronizedTransaction(long instanceId, Long wnstanceId, Consumer<SchedInstance> action) {
        this.doInSynchronizedTransaction(instanceId, wnstanceId, Functions.convert(action, (boolean)true));
    }

    private boolean doInSynchronizedTransaction(long instanceId, Long wnstanceId, Predicate<SchedInstance> action) {
        return this.doInSynchronizedTransaction0(instanceId, wnstanceId, lockInstanceId -> {
            SchedInstance lockedInstance = this.instanceMapper.lock(lockInstanceId);
            Assert.notNull((Object)((Object)lockedInstance), () -> "Locked instance not found: " + lockInstanceId);
            SchedInstance instance = instanceId == lockInstanceId ? lockedInstance : this.instanceMapper.get(instanceId);
            Assert.notNull((Object)((Object)instance), () -> "Instance not found: " + instanceId);
            if (!Objects.equals(instance.getWnstanceId(), wnstanceId)) {
                throw new IllegalStateException("Inconsistent workflow instance id: " + wnstanceId + ", " + (Object)((Object)instance));
            }
            return action.test(instance);
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> T doInSynchronizedTransaction0(long instanceId, Long wnstanceId, LongFunction<T> action) {
        Long lockInstanceId = wnstanceId != null ? wnstanceId : Long.valueOf(instanceId);
        Long l = (Long)CoreUtils.INSTANCE_LOCK_POOL.intern((Object)lockInstanceId);
        synchronized (l) {
            return (T)this.transactionTemplate.execute(status -> action.apply(lockInstanceId));
        }
    }

    private Tuple2<RunState, Date> obtainRunState(List<SchedTask> tasks) {
        List states = tasks.stream().map(e -> ExecuteState.of((Integer)e.getExecuteState())).collect(Collectors.toList());
        if (states.stream().allMatch(ExecuteState::isTerminal)) {
            RunState toState = states.stream().anyMatch(ExecuteState::isFailure) ? RunState.CANCELED : RunState.COMPLETED;
            Date maxExecuteEndTime = tasks.stream().map(SchedTask::getExecuteEndTime).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElseGet(Date::new);
            return Tuple2.of((Object)toState, (Object)maxExecuteEndTime);
        }
        return states.stream().anyMatch(ExecuteState::isPausable) ? null : Tuple2.of((Object)RunState.PAUSED, null);
    }

    private void pauseInstance(SchedInstance instance) {
        if (instance.isWorkflow()) {
            long instanceId = instance.getInstanceId();
            Assert.isTrue((boolean)instance.isWorkflowLead(), () -> "Pause instance must be workflow lead: " + instanceId);
            this.instanceMapper.findWorkflowNode(instanceId).stream().filter(SchedInstance::isPausable).forEach(this::pauseInstance0);
            this.updateWorkflowLeadState(instance, RunState.PAUSED, RS_WAITING);
        } else {
            this.pauseInstance0(instance);
        }
    }

    private void pauseInstance0(SchedInstance instance) {
        Assert.isTrue((boolean)instance.isPausable(), () -> "Invalid pause instance state: " + (Object)((Object)instance));
        long instanceId = instance.getInstanceId();
        Operation ops = Operation.PAUSE;
        this.taskMapper.updateStateByInstanceId(instanceId, ops.toState().value(), ES_WAITING, null);
        List<ExecuteTaskParam> executingTasks = this.loadExecutingTasks(instance, ops);
        if (executingTasks.isEmpty()) {
            Tuple2<RunState, Date> tuple = this.obtainRunState(this.taskMapper.findBaseByInstanceId(instanceId));
            Assert.notNull(tuple, () -> "Pause instance failed: " + instanceId);
            boolean updated = this.instanceMapper.terminate(instanceId, (RunState)tuple.a, RS_PAUSABLE, (Date)tuple.b);
            Assert.state((boolean)updated, () -> "Pause instance failed: " + (Object)((Object)instance) + ", " + tuple.a);
            if (instance.isWorkflowNode()) {
                this.updateWorkflowNodeState(instance, (RunState)tuple.a, RS_PAUSABLE);
            } else if (((RunState)tuple.a).isTerminal()) {
                instance.markTerminated((RunState)tuple.a, (Date)tuple.b);
                this.afterTerminatedInstance(instance);
            }
        } else {
            TransactionUtils.doAfterTransactionCommit(() -> this.workerClient.dispatch(executingTasks));
        }
    }

    private void cancelInstance(SchedInstance instance, Operation ops) {
        if (instance.isWorkflow()) {
            long instanceId = instance.getInstanceId();
            Assert.isTrue((boolean)instance.isWorkflowLead(), () -> "Cancel instance must be workflow lead: " + instanceId);
            this.instanceMapper.findWorkflowNode(instanceId).stream().filter(e -> !e.isTerminal()).forEach(e -> this.cancelInstance0((SchedInstance)((Object)e), ops));
            this.updateWorkflowLeadState(instance, RunState.CANCELED, RS_RUNNABLE);
        } else {
            this.cancelInstance0(instance, ops);
        }
    }

    private void cancelInstance0(SchedInstance instance, Operation ops) {
        long instanceId = instance.getInstanceId();
        this.taskMapper.updateStateByInstanceId(instanceId, ops.toState().value(), ES_EXECUTABLE, new Date());
        List<ExecuteTaskParam> executingTasks = this.loadExecutingTasks(instance, ops);
        if (executingTasks.isEmpty()) {
            Tuple2 tuple = this.obtainRunState(this.taskMapper.findBaseByInstanceId(instanceId));
            Assert.notNull(tuple, () -> "Cancel instance obtain run state failed: " + instanceId);
            if (tuple.a == RunState.PAUSED) {
                tuple = Tuple2.of((Object)RunState.CANCELED, (Object)new Date());
            }
            if (!this.instanceMapper.terminate(instanceId, (RunState)tuple.a, RS_TERMINABLE, (Date)tuple.b)) {
                throw new IllegalStateException("Cancel instance failed: " + (Object)((Object)instance) + ", " + tuple.a);
            }
            instance.markTerminated((RunState)tuple.a, (Date)tuple.b);
            if (instance.isWorkflowNode()) {
                this.updateWorkflowNodeState(instance, (RunState)tuple.a, RS_TERMINABLE);
            } else {
                this.afterTerminatedInstance(instance);
            }
        } else {
            TransactionUtils.doAfterTransactionCommit(() -> this.workerClient.dispatch(executingTasks));
        }
    }

    private void resumeInstance(SchedInstance instance) {
        if (instance.isWorkflow()) {
            long instanceId = instance.getInstanceId();
            Assert.isTrue((boolean)instance.isWorkflowLead(), () -> "Resume instance must be workflow lead: " + instanceId);
            boolean updated = this.instanceMapper.updateState(instanceId, RunState.RUNNING, RunState.PAUSED);
            Assert.state((boolean)updated, () -> "Resume workflow lead instance failed: " + instanceId);
            this.workflowMapper.resumeWaiting(instanceId);
            for (SchedInstance nodeInstance : this.instanceMapper.findWorkflowNode(instanceId)) {
                if (!nodeInstance.isPaused()) continue;
                this.resumeInstance0(nodeInstance);
                this.updateWorkflowNodeState(nodeInstance, RunState.RUNNING, RS_PAUSED);
            }
            WorkflowGraph graph = WorkflowGraph.of(this.workflowMapper.findByWnstanceId(instanceId));
            Throwables.ThrowingRunnable.doChecked(() -> this.processWorkflowGraph(instance, graph, graph.map()).forEach(Runnable::run));
        } else {
            this.resumeInstance0(instance);
        }
    }

    private void resumeInstance0(SchedInstance instance) {
        long instanceId = instance.getInstanceId();
        boolean updated = this.instanceMapper.updateState(instanceId, RunState.WAITING, RunState.PAUSED);
        Assert.state((boolean)updated, () -> "Resume sched instance failed: " + instanceId);
        int row = this.taskMapper.updateStateByInstanceId(instanceId, ExecuteState.WAITING.value(), ES_PAUSED, null);
        TransactionUtils.assertHasAffectedRow((int)row, (String)"Resume sched task failed.");
        Tuple3<SchedJob, SchedInstance, List<SchedTask>> param = this.buildDispatchParam(instanceId, row);
        this.dispatch(false, (SchedJob)((Object)param.a), (SchedInstance)((Object)param.b), (List)param.c);
    }

    private void processTerminatedInstance(SchedInstance instance) {
        Assert.isTrue((!instance.isWorkflowLead() ? 1 : 0) != 0, () -> "After terminate task cannot be workflow lead: " + (Object)((Object)instance));
        RunState runState = RunState.of((int)instance.getRunState());
        if (runState == RunState.CANCELED) {
            this.retryJob(instance);
        } else if (runState == RunState.COMPLETED) {
            if (!instance.isWorkflowNode()) {
                this.afterTerminatedInstance(instance);
            }
            this.processWorkflowInstance(instance);
            this.dependJob(instance);
        } else {
            throw new IllegalStateException("Unknown terminate run state " + runState);
        }
    }

    private void retryJob(SchedInstance failed) {
        Long retryingInstanceId = (Long)Throwables.ThrowingSupplier.doCaught(() -> this.retryJob0(failed));
        if (retryingInstanceId != null) {
            this.startRetrying(failed);
            return;
        }
        if (failed.isWorkflowNode()) {
            this.updateWorkflowNodeState(failed, RunState.CANCELED, RS_TERMINABLE);
            this.updateWorkflowLeadState(this.instanceMapper.get(failed.getWnstanceId()), RunState.CANCELED, RS_RUNNABLE);
        } else {
            this.afterTerminatedInstance(failed);
        }
    }

    private Long retryJob0(SchedInstance failed) throws JobException {
        SchedJob job = this.getRequiredJob(failed.getJobId());
        int retriedCount = failed.obtainRetriedCount();
        if (!job.retryable(RunState.of((int)failed.getRunState()), retriedCount)) {
            return null;
        }
        long retryInstanceId = this.generateId();
        long triggerTime = job.computeRetryTriggerTime(++retriedCount);
        SchedInstance retryInstance = SchedInstance.of(failed, retryInstanceId, job.getJobId(), RunType.RETRY, triggerTime, retriedCount);
        retryInstance.setWorkflowCurNode(failed.getWorkflowCurNode());
        List<SchedTask> tasks = this.splitRetryTask(job, failed, retryInstance);
        if (tasks.isEmpty()) {
            LOG.warn("Retry instance split tasks is empty: {}, {}", (Object)job, (Object)failed);
            return null;
        }
        Throwables.ThrowingRunnable persistenceAction = () -> {
            if (failed.isWorkflowNode()) {
                String curNode = failed.getWorkflowCurNode();
                int row = this.workflowMapper.update(failed.getWnstanceId(), curNode, null, retryInstanceId, RS_RUNNING, failed.getInstanceId());
                TransactionUtils.assertHasAffectedRow((int)row, () -> "Retry instance, workflow node update failed.");
            }
            this.saveInstances(Collections.singletonList(retryInstance), null, tasks);
        };
        Consumer<Throwable> errorHandler = t -> {
            throw new IllegalStateException("Create retry instance failed: " + (Object)((Object)failed), (Throwable)t);
        };
        if (TransactionUtils.doInNestedTransaction((TransactionTemplate)this.transactionTemplate, (Throwables.ThrowingRunnable)persistenceAction, errorHandler)) {
            this.dispatch(false, job, retryInstance, tasks);
        }
        return retryInstanceId;
    }

    private List<SchedTask> splitRetryTask(SchedJob job, SchedInstance failed, SchedInstance retry) throws JobException {
        RetryType retryType = RetryType.of((int)job.getRetryType());
        if (retryType == RetryType.ALL) {
            SplitJobParam splitJobParam;
            if (failed.isWorkflow()) {
                List<PredecessorInstance> list = this.loadWorkflowPredecessorInstances(job, failed.getWnstanceId(), failed.getInstanceId());
                splitJobParam = ModelConverter.toSplitJobParam(job, retry, list);
            } else {
                splitJobParam = ModelConverter.toSplitJobParam(job, retry);
            }
            return this.splitJob(job.getGroup(), retry.getInstanceId(), splitJobParam);
        }
        if (retryType == RetryType.FAILED) {
            return this.taskMapper.findLargeByInstanceId(failed.getInstanceId()).stream().filter(SchedTask::isFailure).filter(e -> !job.isBroadcast() || this.workerClient.isAliveWorker(e.worker())).map(e -> SchedTask.of(e.getTaskParam(), this.generateId(), retry.getInstanceId(), e.getTaskNo(), e.getTaskCount(), e.getWorker())).collect(Collectors.toList());
        }
        throw new UnsupportedOperationException("Retry instance, unknown retry type: " + job.getJobId() + ", " + retryType);
    }

    private void dependJob(SchedInstance parent) {
        if (parent.isWorkflowNode() || !parent.isCompleted()) {
            return;
        }
        for (SchedDepend depend : this.dependMapper.findByParentJobId(parent.getJobId())) {
            Throwables.ThrowingRunnable.doCaught(() -> this.dependJob(parent, depend), () -> "Depend job error: " + (Object)((Object)parent) + ", " + (Object)((Object)depend));
        }
    }

    private void dependJob(SchedInstance parent, SchedDepend depend) throws JobException {
        Consumer<Throwable> errorHandler;
        SchedJob childJob = this.getRequiredJob(depend.getChildJobId());
        if (childJob.isDisabled()) {
            LOG.warn("Depend child job disabled: {}", (Object)childJob);
            return;
        }
        TriggerInstance ti = TriggerInstance.of(this, childJob, parent, RunType.DEPEND, System.currentTimeMillis());
        if (TransactionUtils.doInNestedTransaction((TransactionTemplate)this.transactionTemplate, () -> ti.save((TriConsumer<List<SchedInstance>, List<SchedWorkflow>, List<SchedTask>>)((TriConsumer)this::saveInstances)), errorHandler = t -> LOG.error("Create depend instance failed: {}, {}", new Object[]{childJob, parent, t}))) {
            ti.dispatch((TriConsumer<SchedJob, SchedInstance, List<SchedTask>>)((TriConsumer)(job, instance, tasks) -> this.dispatch(false, (SchedJob)((Object)job), (SchedInstance)((Object)instance), (List<SchedTask>)tasks)));
        }
    }

    private List<ExecuteTaskParam> loadExecutingTasks(SchedInstance instance, Operation ops) {
        ArrayList<ExecuteTaskParam> executingTasks = new ArrayList<ExecuteTaskParam>();
        ExecuteTaskParamBuilder builder = null;
        long triggerTime = System.currentTimeMillis();
        for (SchedTask task : this.taskMapper.findBaseByInstanceIdAndStates(instance.getInstanceId(), ES_EXECUTING)) {
            Worker worker = task.worker();
            if (this.workerClient.isAliveWorker(worker)) {
                if (builder == null) {
                    builder = new ExecuteTaskParamBuilder(this.getRequiredJob(instance.getJobId()), instance);
                }
                executingTasks.add(builder.build(ops, task.getTaskId(), triggerTime, worker));
                continue;
            }
            Date executeEndTime = ops.toState().isTerminal() ? new Date() : null;
            ExecuteState toState = ops.toState().isTerminal() ? ExecuteState.EXECUTE_ABORTED : ops.toState();
            ExecuteState fromState = ExecuteState.EXECUTING;
            if (this.taskMapper.terminate((long)task.getTaskId(), task.getWorker(), toState, fromState, executeEndTime, null)) {
                LOG.info("Terminate dead worker executing task success: {}", (Object)task);
                continue;
            }
            LOG.error("Terminate dead worker executing task failed: {}", (Object)task);
        }
        return executingTasks;
    }

    private Tuple3<SchedJob, SchedInstance, List<SchedTask>> buildDispatchParam(long instanceId, int expectTaskSize) {
        SchedInstance instance = this.instanceMapper.get(instanceId);
        SchedJob job = this.getRequiredJob(instance.getJobId());
        List<SchedTask> waitingTasks = this.taskMapper.findLargeByInstanceIdAndStates(instanceId, ES_WAITING);
        int size = waitingTasks.size();
        Assert.state((size == expectTaskSize ? 1 : 0) != 0, () -> "Invalid dispatch tasks size: " + size + ", " + expectTaskSize);
        return Tuple3.of((Object)((Object)job), (Object)((Object)instance), waitingTasks);
    }

    private void startRetrying(SchedInstance instance) {
        if (!instance.isRunRetry()) {
            RunState state = RunState.CANCELED;
            boolean updated = this.instanceMapper.updateRetrying((long)instance.getInstanceId(), true, state, state);
            Assert.state((boolean)updated, () -> "Start retrying failed: " + (Object)((Object)instance));
        }
    }

    private void stopRetrying(SchedInstance instance, RunState toState) {
        if (instance.isRunRetry()) {
            long id = instance.obtainRetryOriginalInstanceId();
            boolean updated = this.instanceMapper.updateRetrying(id, false, toState, RunState.CANCELED);
            Assert.state((boolean)updated, () -> "Stop retrying failed: " + toState + ", " + (Object)((Object)instance));
        }
    }

    private void afterTerminatedInstance(SchedInstance instance) {
        AlertInstanceEvent event;
        SchedInstance original;
        SchedJob job;
        Assert.isTrue((boolean)instance.isTerminal(), () -> "Renew fixed instance must be terminal state: " + (Object)((Object)instance));
        Assert.isTrue((!instance.isWorkflowNode() ? 1 : 0) != 0, () -> "Renew fixed instance cannot be workflow node: " + (Object)((Object)instance));
        if (instance.isRunRetry()) {
            this.stopRetrying(instance, RunState.of((int)instance.getRunState()));
        }
        if ((job = this.jobMapper.get((original = instance.isRunRetry() ? this.instanceMapper.get(instance.getPnstanceId()) : instance).getJobId())) == null) {
            LOG.error("Sched job not found: {}", (Object)original);
            return;
        }
        if (job.isEnabled() && job.isFixedTriggerType() && original.isRunSchedule()) {
            long nextTriggerTime;
            TriggerType triggerType = TriggerType.of((int)job.getTriggerType());
            long lastTriggerTime = original.getTriggerTime();
            if (triggerType == TriggerType.FIXED_RATE) {
                Date time = triggerType.computeNextTriggerTime(job.getTriggerValue(), new Date(original.getTriggerTime()));
                nextTriggerTime = Dates.max((Date)time, (Date)original.getRunEndTime()).getTime();
            } else {
                nextTriggerTime = triggerType.computeNextTriggerTime(job.getTriggerValue(), original.getRunEndTime()).getTime();
            }
            boolean updated = TransactionUtils.isOneAffectedRow((int)this.jobMapper.updateFixedNextTriggerTime(job.getJobId(), lastTriggerTime, nextTriggerTime));
            LOG.info("Renew fixed next trigger time: {}, {}, {}, {}", new Object[]{job.getJobId(), lastTriggerTime, nextTriggerTime, updated});
        }
        if ((event = ModelConverter.toAlertInstanceEvent(job, original, instance)) != null) {
            TransactionUtils.doAfterTransactionCommit(() -> this.eventPublisher.publishEvent((Object)event));
        }
    }

    private void updateWorkflowNodeState(SchedInstance node, RunState toState, List<Integer> fromStates) {
        Assert.isTrue((boolean)node.isWorkflowNode(), () -> "Update workflow cur node state must be node: " + (Object)((Object)node));
        String curNode = node.getWorkflowCurNode();
        int row = this.workflowMapper.update(node.getWnstanceId(), curNode, toState.value(), null, fromStates, node.getInstanceId());
        TransactionUtils.assertHasAffectedRow((int)row, () -> "Update workflow state failed: " + (Object)((Object)node) + ", " + toState);
        if (toState.isTerminal()) {
            this.stopRetrying(node, toState);
        }
    }

    private void updateWorkflowLeadState(SchedInstance lead, RunState toState, List<Integer> fromStates) {
        Assert.isTrue((boolean)lead.isWorkflowLead(), () -> "Update workflow free node state must be lead: " + (Object)((Object)lead));
        long wnstanceId = lead.getWnstanceId();
        this.workflowMapper.update(wnstanceId, null, toState.value(), null, fromStates, null);
        this.stopWorkflowGraph(wnstanceId, WorkflowGraph.of(this.workflowMapper.findByWnstanceId(wnstanceId)));
    }

    private void processWorkflowInstance(SchedInstance node) {
        if (!node.isWorkflowNode()) {
            return;
        }
        this.updateWorkflowNodeState(node, RunState.COMPLETED, RS_TERMINABLE);
        long wnstanceId = node.getWnstanceId();
        WorkflowGraph graph = WorkflowGraph.of(this.workflowMapper.findByWnstanceId(wnstanceId));
        if (this.stopWorkflowGraph(wnstanceId, graph)) {
            return;
        }
        Map<DAGEdge, SchedWorkflow> map = graph.successors(node.parseWorkflowCurNode());
        SchedInstance lead = this.instanceMapper.get(wnstanceId);
        Consumer<Throwable> errorHandler = t -> {
            LOG.error("Process workflow node error: {}", (Object)node, t);
            this.updateWorkflowLeadState(lead, RunState.CANCELED, RS_RUNNABLE);
        };
        Throwables.ThrowingSupplier persistenceAction = () -> this.processWorkflowGraph(lead, graph, map);
        List dispatchActions = (List)TransactionUtils.doInNestedTransaction((TransactionTemplate)this.transactionTemplate, (Throwables.ThrowingSupplier)persistenceAction, errorHandler);
        CollectionUtils.emptyIfNull((Collection)dispatchActions).forEach(Runnable::run);
    }

    private List<Runnable> processWorkflowGraph(SchedInstance lead, WorkflowGraph graph, Map<DAGEdge, SchedWorkflow> map) throws JobException {
        Assert.isTrue((boolean)lead.isWorkflowLead(), () -> "Process workflow node must be lead: " + (Object)((Object)lead));
        ArrayList<Runnable> dispatchActions = new ArrayList<Runnable>();
        if (!map.isEmpty()) {
            SchedJob job = this.getRequiredJob(lead.getJobId());
            HashSet<DAGNode> duplicates = new HashSet<DAGNode>();
            for (Map.Entry<DAGEdge, SchedWorkflow> edge : map.entrySet()) {
                this.processWorkflowGraph(dispatchActions, job, lead, graph, duplicates, edge);
            }
        }
        return dispatchActions;
    }

    private void processWorkflowGraph(List<Runnable> dispatchActions, SchedJob job, SchedInstance lead, WorkflowGraph graph, Set<DAGNode> duplicates, Map.Entry<DAGEdge, SchedWorkflow> edge) throws JobException {
        long wnstanceId = lead.getWnstanceId();
        DAGNode target = edge.getKey().getTarget();
        SchedWorkflow workflow = edge.getValue();
        if (target.isEnd() || !workflow.isWaiting() || !duplicates.add(target)) {
            return;
        }
        Collection<SchedWorkflow> predecessors = graph.predecessors(target).values();
        if (predecessors.stream().anyMatch(e -> !e.isTerminal())) {
            return;
        }
        if (predecessors.stream().anyMatch(SchedWorkflow::isFailure)) {
            RunState state = RunState.CANCELED;
            int row = this.workflowMapper.update(wnstanceId, workflow.getCurNode(), state.value(), null, RS_TERMINABLE, null);
            TransactionUtils.assertHasAffectedRow((int)row, () -> "Update workflow cur node state failed: " + (Object)((Object)workflow) + ", " + state);
            return;
        }
        long nextInstanceId = this.generateId();
        RunType runType = RunType.of((int)lead.getRunType());
        SchedWorkflow lastPredecessor = predecessors.stream().max(BaseEntity.UPDATED_AT_COMPARATOR).orElse(null);
        SchedInstance parent = lastPredecessor == null ? lead : this.instanceMapper.get(lastPredecessor.getInstanceId());
        SchedInstance nextInstance = SchedInstance.of(parent, nextInstanceId, job.getJobId(), runType, System.currentTimeMillis(), 0);
        nextInstance.setWorkflowCurNode(workflow.getCurNode());
        int row = this.workflowMapper.update(wnstanceId, workflow.getCurNode(), RunState.RUNNING.value(), nextInstanceId, RS_WAITING, null);
        TransactionUtils.assertHasAffectedRow((int)row, () -> "Start workflow node failed: " + (Object)((Object)workflow));
        List<PredecessorInstance> list = predecessors.isEmpty() ? null : this.loadWorkflowPredecessorInstances(job, wnstanceId, nextInstanceId);
        SplitJobParam splitJobParam = ModelConverter.toSplitJobParam(job, nextInstance, list);
        List<SchedTask> tasks = this.splitJob(job.getGroup(), nextInstanceId, splitJobParam);
        this.saveInstances(Collections.singletonList(nextInstance), null, tasks);
        dispatchActions.add(() -> this.dispatch(false, job, nextInstance, tasks));
    }

    private boolean stopWorkflowGraph(long wnstanceId, WorkflowGraph graph) {
        Map<DAGEdge, SchedWorkflow> ends;
        if (graph.anyMatch(e -> ((DAGEdge)e.getKey()).getTarget().isEnd() && !((SchedWorkflow)((Object)((Object)e.getValue()))).isTerminal()) && (ends = graph.predecessors(DAGNode.END)).values().stream().allMatch(SchedWorkflow::isTerminal)) {
            RunState endState = ends.values().stream().anyMatch(SchedWorkflow::isFailure) ? RunState.CANCELED : RunState.COMPLETED;
            int row = this.workflowMapper.update(wnstanceId, DAGNode.END.toString(), endState.value(), null, RS_TERMINABLE, null);
            TransactionUtils.assertHasAffectedRow((int)row, () -> "Update workflow end node failed: " + wnstanceId + ", " + endState);
            ends.forEach((k, v) -> graph.get(k.getTarget(), DAGNode.END).setRunState(endState.value()));
        }
        if (graph.allMatch(e -> ((SchedWorkflow)((Object)((Object)e.getValue()))).isTerminal())) {
            RunState state = graph.anyMatch(e -> ((SchedWorkflow)((Object)((Object)e.getValue()))).isFailure()) ? RunState.CANCELED : RunState.COMPLETED;
            boolean updated = this.instanceMapper.terminate(wnstanceId, state, RS_TERMINABLE, new Date());
            Assert.state((boolean)updated, () -> "Stop workflow instance failed: " + wnstanceId + ", " + state);
            SchedInstance lead = this.instanceMapper.get(wnstanceId);
            this.dependJob(lead);
            this.afterTerminatedInstance(lead);
            return true;
        }
        if (graph.allMatch(e -> ((SchedWorkflow)((Object)((Object)e.getValue()))).isTerminal() || ((SchedWorkflow)((Object)((Object)e.getValue()))).isPaused())) {
            boolean updated = this.instanceMapper.updateState(wnstanceId, RunState.PAUSED, RunState.RUNNING);
            Assert.state((boolean)updated, () -> "Update workflow pause state failed: " + wnstanceId);
            return true;
        }
        return false;
    }

    private List<PredecessorInstance> loadWorkflowPredecessorInstances(SchedJob job, long wnstanceId, Long instanceId) {
        List<SchedWorkflow> workflows = this.workflowMapper.findByWnstanceId(wnstanceId);
        SchedWorkflow curWorkflow = workflows.stream().filter(e -> instanceId.equals(e.getInstanceId())).findAny().orElse(null);
        Assert.state((curWorkflow != null ? 1 : 0) != 0, () -> "Not found current workflow node: " + wnstanceId + ", " + instanceId);
        Map<DAGEdge, SchedWorkflow> predecessors = WorkflowGraph.of(workflows).predecessors(curWorkflow.parseCurNode());
        if (predecessors.isEmpty()) {
            return null;
        }
        RetryType retryType = RetryType.of((int)job.getRetryType());
        return Collects.convert(predecessors.values(), e -> {
            SchedInstance prev;
            List<SchedTask> tasks = this.taskMapper.findLargeByInstanceId(e.getInstanceId());
            if (retryType == RetryType.FAILED && (prev = this.instanceMapper.get(e.getInstanceId())).isRunRetry()) {
                Set<Long> instanceIds = this.instanceMapper.findChildren(prev.getPnstanceId(), RunType.RETRY.value()).stream().map(SchedInstance::getInstanceId).filter(t -> !Objects.equals(t, e.getInstanceId())).collect(Collectors.toSet());
                instanceIds.add(prev.getPnstanceId());
                instanceIds.forEach(t -> tasks.addAll(this.taskMapper.findLargeByInstanceIdAndStates((long)t, ES_COMPLETED)));
            }
            tasks.sort(SchedTask.TASK_NO_COMPARATOR);
            return ModelConverter.toPredecessorInstance(e, tasks);
        });
    }
}

