/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.scheduler.supervisor.manager;

import cn.ponfee.scheduler.common.base.IdGenerator;
import cn.ponfee.scheduler.common.base.LazyLoader;
import cn.ponfee.scheduler.common.graph.DAGEdge;
import cn.ponfee.scheduler.common.graph.DAGNode;
import cn.ponfee.scheduler.common.spring.RpcController;
import cn.ponfee.scheduler.common.spring.TransactionUtils;
import cn.ponfee.scheduler.common.tuple.Tuple2;
import cn.ponfee.scheduler.common.tuple.Tuple3;
import cn.ponfee.scheduler.common.util.Collects;
import cn.ponfee.scheduler.common.util.Jsons;
import cn.ponfee.scheduler.common.util.ObjectUtils;
import cn.ponfee.scheduler.core.base.SupervisorService;
import cn.ponfee.scheduler.core.base.Worker;
import cn.ponfee.scheduler.core.enums.ExecuteState;
import cn.ponfee.scheduler.core.enums.JobState;
import cn.ponfee.scheduler.core.enums.Operations;
import cn.ponfee.scheduler.core.enums.RetryType;
import cn.ponfee.scheduler.core.enums.RouteStrategy;
import cn.ponfee.scheduler.core.enums.RunState;
import cn.ponfee.scheduler.core.enums.RunType;
import cn.ponfee.scheduler.core.enums.TriggerType;
import cn.ponfee.scheduler.core.exception.JobException;
import cn.ponfee.scheduler.core.graph.WorkflowGraph;
import cn.ponfee.scheduler.core.model.SchedDepend;
import cn.ponfee.scheduler.core.model.SchedInstance;
import cn.ponfee.scheduler.core.model.SchedJob;
import cn.ponfee.scheduler.core.model.SchedTask;
import cn.ponfee.scheduler.core.model.SchedWorkflow;
import cn.ponfee.scheduler.core.model.WorkflowAttach;
import cn.ponfee.scheduler.core.param.ExecuteTaskParam;
import cn.ponfee.scheduler.core.param.ExecuteTaskParamBuilder;
import cn.ponfee.scheduler.core.param.StartTaskParam;
import cn.ponfee.scheduler.core.param.TaskWorkerParam;
import cn.ponfee.scheduler.core.param.TerminateTaskParam;
import cn.ponfee.scheduler.dispatch.TaskDispatcher;
import cn.ponfee.scheduler.registry.SupervisorRegistry;
import cn.ponfee.scheduler.supervisor.base.WorkerServiceClient;
import cn.ponfee.scheduler.supervisor.dao.mapper.SchedDependMapper;
import cn.ponfee.scheduler.supervisor.dao.mapper.SchedInstanceMapper;
import cn.ponfee.scheduler.supervisor.dao.mapper.SchedJobMapper;
import cn.ponfee.scheduler.supervisor.dao.mapper.SchedTaskMapper;
import cn.ponfee.scheduler.supervisor.dao.mapper.SchedWorkflowMapper;
import cn.ponfee.scheduler.supervisor.instance.NormalInstanceCreator;
import cn.ponfee.scheduler.supervisor.instance.TriggerInstance;
import cn.ponfee.scheduler.supervisor.instance.TriggerInstanceCreator;
import cn.ponfee.scheduler.supervisor.instance.WorkflowInstanceCreator;
import cn.ponfee.scheduler.supervisor.manager.AbstractJobManager;
import cn.ponfee.scheduler.supervisor.param.SplitJobParam;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.math.IntMath;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

@Component
public class SchedulerJobManager
extends AbstractJobManager
implements SupervisorService,
RpcController {
    private static final String TX_MANAGER_NAME = "schedulerTransactionManager";
    private static final int AFFECTED_ONE_ROW = 1;
    private static final List<Integer> RUN_STATE_CANCELABLE = Collects.convert((List)RunState.CANCELABLE_LIST, RunState::value);
    private static final List<Integer> RUN_STATE_PAUSABLE = Collects.convert((List)RunState.PAUSABLE_LIST, RunState::value);
    private static final List<Integer> RUN_STATE_WAITING = Collections.singletonList(RunState.WAITING.value());
    private static final List<Integer> RUN_STATE_RUNNING = Collections.singletonList(RunState.RUNNING.value());
    private static final List<Integer> EXECUTE_STATE_EXECUTABLE = Collects.convert((List)ExecuteState.EXECUTABLE_LIST, ExecuteState::value);
    private static final List<Integer> EXECUTE_STATE_PAUSED = Collections.singletonList(ExecuteState.PAUSED.value());
    private static final List<Integer> EXECUTE_STATE_WAITING = Collections.singletonList(ExecuteState.WAITING.value());
    private static final List<Integer> EXECUTE_STATE_PAUSABLE = Collects.convert((List)ExecuteState.PAUSABLE_LIST, ExecuteState::value);
    private final TransactionTemplate transactionTemplate;
    private final SchedJobMapper jobMapper;
    private final SchedInstanceMapper instanceMapper;
    private final SchedTaskMapper taskMapper;
    private final SchedDependMapper dependMapper;
    private final SchedWorkflowMapper workflowMapper;

    public SchedulerJobManager(IdGenerator idGenerator, SupervisorRegistry discoveryWorker, TaskDispatcher taskDispatcher, WorkerServiceClient workerServiceClient, @Qualifier(value="schedulerTransactionTemplate") TransactionTemplate transactionTemplate, SchedJobMapper jobMapper, SchedInstanceMapper instanceMapper, SchedTaskMapper taskMapper, SchedDependMapper dependMapper, SchedWorkflowMapper workflowMapper) {
        super(idGenerator, discoveryWorker, taskDispatcher, workerServiceClient);
        this.transactionTemplate = transactionTemplate;
        this.jobMapper = jobMapper;
        this.instanceMapper = instanceMapper;
        this.taskMapper = taskMapper;
        this.dependMapper = dependMapper;
        this.workflowMapper = workflowMapper;
    }

    public SchedJob getJob(long jobId) {
        return this.jobMapper.getByJobId(jobId);
    }

    public SchedInstance getInstance(long instanceId) {
        return this.instanceMapper.getByInstanceId(instanceId);
    }

    public SchedTask getTask(long taskId) {
        return this.taskMapper.getByTaskId(taskId);
    }

    public List<SchedTask> findMediumTaskByInstanceId(long instanceId) {
        return this.taskMapper.findMediumByInstanceId(instanceId);
    }

    public List<SchedTask> findLargeTaskByInstanceId(long instanceId) {
        return this.taskMapper.findLargeByInstanceId(instanceId);
    }

    public List<SchedJob> findBeTriggering(long maxNextTriggerTime, int size) {
        return this.jobMapper.findBeTriggering(maxNextTriggerTime, size);
    }

    public List<SchedInstance> findExpireWaiting(Date expireTime, int size) {
        return this.instanceMapper.findExpireState(RunState.WAITING.value(), expireTime.getTime(), expireTime, size);
    }

    public List<SchedInstance> findExpireRunning(Date expireTime, int size) {
        return this.instanceMapper.findExpireState(RunState.RUNNING.value(), expireTime.getTime(), expireTime, size);
    }

    public SchedInstance getByTriggerTime(long jobId, long triggerTime, int runType) {
        return this.instanceMapper.getByTriggerTime(jobId, triggerTime, runType);
    }

    public List<SchedInstance> findUnterminatedRetry(long rootInstanceId) {
        return this.instanceMapper.findUnterminatedRetry(rootInstanceId);
    }

    public boolean checkpoint(long taskId, String executeSnapshot) {
        return this.taskMapper.checkpoint(taskId, executeSnapshot) == 1;
    }

    public boolean renewUpdateTime(SchedInstance instance, Date updateTime) {
        return this.instanceMapper.renewUpdateTime(instance.getInstanceId(), updateTime, instance.getVersion()) == 1;
    }

    public boolean changeJobState(long jobId, JobState to) {
        return this.jobMapper.updateState(jobId, to.value(), 1 ^ to.value()) == 1;
    }

    public boolean stopJob(SchedJob job) {
        return 1 == this.jobMapper.stop(job);
    }

    public boolean updateNextTriggerTime(SchedJob job) {
        return this.jobMapper.updateNextTriggerTime(job) == 1;
    }

    public boolean updateNextScanTime(long jobId, Date nextScanTime, int version) {
        return this.jobMapper.updateNextScanTime(jobId, nextScanTime, version) == 1;
    }

    @Override
    protected boolean cancelWaitingTask(long taskId) {
        return this.taskMapper.terminate(taskId, ExecuteState.WAITING_CANCELED.value(), ExecuteState.WAITING.value(), null, null) == 1;
    }

    @Transactional(transactionManager="schedulerTransactionManager", rollbackFor={Exception.class})
    public void addJob(SchedJob job) {
        job.verifyBeforeAdd();
        super.verifyJob(job);
        job.checkAndDefaultSetting();
        job.setJobId(Long.valueOf(this.generateId()));
        Date now = new Date();
        this.parseTriggerConfig(job, now);
        job.setCreatedAt(now);
        job.setUpdatedAt(now);
        this.jobMapper.insert(job);
    }

    @Transactional(transactionManager="schedulerTransactionManager", rollbackFor={Exception.class})
    public void updateJob(SchedJob job) {
        job.verifyBeforeUpdate();
        if (StringUtils.isEmpty((CharSequence)job.getJobHandler())) {
            Assert.hasText((String)job.getJobParam(), (String)"Job param must be null if not set job handler.");
        } else {
            super.verifyJob(job);
        }
        job.checkAndDefaultSetting();
        SchedJob dbSchedJob = this.jobMapper.getByJobId(job.getJobId());
        Assert.notNull((Object)dbSchedJob, () -> "Sched job id not found " + job.getJobId());
        job.setNextTriggerTime(dbSchedJob.getNextTriggerTime());
        Date now = new Date();
        if (job.getTriggerType() == null) {
            Assert.isNull((Object)job.getTriggerValue(), (String)"Trigger value must be null if not set trigger type.");
        } else {
            Assert.notNull((Object)job.getTriggerValue(), (String)"Trigger value cannot be null if has set trigger type.");
            this.dependMapper.deleteByChildJobId(job.getJobId());
            this.parseTriggerConfig(job, now);
        }
        job.setUpdatedAt(now);
        Assert.state((this.jobMapper.updateByJobId(job) == 1 ? 1 : 0) != 0, (String)"Update sched job fail or conflict.");
    }

    @Transactional(transactionManager="schedulerTransactionManager", rollbackFor={Exception.class})
    public void deleteJob(long jobId) {
        Assert.isTrue((this.jobMapper.deleteByJobId(jobId) == 1 ? 1 : 0) != 0, (String)"Delete sched job fail or conflict.");
        this.dependMapper.deleteByParentJobId(jobId);
        this.dependMapper.deleteByChildJobId(jobId);
    }

    @Transactional(transactionManager="schedulerTransactionManager", rollbackFor={Exception.class})
    public void triggerJob(long jobId) throws JobException {
        SchedJob job = this.jobMapper.getByJobId(jobId);
        Assert.notNull((Object)job, () -> "Sched job not found: " + jobId);
        TriggerInstanceCreator<?> creator = TriggerInstanceCreator.of(job.getJobType(), this);
        Object tInstance = creator.create(job, RunType.MANUAL, System.currentTimeMillis());
        this.createInstance((TriggerInstance)tInstance);
        TransactionUtils.doAfterTransactionCommit(() -> creator.dispatch(job, tInstance));
    }

    @Transactional(transactionManager="schedulerTransactionManager", rollbackFor={Exception.class})
    public boolean createInstance(SchedJob job, TriggerInstance tInstance) {
        int row = this.jobMapper.updateNextTriggerTime(job);
        if (row == 0) {
            return false;
        }
        this.createInstance(tInstance);
        return true;
    }

    public void deleteInstance(long instanceId) {
        this.doTransactionInSynchronized(instanceId, () -> {
            SchedInstance instance = this.instanceMapper.lock(instanceId);
            Assert.notNull((Object)instance, () -> "Sched instance not found: " + instanceId);
            RunState runState = RunState.of((Integer)instance.getRunState());
            Assert.isTrue((boolean)runState.isTerminal(), () -> "Cannot delete unterminated sched instance: " + instanceId + ", run state=" + runState);
            int row = this.instanceMapper.deleteByInstanceId(instanceId);
            Assert.isTrue((row == 1 ? 1 : 0) != 0, () -> "Delete sched instance conflict: " + instanceId);
            this.taskMapper.deleteByInstanceId(instanceId);
        });
    }

    public void forceChangeState(long instanceId, int targetExecuteState) {
        ExecuteState toExecuteState = ExecuteState.of((Integer)targetExecuteState);
        RunState toRunState = toExecuteState.runState();
        Assert.isTrue((toExecuteState != ExecuteState.EXECUTING ? 1 : 0) != 0, (String)"Cannot force update state to EXECUTING");
        this.doTransactionInSynchronized(instanceId, () -> {
            Assert.notNull((Object)this.instanceMapper.lock(instanceId), () -> "Sched instance not found: " + instanceId);
            int row1 = this.instanceMapper.forceChangeState(instanceId, toRunState.value());
            int row2 = this.taskMapper.forceChangeState(instanceId, toExecuteState.value());
            if (row1 == 0 && row2 == 0) {
                throw new IllegalStateException("Force update instance state failed: " + instanceId);
            }
            if (toExecuteState == ExecuteState.WAITING) {
                Tuple3<SchedJob, SchedInstance, List<SchedTask>> params = this.buildDispatchParams(instanceId, row2);
                TransactionUtils.doAfterTransactionCommit(() -> super.dispatch((SchedJob)params.a, (SchedInstance)params.b, (List)params.c));
            }
            this.log.info("Force change state success {} | {}", (Object)instanceId, (Object)toExecuteState);
        });
    }

    @Transactional(transactionManager="schedulerTransactionManager", rollbackFor={Exception.class})
    public void updateTaskWorker(List<TaskWorkerParam> params) {
        if (CollectionUtils.isEmpty(params)) {
            return;
        }
        params.sort(Comparator.comparing(TaskWorkerParam::getTaskId));
        if (params.size() <= 200) {
            this.taskMapper.batchUpdateWorker(params);
        } else {
            Lists.partition(params, (int)200).forEach(this.taskMapper::batchUpdateWorker);
        }
    }

    @Transactional(transactionManager="schedulerTransactionManager", rollbackFor={Exception.class})
    public boolean startTask(StartTaskParam param) {
        SchedInstance instance = this.instanceMapper.getByInstanceId(param.getInstanceId());
        Assert.notNull((Object)instance, () -> "Sched instance not found: " + param);
        Integer state = instance.getRunState();
        Assert.state((boolean)RUN_STATE_PAUSABLE.contains(state), () -> "Start instance failed: " + param + ", " + state);
        Date now = new Date();
        int instanceRow = this.instanceMapper.start(param.getInstanceId(), now);
        int taskRow = this.taskMapper.start(param.getTaskId(), param.getWorker(), now);
        if (instanceRow == 0 && taskRow == 0) {
            return false;
        }
        Assert.state((taskRow == 1 ? 1 : 0) != 0, () -> "Start task failed: " + param);
        return true;
    }

    public boolean terminateTask(TerminateTaskParam param) {
        ExecuteState toState = param.getToState();
        Assert.isTrue((!ExecuteState.PAUSABLE_LIST.contains(toState) ? 1 : 0) != 0, () -> "Stop executing invalid to state " + toState);
        return this.doTransactionInSynchronized(param.getInstanceId(), () -> {
            SchedInstance instance = this.instanceMapper.lock(param.getInstanceId());
            Assert.notNull((Object)instance, () -> "Terminate executing task failed, instance not found: " + param.getInstanceId());
            if (RunState.of((Integer)instance.getRunState()).isTerminal()) {
                return false;
            }
            Date executeEndTime = toState.isTerminal() ? new Date() : null;
            int row = this.taskMapper.terminate(param.getTaskId(), toState.value(), ExecuteState.EXECUTING.value(), executeEndTime, param.getErrorMsg());
            if (row != 1) {
                this.log.warn("Conflict terminate executing task: {} | {}", (Object)param.getTaskId(), (Object)toState);
                return false;
            }
            Tuple2<RunState, Date> tuple = this.obtainRunState(this.taskMapper.findMediumByInstanceId(param.getInstanceId()));
            if (tuple != null && (row = this.instanceMapper.terminate(param.getInstanceId(), ((RunState)tuple.a).value(), RUN_STATE_CANCELABLE, (Date)tuple.b)) == 1 && param.getOperation() == Operations.TRIGGER) {
                instance.setRunState(Integer.valueOf(((RunState)tuple.a).value()));
                this.afterTerminateTask(instance);
            }
            return true;
        });
    }

    public boolean purgeInstance(long instanceId) {
        return this.doTransactionInSynchronized(instanceId, () -> {
            SchedInstance instance = this.instanceMapper.lock(instanceId);
            Assert.notNull((Object)instance, () -> "Purge instance not found: " + instanceId);
            if (!RUN_STATE_PAUSABLE.contains(instance.getRunState())) {
                return false;
            }
            List<SchedTask> tasks = this.taskMapper.findMediumByInstanceId(instanceId);
            if (tasks.stream().anyMatch(e -> ExecuteState.WAITING.equals(e.getExecuteState()))) {
                this.log.warn("Purge instance failed, has waiting task: {}", tasks);
                return false;
            }
            if (this.hasAliveExecuting(tasks)) {
                this.log.warn("Purge instance failed, has alive executing task: {}", tasks);
                return false;
            }
            Tuple2 tuple = (Tuple2)ObjectUtils.defaultIfNull(this.obtainRunState(tasks), () -> Tuple2.of((Object)RunState.CANCELED, (Object)new Date()));
            if (this.instanceMapper.terminate(instanceId, ((RunState)tuple.a).value(), RUN_STATE_CANCELABLE, (Date)tuple.b) != 1) {
                return false;
            }
            tasks.stream().filter(e -> EXECUTE_STATE_PAUSABLE.contains(e.getExecuteState())).forEach(e -> this.taskMapper.terminate(e.getTaskId(), ExecuteState.EXECUTE_TIMEOUT.value(), e.getExecuteState(), new Date(), null));
            instance.setRunState(Integer.valueOf(((RunState)tuple.a).value()));
            this.afterTerminateTask(instance);
            this.log.warn("Purge instance {} to state {}", (Object)instanceId, tuple.a);
            return true;
        });
    }

    public boolean pauseInstance(long instanceId) {
        return this.doTransactionInSynchronized(instanceId, () -> {
            SchedInstance instance = this.instanceMapper.lock(instanceId);
            Assert.notNull((Object)instance, () -> "Pause instance not found: " + instanceId);
            if (!RUN_STATE_PAUSABLE.contains(instance.getRunState())) {
                return false;
            }
            Operations ops = Operations.PAUSE;
            this.taskMapper.updateStateByInstanceId(instanceId, ops.toState().value(), EXECUTE_STATE_WAITING, null);
            List<ExecuteTaskParam> executingTasks = this.loadExecutingTasks(instance, ops);
            if (executingTasks.isEmpty()) {
                Tuple2<RunState, Date> tuple = this.obtainRunState(this.taskMapper.findMediumByInstanceId(instanceId));
                Assert.notNull(tuple, () -> "Pause instance failed: " + instanceId);
                if (this.instanceMapper.terminate(instanceId, ((RunState)tuple.a).value(), RUN_STATE_CANCELABLE, (Date)tuple.b) != 1) {
                    this.log.warn("Pause instance from {} to {} conflict", (Object)RunState.of((Integer)instance.getRunState()), tuple.a);
                }
            } else {
                TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(executingTasks));
            }
            return true;
        });
    }

    public boolean cancelInstance(long instanceId, Operations ops) {
        Assert.isTrue((boolean)ops.toState().isFailure(), () -> "Cancel instance operation invalid: " + ops);
        return this.doTransactionInSynchronized(instanceId, () -> {
            SchedInstance instance = this.instanceMapper.lock(instanceId);
            Assert.notNull((Object)instance, () -> "Cancel instance not found: " + instanceId);
            RunState runState = RunState.of((Integer)instance.getRunState());
            if (runState.isTerminal()) {
                return false;
            }
            this.taskMapper.updateStateByInstanceId(instanceId, ops.toState().value(), EXECUTE_STATE_EXECUTABLE, new Date());
            List<ExecuteTaskParam> executingTasks = this.loadExecutingTasks(instance, ops);
            if (executingTasks.isEmpty()) {
                Tuple2 tuple = this.obtainRunState(this.taskMapper.findMediumByInstanceId(instanceId));
                Assert.notNull(tuple, () -> "Cancel instance failed: " + instanceId);
                if (tuple.a == RunState.PAUSED) {
                    tuple = Tuple2.of((Object)RunState.CANCELED, (Object)new Date());
                }
                if (this.instanceMapper.terminate(instanceId, ((RunState)tuple.a).value(), RUN_STATE_CANCELABLE, (Date)tuple.b) != 1) {
                    this.log.warn("Cancel instance from {} to {} conflict", (Object)runState, tuple.a);
                }
            } else {
                TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(executingTasks));
            }
            return true;
        });
    }

    public boolean resumeInstance(long instanceId) {
        return this.doTransactionInSynchronized(instanceId, () -> {
            SchedInstance instance = this.instanceMapper.lock(instanceId);
            Assert.notNull((Object)instance, () -> "Cancel failed, instance_id not found: " + instanceId);
            if (!RunState.PAUSED.equals(instance.getRunState())) {
                return false;
            }
            int row = this.instanceMapper.updateState(instanceId, RunState.WAITING.value(), RunState.PAUSED.value());
            Assert.state((row == 1 ? 1 : 0) != 0, (String)"Resume sched instance failed.");
            row = this.taskMapper.updateStateByInstanceId(instanceId, ExecuteState.WAITING.value(), EXECUTE_STATE_PAUSED, null);
            Assert.state((row >= 1 ? 1 : 0) != 0, (String)"Resume sched task failed.");
            Tuple3<SchedJob, SchedInstance, List<SchedTask>> params = this.buildDispatchParams(instanceId, row);
            TransactionUtils.doAfterTransactionCommit(() -> super.dispatch((SchedJob)params.a, (SchedInstance)params.b, (List)params.c));
            return true;
        });
    }

    private void createInstance(TriggerInstance tInstance) {
        int row = this.instanceMapper.insert(tInstance.getInstance());
        Assert.state((row == 1 ? 1 : 0) != 0, () -> "Insert sched instance fail: " + tInstance.getInstance());
        if (tInstance instanceof NormalInstanceCreator.NormalInstance) {
            NormalInstanceCreator.NormalInstance normal = (NormalInstanceCreator.NormalInstance)tInstance;
            this.insertBatchTask(normal.getTasks());
        } else if (tInstance instanceof WorkflowInstanceCreator.WorkflowInstance) {
            WorkflowInstanceCreator.WorkflowInstance wInstance = (WorkflowInstanceCreator.WorkflowInstance)tInstance;
            this.insertBatchWorkflow(wInstance.getWorkflows());
            for (Tuple2<SchedInstance, List<SchedTask>> sub : wInstance.getSubInstances()) {
                row = this.instanceMapper.insert((SchedInstance)sub.a);
                Assert.state((row == 1 ? 1 : 0) != 0, () -> "Insert sub sched instance fail: " + tInstance.getInstance());
                this.insertBatchTask((List)sub.b);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean doTransactionInSynchronized(long lockKey, BooleanSupplier action) {
        String string = Long.toString(lockKey).intern();
        synchronized (string) {
            return Boolean.TRUE.equals(this.transactionTemplate.execute(status -> action.getAsBoolean()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doTransactionInSynchronized(long lockKey, Runnable action) {
        String string = Long.toString(lockKey).intern();
        synchronized (string) {
            this.transactionTemplate.executeWithoutResult(status -> action.run());
        }
    }

    private Tuple2<RunState, Date> obtainRunState(List<SchedTask> tasks) {
        List states = tasks.stream().map(SchedTask::getExecuteState).map(ExecuteState::of).collect(Collectors.toList());
        if (states.stream().allMatch(ExecuteState::isTerminal)) {
            return Tuple2.of((Object)(states.stream().anyMatch(ExecuteState::isFailure) ? RunState.CANCELED : RunState.FINISHED), (Object)tasks.stream().map(SchedTask::getExecuteEndTime).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElseGet(Date::new));
        }
        return states.stream().anyMatch(ExecuteState.PAUSABLE_LIST::contains) ? null : Tuple2.of((Object)RunState.PAUSED, null);
    }

    private void afterTerminateTask(SchedInstance instance) {
        RunState runState = RunState.of((Integer)instance.getRunState());
        if (runState == RunState.CANCELED) {
            this.retryJob(instance);
        } else if (runState == RunState.FINISHED) {
            if (instance.isWorkflowNode()) {
                this.workflowNode(instance);
            } else {
                this.dependJob(instance);
            }
        } else {
            this.log.error("Unknown terminate run state " + runState);
        }
    }

    private void workflowNode(SchedInstance subInstance) {
        WorkflowGraph graph;
        Map ends;
        if (!subInstance.isWorkflowNode()) {
            return;
        }
        RunState runState = RunState.of((Integer)subInstance.getRunState());
        Assert.hasText((String)subInstance.getAttach(), () -> "Workflow instance attach cannot blank: " + subInstance);
        WorkflowAttach attach = (WorkflowAttach)Jsons.fromJson((String)subInstance.getAttach(), WorkflowAttach.class);
        Long workflowInstanceId = subInstance.getWorkflowInstanceId();
        Assert.isTrue((boolean)TransactionSynchronizationManager.isActualTransactionActive(), (String)"Workflow instance must be in transaction.");
        SchedInstance workflowInstance = this.instanceMapper.lock(workflowInstanceId);
        int row = this.workflowMapper.update(workflowInstanceId, attach.getCurNode(), runState.value(), null, RUN_STATE_CANCELABLE, null);
        if (row < 1) {
            this.log.warn("Update workflow node conflict: {} | {}", (Object)subInstance, (Object)runState);
            return;
        }
        if (runState == RunState.CANCELED) {
            this.workflowMapper.cancelWorkflow(workflowInstanceId);
        }
        if ((ends = (graph = new WorkflowGraph(this.workflowMapper.findByWorkflowInstanceId(workflowInstanceId))).predecessors(DAGNode.END)).values().stream().allMatch(SchedWorkflow::isTerminal)) {
            RunState endState = ends.values().stream().anyMatch(SchedWorkflow::isFailure) ? RunState.CANCELED : RunState.FINISHED;
            row = this.workflowMapper.update(workflowInstanceId, DAGNode.END.toString(), endState.value(), null, RUN_STATE_CANCELABLE, null);
            if (row < 1) {
                this.log.warn("Update workflow end conflict: {} | {}", (Object)subInstance, (Object)endState);
                return;
            }
            ends.forEach((k, v) -> graph.get(k.getTarget(), DAGNode.END).setRunState(Integer.valueOf(endState.value())));
        }
        Date now = new Date();
        if (graph.allMatch(SchedWorkflow::isTerminal)) {
            RunState state;
            RunState runState2 = state = graph.anyMatch(SchedWorkflow::isFailure) ? RunState.CANCELED : RunState.FINISHED;
            if (this.instanceMapper.terminate(workflowInstanceId, state.value(), RUN_STATE_CANCELABLE, now) == 1) {
                this.afterTerminateTask(this.instanceMapper.getByInstanceId(workflowInstanceId));
            } else {
                this.log.warn("Terminate workflow instance conflict: {} | {}", (Object)subInstance, (Object)state);
            }
            return;
        }
        Long jobId = workflowInstance.getJobId();
        SchedJob job = (SchedJob)LazyLoader.of(SchedJob.class, this.jobMapper::getByJobId, (Object)jobId);
        DAGNode curNode = DAGNode.fromString((String)attach.getCurNode());
        for (Map.Entry node : graph.successors(curNode).entrySet()) {
            DAGNode target = ((DAGEdge)node.getKey()).getTarget();
            SchedWorkflow workflow = (SchedWorkflow)node.getValue();
            if (target.isEnd() || !RunState.WAITING.equals(workflow.getRunState()) || graph.predecessors(target).values().stream().anyMatch(e -> !RunState.FINISHED.equals(e.getRunState()))) continue;
            long instanceId = this.generateId();
            row = this.workflowMapper.update(workflowInstanceId, target.toString(), RunState.RUNNING.value(), instanceId, RUN_STATE_WAITING, null);
            if (row < 1) continue;
            long triggerTime = workflowInstance.getTriggerTime() + (long)workflow.getSequence().intValue();
            SchedInstance nextInstance = SchedInstance.create((long)instanceId, (long)jobId, (RunType)RunType.of((Integer)workflowInstance.getRunType()), (long)triggerTime, (int)0, (Date)now);
            nextInstance.setRootInstanceId(subInstance.obtainRootInstanceId());
            nextInstance.setParentInstanceId(subInstance.getInstanceId());
            nextInstance.setWorkflowInstanceId(subInstance.getWorkflowInstanceId());
            nextInstance.setAttach(Jsons.toJson((Object)new WorkflowAttach(workflow.getCurNode())));
            try {
                List<SchedTask> tasks = this.splitTasks(SplitJobParam.from(job, target.getName()), nextInstance.getInstanceId(), new Date());
                this.instanceMapper.insert(nextInstance);
                this.insertBatchTask(tasks);
                TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(job, nextInstance, tasks));
            }
            catch (Exception e2) {
                this.log.error("Split workflow job task error: " + subInstance, (Throwable)e2);
            }
        }
    }

    private void retryJob(SchedInstance prev) {
        List<Object> tasks;
        SchedJob schedJob = this.jobMapper.getByJobId(prev.getJobId());
        if (schedJob == null) {
            this.log.error("Sched job not found {}", (Object)prev.getJobId());
            this.workflowNode(prev);
            return;
        }
        List<SchedTask> prevTasks = this.taskMapper.findLargeByInstanceId(prev.getInstanceId());
        RetryType retryType = RetryType.of((Integer)schedJob.getRetryType());
        if (retryType == RetryType.NONE || schedJob.getRetryCount() < 1) {
            this.workflowNode(prev);
            return;
        }
        int retriedCount = Optional.ofNullable(prev.getRetriedCount()).orElse(0);
        if (retriedCount >= schedJob.getRetryCount()) {
            this.workflowNode(prev);
            return;
        }
        long retryInstanceId = this.generateId();
        if (prev.isWorkflowNode()) {
            String curNode = ((WorkflowAttach)Jsons.fromJson((String)prev.getAttach(), WorkflowAttach.class)).getCurNode();
            int row = this.workflowMapper.update(prev.getWorkflowInstanceId(), curNode, null, retryInstanceId, RUN_STATE_RUNNING, prev.getInstanceId());
            if (row < 1) {
                return;
            }
        }
        Date now = new Date();
        long triggerTime = SchedulerJobManager.computeRetryTriggerTime(schedJob, ++retriedCount, now);
        SchedInstance retryInstance = SchedInstance.create((long)retryInstanceId, (long)schedJob.getJobId(), (RunType)RunType.RETRY, (long)triggerTime, (int)retriedCount, (Date)now);
        retryInstance.setRootInstanceId(prev.obtainRootInstanceId());
        retryInstance.setParentInstanceId(prev.getInstanceId());
        retryInstance.setWorkflowInstanceId(prev.getWorkflowInstanceId());
        retryInstance.setAttach(prev.getAttach());
        switch (retryType) {
            case ALL: {
                try {
                    tasks = this.splitTasks(SplitJobParam.from(schedJob), retryInstance.getInstanceId(), now);
                    break;
                }
                catch (Exception e2) {
                    this.log.error("Split job error: " + schedJob + ", " + prev, (Throwable)e2);
                    return;
                }
            }
            case FAILED: {
                tasks = prevTasks.stream().filter(e -> ExecuteState.of((Integer)e.getExecuteState()).isFailure()).filter(e -> !RouteStrategy.BROADCAST.equals(schedJob.getRouteStrategy()) || super.isAliveWorker(e.getWorker())).map(e -> SchedTask.create((String)e.getTaskParam(), (long)this.generateId(), (long)retryInstance.getInstanceId(), (int)e.getTaskNo(), (int)e.getTaskCount(), (Date)now, (String)e.getWorker())).collect(Collectors.toList());
                break;
            }
            default: {
                this.log.error("Job unsupported retry type {}", (Object)schedJob);
                return;
            }
        }
        Assert.notEmpty(tasks, (String)"Insert list of task cannot be empty.");
        this.instanceMapper.insert(retryInstance);
        this.insertBatchTask(tasks);
        TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(schedJob, retryInstance, tasks));
    }

    private void dependJob(SchedInstance parentInstance) {
        List<SchedDepend> schedDepends = this.dependMapper.findByParentJobId(parentInstance.getJobId());
        if (CollectionUtils.isEmpty(schedDepends)) {
            return;
        }
        for (SchedDepend depend : schedDepends) {
            SchedJob childJob = this.jobMapper.getByJobId(depend.getChildJobId());
            if (childJob == null) {
                this.log.error("Child sched job not found: {} | {}", (Object)depend.getParentJobId(), (Object)depend.getChildJobId());
                continue;
            }
            if (JobState.DISABLE.equals(childJob.getJobState())) continue;
            try {
                TriggerInstanceCreator<?> creator = TriggerInstanceCreator.of(childJob.getJobType(), this);
                Object tInstance = creator.create(childJob, RunType.DEPEND, parentInstance.getTriggerTime());
                ((TriggerInstance)tInstance).getInstance().setRootInstanceId(parentInstance.obtainRootInstanceId());
                ((TriggerInstance)tInstance).getInstance().setParentInstanceId(parentInstance.getInstanceId());
                this.createInstance((TriggerInstance)tInstance);
                TransactionUtils.doAfterTransactionCommit(() -> creator.dispatch(childJob, tInstance));
            }
            catch (Exception e) {
                this.log.error("Depend job split failed: " + childJob, (Throwable)e);
            }
        }
    }

    private void insertBatchTask(List<SchedTask> tasks) {
        Assert.notEmpty(tasks, (String)"Insert list of task cannot be empty.");
        if (tasks.size() <= 200) {
            int row = this.taskMapper.insertBatch(tasks);
            Assert.state((row == tasks.size() ? 1 : 0) != 0, () -> "Insert sched task fail: " + tasks);
        } else {
            List partition = Lists.partition(tasks, (int)200);
            for (List list : partition) {
                int row = this.taskMapper.insertBatch(list);
                Assert.state((row == list.size() ? 1 : 0) != 0, () -> "Insert sched task fail: " + tasks);
            }
        }
    }

    private void insertBatchWorkflow(List<SchedWorkflow> workflows) {
        Assert.notEmpty(workflows, (String)"Insert list of workflow cannot be empty.");
        if (workflows.size() <= 200) {
            int row = this.workflowMapper.insertBatch(workflows);
            Assert.state((row == workflows.size() ? 1 : 0) != 0, () -> "Insert sched workflow fail: " + ((SchedWorkflow)workflows.get(0)).getWorkflowInstanceId());
        } else {
            List partition = Lists.partition(workflows, (int)200);
            for (List list : partition) {
                int row = this.workflowMapper.insertBatch(list);
                Assert.state((row == list.size() ? 1 : 0) != 0, () -> "Insert sched task fail: " + ((SchedWorkflow)workflows.get(0)).getWorkflowInstanceId());
            }
        }
    }

    private List<ExecuteTaskParam> loadExecutingTasks(SchedInstance instance, Operations ops) {
        ArrayList<ExecuteTaskParam> executingTasks = new ArrayList<ExecuteTaskParam>();
        ExecuteTaskParamBuilder builder = ExecuteTaskParam.builder((SchedInstance)instance, this.jobMapper::getByJobId);
        long triggerTime = 0L;
        this.taskMapper.findMediumByInstanceId(instance.getInstanceId()).stream().filter(e -> ExecuteState.EXECUTING.equals(e.getExecuteState())).forEach(task -> {
            Worker worker = Worker.deserialize((String)task.getWorker());
            if (super.isAliveWorker(worker)) {
                executingTasks.add(builder.build(ops, task.getTaskId().longValue(), triggerTime, worker));
            } else {
                Date executeEndTime = ops.toState().isTerminal() ? new Date() : null;
                int row = this.taskMapper.terminate(task.getTaskId(), ops.toState().value(), ExecuteState.EXECUTING.value(), executeEndTime, null);
                if (row != 1) {
                    this.log.warn("Cancel the dead task failed: {}", task);
                    executingTasks.add(builder.build(ops, task.getTaskId().longValue(), triggerTime, worker));
                } else {
                    this.log.info("Cancel the dead task success: {}", task);
                }
            }
        });
        return executingTasks;
    }

    private Tuple3<SchedJob, SchedInstance, List<SchedTask>> buildDispatchParams(long instanceId, int expectTaskSize) {
        SchedInstance instance = this.instanceMapper.getByInstanceId(instanceId);
        SchedJob job = this.jobMapper.getByJobId(instance.getJobId());
        List waitingTasks = this.taskMapper.findLargeByInstanceId(instanceId).stream().filter(e -> ExecuteState.WAITING.equals(e.getExecuteState())).collect(Collectors.toList());
        Assert.isTrue((waitingTasks.size() == expectTaskSize ? 1 : 0) != 0, () -> "Dispatching tasks size inconsistent, expect=" + expectTaskSize + ", actual=" + waitingTasks.size());
        return Tuple3.of((Object)job, (Object)instance, waitingTasks);
    }

    private void parseTriggerConfig(SchedJob job, Date date) {
        TriggerType triggerType = TriggerType.of((Integer)job.getTriggerType());
        Assert.isTrue((boolean)triggerType.isValid(job.getTriggerValue()), () -> "Invalid trigger value: " + job.getTriggerType() + ", " + job.getTriggerValue());
        if (triggerType == TriggerType.DEPEND) {
            List<Long> parentJobIds = Arrays.stream(job.getTriggerValue().split(",")).filter(StringUtils::isNotBlank).map(e -> Long.parseLong(e.trim())).distinct().collect(Collectors.toList());
            Assert.notEmpty(parentJobIds, () -> "Invalid dependency parent job id config: " + job.getTriggerValue());
            Map parentJobMap = this.jobMapper.findByJobIds(parentJobIds).stream().collect(Collectors.toMap(SchedJob::getJobId, Function.identity()));
            for (Long parentJobId : parentJobIds) {
                SchedJob parentJob = (SchedJob)parentJobMap.get(parentJobId);
                Assert.notNull((Object)parentJob, () -> "Parent job id not found: " + parentJobId);
                Assert.isTrue((boolean)job.getJobGroup().equals(parentJob.getJobGroup()), () -> "Parent job '" + parentJob.getJobId() + "' group '" + parentJob.getJobGroup() + "' different '" + job.getJobGroup() + "'");
            }
            this.dependMapper.insertBatch(parentJobIds.stream().map(e -> new SchedDepend(e, job.getJobId())).collect(Collectors.toList()));
            job.setTriggerValue(Joiner.on((String)",").join(parentJobIds));
            job.setNextTriggerTime(null);
        } else {
            Date nextTriggerTime = triggerType.computeNextFireTime(job.getTriggerValue(), date);
            Assert.notNull((Object)nextTriggerTime, () -> "Has not next trigger time " + job.getTriggerValue());
            job.setNextTriggerTime(Long.valueOf(nextTriggerTime.getTime()));
        }
    }

    private static long computeRetryTriggerTime(SchedJob job, int failCount, Date current) {
        Assert.isTrue((!RetryType.NONE.equals(job.getRetryType()) ? 1 : 0) != 0, () -> "Sched job '" + job.getJobId() + "' retry type is NONE.");
        Assert.isTrue((job.getRetryCount() > 0 ? 1 : 0) != 0, () -> "Sched job '" + job.getJobId() + "' retry count must greater than 0, but actual " + job.getRetryCount());
        Assert.isTrue((failCount <= job.getRetryCount() ? 1 : 0) != 0, () -> "Sched job '" + job.getJobId() + "' retried " + failCount + " exceed " + job.getRetryCount() + " limit.");
        return current.getTime() + (long)job.getRetryInterval().intValue() * (long)IntMath.pow((int)failCount, (int)2);
    }
}

