/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.supervisor.service;

import cn.ponfee.disjob.common.base.IdGenerator;
import cn.ponfee.disjob.common.base.LazyLoader;
import cn.ponfee.disjob.common.collect.Collects;
import cn.ponfee.disjob.common.dag.DAGEdge;
import cn.ponfee.disjob.common.dag.DAGNode;
import cn.ponfee.disjob.common.spring.TransactionUtils;
import cn.ponfee.disjob.common.tuple.Tuple2;
import cn.ponfee.disjob.common.tuple.Tuple3;
import cn.ponfee.disjob.common.util.Functions;
import cn.ponfee.disjob.common.util.Jsons;
import cn.ponfee.disjob.common.util.Strings;
import cn.ponfee.disjob.core.base.JobConstants;
import cn.ponfee.disjob.core.base.Worker;
import cn.ponfee.disjob.core.enums.ExecuteState;
import cn.ponfee.disjob.core.enums.JobState;
import cn.ponfee.disjob.core.enums.Operations;
import cn.ponfee.disjob.core.enums.RetryType;
import cn.ponfee.disjob.core.enums.RouteStrategy;
import cn.ponfee.disjob.core.enums.RunState;
import cn.ponfee.disjob.core.enums.RunType;
import cn.ponfee.disjob.core.exception.JobException;
import cn.ponfee.disjob.core.model.InstanceAttach;
import cn.ponfee.disjob.core.model.SchedDepend;
import cn.ponfee.disjob.core.model.SchedInstance;
import cn.ponfee.disjob.core.model.SchedJob;
import cn.ponfee.disjob.core.model.SchedTask;
import cn.ponfee.disjob.core.model.SchedWorkflow;
import cn.ponfee.disjob.core.param.supervisor.StartTaskParam;
import cn.ponfee.disjob.core.param.supervisor.TerminateTaskParam;
import cn.ponfee.disjob.core.param.supervisor.UpdateTaskWorkerParam;
import cn.ponfee.disjob.core.param.worker.JobHandlerParam;
import cn.ponfee.disjob.dispatch.ExecuteTaskParam;
import cn.ponfee.disjob.dispatch.TaskDispatcher;
import cn.ponfee.disjob.registry.SupervisorRegistry;
import cn.ponfee.disjob.supervisor.application.SchedGroupService;
import cn.ponfee.disjob.supervisor.base.WorkerRpcClient;
import cn.ponfee.disjob.supervisor.dag.WorkflowGraph;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedDependMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedInstanceMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedJobMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedTaskMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedWorkflowMapper;
import cn.ponfee.disjob.supervisor.instance.GeneralInstanceCreator;
import cn.ponfee.disjob.supervisor.instance.TriggerInstance;
import cn.ponfee.disjob.supervisor.instance.TriggerInstanceCreator;
import cn.ponfee.disjob.supervisor.instance.WorkflowInstanceCreator;
import cn.ponfee.disjob.supervisor.service.AbstractJobManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

@Component
public class DistributedJobManager
extends AbstractJobManager {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedJobManager.class);
    private static final List<Integer> RUN_STATE_TERMINABLE = Collects.convert((List)RunState.TERMINABLE_LIST, RunState::value);
    private static final List<Integer> RUN_STATE_RUNNABLE = Collects.convert((List)RunState.RUNNABLE_LIST, RunState::value);
    private static final List<Integer> RUN_STATE_PAUSABLE = Collects.convert((List)RunState.PAUSABLE_LIST, RunState::value);
    private static final List<Integer> RUN_STATE_WAITING = Collections.singletonList(RunState.WAITING.value());
    private static final List<Integer> RUN_STATE_RUNNING = Collections.singletonList(RunState.RUNNING.value());
    private static final List<Integer> RUN_STATE_PAUSED = Collections.singletonList(RunState.PAUSED.value());
    private static final List<Integer> EXECUTE_STATE_EXECUTABLE = Collects.convert((List)ExecuteState.EXECUTABLE_LIST, ExecuteState::value);
    private static final List<Integer> EXECUTE_STATE_PAUSABLE = Collects.convert((List)ExecuteState.PAUSABLE_LIST, ExecuteState::value);
    private static final List<Integer> EXECUTE_STATE_WAITING = Collections.singletonList(ExecuteState.WAITING.value());
    private static final List<Integer> EXECUTE_STATE_PAUSED = Collections.singletonList(ExecuteState.PAUSED.value());
    private final TransactionTemplate transactionTemplate;
    private final SchedInstanceMapper instanceMapper;
    private final SchedTaskMapper taskMapper;
    private final SchedWorkflowMapper workflowMapper;

    public DistributedJobManager(SchedJobMapper jobMapper, SchedDependMapper dependMapper, SchedInstanceMapper instanceMapper, SchedTaskMapper taskMapper, SchedWorkflowMapper workflowMapper, IdGenerator idGenerator, SupervisorRegistry discoveryWorker, TaskDispatcher taskDispatcher, WorkerRpcClient workerRpcClient, @Qualifier(value="disjobTransactionTemplate") TransactionTemplate transactionTemplate) {
        super(jobMapper, dependMapper, idGenerator, discoveryWorker, taskDispatcher, workerRpcClient);
        this.transactionTemplate = transactionTemplate;
        this.instanceMapper = instanceMapper;
        this.taskMapper = taskMapper;
        this.workflowMapper = workflowMapper;
    }

    public boolean renewInstanceUpdateTime(SchedInstance instance, Date updateTime) {
        return TransactionUtils.isOneAffectedRow((int)this.instanceMapper.renewUpdateTime(instance.getInstanceId(), updateTime, instance.getVersion()));
    }

    @Override
    protected boolean cancelWaitingTask(long taskId) {
        return TransactionUtils.isOneAffectedRow((int)this.taskMapper.terminate(taskId, null, ExecuteState.BROADCAST_ABORTED.value(), ExecuteState.WAITING.value(), null, null));
    }

    public void savepoint(long taskId, String executeSnapshot) {
        TransactionUtils.assertOneAffectedRow((int)this.taskMapper.savepoint(taskId, executeSnapshot), () -> "Save point failed: " + taskId + " | " + executeSnapshot);
    }

    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public void triggerJob(long jobId) throws JobException {
        SchedJob job = this.jobMapper.get(jobId);
        Assert.notNull((Object)job, () -> "Sched job not found: " + jobId);
        TriggerInstanceCreator<?> creator = TriggerInstanceCreator.of(job.getJobType(), this);
        Object tInstance = creator.create(job, RunType.MANUAL, System.currentTimeMillis());
        this.createInstance((TriggerInstance)tInstance);
        TransactionUtils.doAfterTransactionCommit(() -> creator.dispatch(job, tInstance));
    }

    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public boolean createInstance(SchedJob job, TriggerInstance triggerInstance) {
        if (this.jobMapper.updateNextTriggerTime(job) == 0) {
            return false;
        }
        this.createInstance(triggerInstance);
        return true;
    }

    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public void updateTaskWorker(List<UpdateTaskWorkerParam> list) {
        if (CollectionUtils.isNotEmpty(list)) {
            list.sort(Comparator.comparingLong(UpdateTaskWorkerParam::getTaskId));
            Collects.batchProcess(list, this.taskMapper::batchUpdateWorker, (int)200);
        }
    }

    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public boolean startTask(StartTaskParam param) {
        SchedInstance instance = this.instanceMapper.get(param.getInstanceId());
        Assert.notNull((Object)instance, () -> "Sched instance not found: " + param);
        if (!RUN_STATE_PAUSABLE.contains(instance.getRunState())) {
            return false;
        }
        LOG.info("Task trace [{}] starting: {}", (Object)param.getTaskId(), (Object)param.getWorker());
        Date now = new Date();
        int row = 0;
        if (RunState.WAITING.equals(instance.getRunState())) {
            row = this.instanceMapper.start(param.getInstanceId(), now);
        }
        if (TransactionUtils.isOneAffectedRow((int)this.taskMapper.start(param.getTaskId(), param.getWorker(), now))) {
            return true;
        }
        Assert.state((row == 0 ? 1 : 0) != 0, () -> "Start task failed: " + param);
        return false;
    }

    public void changeInstanceState(long instanceId, ExecuteState toExecuteState) {
        RunState toRunState = toExecuteState.runState();
        Assert.isTrue((toExecuteState != ExecuteState.EXECUTING ? 1 : 0) != 0, (String)"Cannot force update state to EXECUTING");
        this.doTransactionLockInSynchronized(instanceId, null, (SchedInstance instance) -> {
            Assert.notNull((Object)instance, () -> "Sched instance not found: " + instanceId);
            Assert.isTrue((!instance.isWorkflow() ? 1 : 0) != 0, () -> "Unsupported force change workflow instance state: " + instanceId);
            int instRow = this.instanceMapper.changeState(instanceId, toRunState.value());
            int taskRow = this.taskMapper.changeState(instanceId, toExecuteState.value());
            if (instRow == 0 && taskRow == 0) {
                throw new IllegalStateException("Force update instance state failed: " + instanceId);
            }
            if (toExecuteState == ExecuteState.WAITING) {
                Tuple3<SchedJob, SchedInstance, List<SchedTask>> param = this.buildDispatchParam(instanceId, taskRow);
                TransactionUtils.doAfterTransactionCommit(() -> super.dispatch((SchedJob)param.a, (SchedInstance)param.b, (List)param.c));
            }
            LOG.info("Force change state success {} | {}", (Object)instanceId, (Object)toExecuteState);
        });
    }

    public void deleteInstance(long instanceId) {
        Long wnstanceId = this.instanceMapper.getWnstanceId(instanceId);
        this.doTransactionLockInSynchronized(instanceId, wnstanceId, (SchedInstance instance) -> {
            Assert.notNull((Object)instance, () -> "Sched instance not found: " + instanceId);
            Assert.isTrue((boolean)RunState.of((Integer)instance.getRunState()).isTerminal(), () -> "Deleting instance must be terminal: " + instance);
            if (instance.isWorkflow()) {
                Assert.isTrue((boolean)instance.isWorkflowLead(), () -> "Cannot delete workflow node instance: " + instanceId);
                int row = this.instanceMapper.deleteByInstanceId(instanceId);
                TransactionUtils.assertOneAffectedRow((int)row, () -> "Delete workflow lead instance conflict: " + instanceId);
                for (SchedInstance e : this.instanceMapper.findWorkflowNode(instance.getWnstanceId())) {
                    row = this.taskMapper.deleteByInstanceId(e.getInstanceId());
                    TransactionUtils.assertManyAffectedRow((int)row, () -> "Delete sched task conflict: " + instanceId);
                }
                row = this.instanceMapper.deleteByWnstanceId(instanceId);
                TransactionUtils.assertManyAffectedRow((int)row, () -> "Delete workflow node instance conflict: " + instanceId);
                row = this.workflowMapper.deleteByWnstanceId(instanceId);
                TransactionUtils.assertManyAffectedRow((int)row, () -> "Delete sched workflow conflict: " + instanceId);
            } else {
                int row = this.instanceMapper.deleteByInstanceId(instanceId);
                TransactionUtils.assertOneAffectedRow((int)row, () -> "Delete sched instance conflict: " + instanceId);
                row = this.taskMapper.deleteByInstanceId(instanceId);
                TransactionUtils.assertManyAffectedRow((int)row, () -> "Delete sched task conflict: " + instanceId);
            }
            LOG.info("Delete sched instance success {}", (Object)instanceId);
        });
    }

    public boolean terminateTask(TerminateTaskParam param) {
        Assert.hasText((String)param.getWorker(), (String)"Terminate task worker cannot be blank.");
        ExecuteState toState = param.getToState();
        long instanceId = param.getInstanceId();
        Assert.isTrue((!ExecuteState.PAUSABLE_LIST.contains(toState) ? 1 : 0) != 0, () -> "Stop executing invalid to state " + toState);
        LOG.info("Task trace [{}] terminating: {} | {}", new Object[]{param.getTaskId(), param.getOperation(), param.getWorker()});
        return this.doTransactionLockInSynchronized(instanceId, param.getWnstanceId(), (SchedInstance instance) -> {
            Assert.notNull((Object)instance, () -> "Terminate executing task failed, instance not found: " + instanceId);
            Assert.isTrue((!instance.isWorkflowLead() ? 1 : 0) != 0, () -> "Cannot direct terminate workflow lead instance: " + instance);
            if (RunState.of((Integer)instance.getRunState()).isTerminal()) {
                return false;
            }
            Date executeEndTime = toState.isTerminal() ? new Date() : null;
            int row = this.taskMapper.terminate(param.getTaskId(), param.getWorker(), toState.value(), ExecuteState.EXECUTING.value(), executeEndTime, param.getErrorMsg());
            if (!TransactionUtils.isOneAffectedRow((int)row)) {
                LOG.warn("Conflict terminate executing task: {} | {}", (Object)param.getTaskId(), (Object)toState);
                return false;
            }
            Tuple2<RunState, Date> tuple = this.obtainRunState(this.taskMapper.findBaseByInstanceId(instanceId));
            if (tuple != null && this.instanceMapper.terminate(instanceId, ((RunState)tuple.a).value(), RUN_STATE_TERMINABLE, (Date)tuple.b) > 0) {
                if (param.getOperation().isTrigger()) {
                    instance.markTerminated((RunState)tuple.a, (Date)tuple.b);
                    this.afterTerminateTask((SchedInstance)instance);
                } else if (instance.isWorkflowNode()) {
                    this.updateWorkflowEdgeState((SchedInstance)instance, ((RunState)tuple.a).value(), RUN_STATE_TERMINABLE);
                    this.updateWorkflowLeadState(this.instanceMapper.get(param.getWnstanceId()));
                }
            }
            return true;
        });
    }

    public boolean purgeInstance(SchedInstance inst) {
        LOG.info("Purge instance: {}", (Object)inst.getInstanceId());
        Long instanceId = inst.getInstanceId();
        return this.doTransactionLockInSynchronized((long)instanceId, inst.getWnstanceId(), (SchedInstance instance) -> {
            Assert.notNull((Object)instance, () -> "Purge instance not found: " + instanceId);
            Assert.isTrue((!instance.isWorkflowLead() ? 1 : 0) != 0, () -> "Cannot purge workflow lead instance: " + instance);
            if (!RUN_STATE_PAUSABLE.contains(instance.getRunState())) {
                return false;
            }
            List<SchedTask> tasks = this.taskMapper.findBaseByInstanceId(instanceId);
            if (tasks.stream().anyMatch(e -> ExecuteState.WAITING.equals(e.getExecuteState()))) {
                LOG.warn("Purge instance failed, has waiting task: {}", tasks);
                return false;
            }
            if (this.hasAliveExecuting(tasks)) {
                LOG.warn("Purge instance failed, has alive executing task: {}", tasks);
                return false;
            }
            Tuple2 tuple = this.obtainRunState(tasks);
            if (tuple == null) {
                tuple = Tuple2.of((Object)RunState.CANCELED, (Object)new Date());
            } else {
                Assert.isTrue((boolean)((RunState)tuple.a).isTerminal(), () -> "Purge instance state must be terminal state: " + instance);
            }
            if (!TransactionUtils.isOneAffectedRow((int)this.instanceMapper.terminate(instanceId, ((RunState)tuple.a).value(), RUN_STATE_TERMINABLE, (Date)tuple.b))) {
                return false;
            }
            tasks.stream().filter(e -> EXECUTE_STATE_PAUSABLE.contains(e.getExecuteState())).forEach(e -> {
                String worker = ExecuteState.EXECUTING.equals(e.getExecuteState()) ? Strings.requireNonBlank((String)e.getWorker()) : null;
                this.taskMapper.terminate(e.getTaskId(), worker, ExecuteState.EXECUTE_TIMEOUT.value(), e.getExecuteState(), new Date(), null);
            });
            instance.markTerminated((RunState)tuple.a, (Date)tuple.b);
            this.afterTerminateTask((SchedInstance)instance);
            LOG.warn("Purge instance {} to state {}", (Object)instanceId, tuple.a);
            return true;
        });
    }

    public boolean pauseInstance(long instanceId) {
        LOG.info("Pause instance: {}", (Object)instanceId);
        Long wnstanceId = this.instanceMapper.getWnstanceId(instanceId);
        if (wnstanceId != null) {
            Assert.isTrue((instanceId == wnstanceId ? 1 : 0) != 0, () -> "Must pause lead workflow instance: " + instanceId);
        }
        return this.doTransactionLockInSynchronized(instanceId, wnstanceId, (SchedInstance instance) -> {
            Assert.notNull((Object)instance, () -> "Pause instance not found: " + instanceId);
            if (!RUN_STATE_PAUSABLE.contains(instance.getRunState())) {
                return false;
            }
            if (instance.isWorkflow()) {
                Assert.isTrue((boolean)instance.isWorkflowLead(), () -> "Cannot pause workflow node instance: " + instanceId);
                this.workflowMapper.update(instanceId, null, RunState.PAUSED.value(), null, RUN_STATE_WAITING, null);
                this.instanceMapper.findWorkflowNode(instanceId).stream().filter(e -> RUN_STATE_PAUSABLE.contains(e.getRunState())).forEach(this::pauseInstance);
                this.updateWorkflowLeadState((SchedInstance)instance);
            } else {
                this.pauseInstance((SchedInstance)instance);
            }
            return true;
        });
    }

    public boolean cancelInstance(long instanceId, Operations ops) {
        LOG.info("Cancel instance: {} | {}", (Object)instanceId, (Object)ops);
        Assert.isTrue((boolean)ops.toState().isFailure(), () -> "Cancel instance operation invalid: " + ops);
        Long wnstanceId = this.instanceMapper.getWnstanceId(instanceId);
        if (wnstanceId != null) {
            Assert.isTrue((instanceId == wnstanceId ? 1 : 0) != 0, () -> "Must pause lead workflow instance: " + instanceId);
        }
        return this.doTransactionLockInSynchronized(instanceId, wnstanceId, (SchedInstance instance) -> {
            Assert.notNull((Object)instance, () -> "Cancel instance not found: " + instanceId);
            if (RunState.of((Integer)instance.getRunState()).isTerminal()) {
                return false;
            }
            if (instance.isWorkflow()) {
                Assert.isTrue((boolean)instance.isWorkflowLead(), () -> "Cannot cancel workflow node instance: " + instanceId);
                this.workflowMapper.update(instanceId, null, RunState.CANCELED.value(), null, RUN_STATE_WAITING, null);
                this.instanceMapper.findWorkflowNode(instanceId).stream().filter(e -> !RunState.of((Integer)e.getRunState()).isTerminal()).forEach(e -> this.cancelInstance((SchedInstance)e, ops));
                this.updateWorkflowLeadState((SchedInstance)instance);
            } else {
                this.cancelInstance((SchedInstance)instance, ops);
            }
            return true;
        });
    }

    public boolean resumeInstance(long instanceId) {
        Long wnstanceId = this.instanceMapper.getWnstanceId(instanceId);
        return this.doTransactionLockInSynchronized(instanceId, wnstanceId, (SchedInstance instance) -> {
            Assert.notNull((Object)instance, () -> "Cancel failed, instance_id not found: " + instanceId);
            if (!RunState.PAUSED.equals(instance.getRunState())) {
                return false;
            }
            if (instance.isWorkflow()) {
                Assert.isTrue((boolean)instance.isWorkflowLead(), () -> "Cannot resume workflow node instance: " + instanceId);
                int row = this.instanceMapper.updateState(instanceId, RunState.RUNNING.value(), RunState.PAUSED.value());
                TransactionUtils.assertOneAffectedRow((int)row, () -> "Resume workflow lead instance failed: " + instanceId);
                this.workflowMapper.resumeWaiting(instanceId);
                for (SchedInstance nodeInstance : this.instanceMapper.findWorkflowNode(instanceId)) {
                    if (!RunState.PAUSED.equals(nodeInstance.getRunState())) continue;
                    this.resumeInstance(nodeInstance);
                    this.updateWorkflowEdgeState(nodeInstance, RunState.RUNNING.value(), RUN_STATE_PAUSED);
                }
                WorkflowGraph graph = new WorkflowGraph(this.workflowMapper.findByWnstanceId(wnstanceId));
                this.createWorkflowNode((SchedInstance)instance, graph, graph.map(), ExceptionUtils::rethrow);
            } else {
                this.resumeInstance((SchedInstance)instance);
            }
            return true;
        });
    }

    private void doTransactionLockInSynchronized(long instanceId, Long wnstanceId, Consumer<SchedInstance> action) {
        this.doTransactionLockInSynchronized(instanceId, wnstanceId, Functions.convert(action, (Object)Boolean.TRUE));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean doTransactionLockInSynchronized(long instanceId, Long wnstanceId, Function<SchedInstance, Boolean> action) {
        Long lockInstanceId = wnstanceId == null ? instanceId : wnstanceId;
        Long l = (Long)JobConstants.INSTANCE_LOCK_POOL.intern((Object)lockInstanceId);
        synchronized (l) {
            Boolean result = (Boolean)this.transactionTemplate.execute(status -> {
                SchedInstance lockedInstance = this.instanceMapper.lock(lockInstanceId);
                Assert.notNull((Object)lockedInstance, () -> "Lock instance not found: " + lockInstanceId);
                SchedInstance instance = instanceId == lockInstanceId ? lockedInstance : this.instanceMapper.get(instanceId);
                Assert.notNull((Object)instance, () -> "Instance not found: " + instance);
                if (!Objects.equals(instance.getWnstanceId(), wnstanceId)) {
                    throw new IllegalArgumentException("Invalid workflow instance id: " + wnstanceId + ", " + instance);
                }
                return (Boolean)action.apply(instance);
            });
            return Boolean.TRUE.equals(result);
        }
    }

    private Tuple2<RunState, Date> obtainRunState(List<SchedTask> tasks) {
        List states = tasks.stream().map(SchedTask::getExecuteState).map(ExecuteState::of).collect(Collectors.toList());
        if (states.stream().allMatch(ExecuteState::isTerminal)) {
            return Tuple2.of((Object)(states.stream().anyMatch(ExecuteState::isFailure) ? RunState.CANCELED : RunState.FINISHED), (Object)tasks.stream().map(SchedTask::getExecuteEndTime).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElseGet(Date::new));
        }
        return states.stream().anyMatch(ExecuteState.PAUSABLE_LIST::contains) ? null : Tuple2.of((Object)RunState.PAUSED, null);
    }

    private void createInstance(TriggerInstance tInstance) {
        this.instanceMapper.insert(tInstance.getInstance());
        if (tInstance instanceof GeneralInstanceCreator.GeneralInstance) {
            GeneralInstanceCreator.GeneralInstance creator = (GeneralInstanceCreator.GeneralInstance)tInstance;
            Collects.batchProcess(creator.getTasks(), this.taskMapper::batchInsert, (int)200);
        } else if (tInstance instanceof WorkflowInstanceCreator.WorkflowInstance) {
            WorkflowInstanceCreator.WorkflowInstance creator = (WorkflowInstanceCreator.WorkflowInstance)tInstance;
            Collects.batchProcess(creator.getWorkflows(), this.workflowMapper::batchInsert, (int)200);
            for (Tuple2<SchedInstance, List<SchedTask>> sub : creator.getNodeInstances()) {
                this.instanceMapper.insert((SchedInstance)sub.a);
                Collects.batchProcess((List)((List)sub.b), this.taskMapper::batchInsert, (int)200);
            }
        } else {
            throw new UnsupportedOperationException("Unknown instance creator type: " + tInstance.getClass());
        }
    }

    private void pauseInstance(SchedInstance instance) {
        Assert.isTrue((boolean)RUN_STATE_PAUSABLE.contains(instance.getRunState()), () -> "Invalid pause instance state: " + instance);
        long instanceId = instance.getInstanceId();
        Operations ops = Operations.PAUSE;
        this.taskMapper.updateStateByInstanceId(instanceId, ops.toState().value(), EXECUTE_STATE_WAITING, null);
        List<ExecuteTaskParam> executingTasks = this.loadExecutingTasks(instance, ops);
        if (executingTasks.isEmpty()) {
            Tuple2<RunState, Date> tuple = this.obtainRunState(this.taskMapper.findBaseByInstanceId(instanceId));
            Assert.notNull(tuple, () -> "Pause instance failed: " + instanceId);
            int row = this.instanceMapper.terminate(instanceId, ((RunState)tuple.a).value(), RUN_STATE_PAUSABLE, (Date)tuple.b);
            TransactionUtils.assertOneAffectedRow((int)row, () -> "Pause instance failed: " + instance + " | " + tuple.a);
            if (instance.isWorkflowNode()) {
                this.updateWorkflowEdgeState(instance, ((RunState)tuple.a).value(), RUN_STATE_PAUSABLE);
            }
        } else {
            TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(executingTasks));
        }
    }

    private void cancelInstance(SchedInstance instance, Operations ops) {
        long instanceId = instance.getInstanceId();
        this.taskMapper.updateStateByInstanceId(instanceId, ops.toState().value(), EXECUTE_STATE_EXECUTABLE, new Date());
        List<ExecuteTaskParam> executingTasks = this.loadExecutingTasks(instance, ops);
        if (executingTasks.isEmpty()) {
            Tuple2 tuple = this.obtainRunState(this.taskMapper.findBaseByInstanceId(instanceId));
            Assert.notNull(tuple, () -> "Cancel instance failed: " + instanceId);
            if (tuple.a == RunState.PAUSED) {
                tuple = Tuple2.of((Object)RunState.CANCELED, (Object)new Date());
            }
            RunState toState = (RunState)tuple.a;
            int row = this.instanceMapper.terminate(instanceId, toState.value(), RUN_STATE_TERMINABLE, (Date)tuple.b);
            TransactionUtils.assertOneAffectedRow((int)row, () -> "Cancel instance failed: " + instance + " | " + toState);
            if (instance.isWorkflowNode()) {
                this.updateWorkflowEdgeState(instance, ((RunState)tuple.a).value(), RUN_STATE_TERMINABLE);
            }
        } else {
            TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(executingTasks));
        }
    }

    private void resumeInstance(SchedInstance instance) {
        long instanceId = instance.getInstanceId();
        int row = this.instanceMapper.updateState(instanceId, RunState.WAITING.value(), RunState.PAUSED.value());
        TransactionUtils.assertOneAffectedRow((int)row, (String)"Resume sched instance failed.");
        row = this.taskMapper.updateStateByInstanceId(instanceId, ExecuteState.WAITING.value(), EXECUTE_STATE_PAUSED, null);
        TransactionUtils.assertManyAffectedRow((int)row, (String)"Resume sched task failed.");
        Tuple3<SchedJob, SchedInstance, List<SchedTask>> param = this.buildDispatchParam(instanceId, row);
        TransactionUtils.doAfterTransactionCommit(() -> super.dispatch((SchedJob)param.a, (SchedInstance)param.b, (List)param.c));
    }

    private void updateWorkflowLeadState(SchedInstance instance) {
        Assert.isTrue((boolean)instance.isWorkflowLead(), () -> "Must terminate workflow lead instance: " + instance);
        long wnstanceId = instance.getWnstanceId();
        List<SchedWorkflow> workflows = this.workflowMapper.findByWnstanceId(wnstanceId);
        WorkflowGraph graph = new WorkflowGraph(workflows);
        this.updateWorkflowEndState(graph);
        if (graph.allMatch(e -> ((SchedWorkflow)e.getValue()).isTerminal())) {
            RunState state = graph.anyMatch(e -> ((SchedWorkflow)e.getValue()).isFailure()) ? RunState.CANCELED : RunState.FINISHED;
            int row = this.instanceMapper.terminate(instance.getWnstanceId(), state.value(), RUN_STATE_TERMINABLE, new Date());
            TransactionUtils.assertOneAffectedRow((int)row, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
        } else if (workflows.stream().noneMatch(e -> RunState.RUNNING.equals(e.getRunState()))) {
            RunState state = RunState.PAUSED;
            int row = this.instanceMapper.updateState(instance.getWnstanceId(), state.value(), instance.getRunState());
            TransactionUtils.assertOneAffectedRow((int)row, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
        }
    }

    private void updateWorkflowEdgeState(SchedInstance instance, Integer toState, List<Integer> fromStates) {
        String curNode = instance.parseAttach().getCurNode();
        int row = this.workflowMapper.update(instance.getWnstanceId(), curNode, toState, null, fromStates, instance.getInstanceId());
        Assert.isTrue((row > 0 ? 1 : 0) != 0, () -> "Update workflow state failed: " + instance + " | " + toState);
    }

    private void updateWorkflowEndState(WorkflowGraph graph) {
        Map<DAGEdge, SchedWorkflow> ends;
        long wnstanceId = ((SchedWorkflow)Collects.getFirst(graph.map().values())).getWnstanceId();
        if (graph.anyMatch(e -> ((DAGEdge)e.getKey()).getTarget().isEnd() && !((SchedWorkflow)e.getValue()).isTerminal()) && (ends = graph.predecessors(DAGNode.END)).values().stream().allMatch(SchedWorkflow::isTerminal)) {
            RunState endState = ends.values().stream().anyMatch(SchedWorkflow::isFailure) ? RunState.CANCELED : RunState.FINISHED;
            int row = this.workflowMapper.update(wnstanceId, DAGNode.END.toString(), endState.value(), null, RUN_STATE_TERMINABLE, null);
            Assert.isTrue((row > 0 ? 1 : 0) != 0, () -> "Update workflow end node failed: " + wnstanceId + " | " + endState);
            ends.forEach((k, v) -> graph.get(k.getTarget(), DAGNode.END).setRunState(Integer.valueOf(endState.value())));
        }
    }

    private void createWorkflowNode(SchedInstance leadInstance, WorkflowGraph graph, Map<DAGEdge, SchedWorkflow> map, Function<Throwable, Boolean> failHandler) {
        SchedJob job = (SchedJob)LazyLoader.of(SchedJob.class, this.jobMapper::get, (Object)leadInstance.getJobId());
        long wnstanceId = leadInstance.getWnstanceId();
        Date now = new Date();
        HashSet<DAGNode> duplicates = new HashSet<DAGNode>();
        for (Map.Entry<DAGEdge, SchedWorkflow> entry : map.entrySet()) {
            Map<DAGEdge, SchedWorkflow> predecessors;
            DAGNode target = entry.getKey().getTarget();
            SchedWorkflow workflow = entry.getValue();
            if (target.isEnd() || !RunState.WAITING.equals(workflow.getRunState()) || !duplicates.add(target) || (predecessors = graph.predecessors(target)).values().stream().anyMatch(e -> !RunState.of((Integer)e.getRunState()).isTerminal())) continue;
            if (predecessors.values().stream().anyMatch(e -> RunState.of((Integer)e.getRunState()).isFailure())) {
                RunState state = RunState.CANCELED;
                int row = this.workflowMapper.update(wnstanceId, workflow.getCurNode(), state.value(), null, RUN_STATE_TERMINABLE, null);
                Assert.isTrue((row > 0 ? 1 : 0) != 0, () -> "Update workflow cur node state failed: " + workflow + " | " + state);
                continue;
            }
            try {
                long nextInstanceId = this.generateId();
                List<SchedTask> tasks = this.splitTasks(JobHandlerParam.from((SchedJob)job, (String)target.getName()), nextInstanceId, new Date());
                long triggerTime = leadInstance.getTriggerTime() + (long)workflow.getSequence().intValue();
                SchedInstance nextInstance = SchedInstance.create((long)nextInstanceId, (long)job.getJobId(), (RunType)RunType.of((Integer)leadInstance.getRunType()), (long)triggerTime, (int)0, (Date)now);
                nextInstance.setRnstanceId(Long.valueOf(wnstanceId));
                nextInstance.setPnstanceId(predecessors.isEmpty() ? null : ((SchedWorkflow)Collects.getFirst(predecessors.values())).getInstanceId());
                nextInstance.setWnstanceId(Long.valueOf(wnstanceId));
                nextInstance.setAttach(Jsons.toJson((Object)new InstanceAttach(workflow.getCurNode())));
                int row = this.workflowMapper.update(wnstanceId, workflow.getCurNode(), RunState.RUNNING.value(), nextInstanceId, RUN_STATE_WAITING, null);
                Assert.isTrue((row > 0 ? 1 : 0) != 0, () -> "Start workflow node failed: " + workflow);
                this.instanceMapper.insert(nextInstance);
                Collects.batchProcess(tasks, this.taskMapper::batchInsert, (int)200);
                TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(job, nextInstance, tasks));
            }
            catch (Throwable t) {
                Boolean result = failHandler.apply(t);
                if (!Boolean.FALSE.equals(result)) continue;
                return;
            }
        }
    }

    private void afterTerminateTask(SchedInstance instance) {
        RunState runState = RunState.of((Integer)instance.getRunState());
        LazyLoader lazyJob = LazyLoader.of(this.jobMapper::get, (Object)instance.getJobId());
        if (runState == RunState.CANCELED) {
            this.retryJob(instance, (LazyLoader<SchedJob>)lazyJob);
        } else if (runState == RunState.FINISHED) {
            if (instance.isWorkflowNode()) {
                this.processWorkflow(instance);
            } else {
                this.dependJob(instance);
            }
        } else {
            throw new IllegalStateException("Unknown terminate run state " + runState);
        }
        this.updateFixedDelayNextTriggerTime(instance, (LazyLoader<SchedJob>)lazyJob);
    }

    private void updateFixedDelayNextTriggerTime(SchedInstance curr, LazyLoader<SchedJob> lazyJob) {
        SchedInstance root;
        if (curr.isWorkflowNode()) {
            return;
        }
        long rnstanceId = curr.obtainRnstanceId();
        SchedInstance schedInstance = root = rnstanceId == curr.getInstanceId() ? curr : this.instanceMapper.get(rnstanceId);
        if (!root.getJobId().equals(curr.getJobId()) || !RunType.SCHEDULE.equals(root.getRunType())) {
            return;
        }
        SchedJob job = (SchedJob)lazyJob.orElse(null);
        if (job == null || job.retryable(RunState.of((Integer)curr.getRunState()), curr.obtainRetriedCount())) {
            return;
        }
        super.updateFixedDelayNextTriggerTime(job, curr.getRunEndTime());
    }

    private void processWorkflow(SchedInstance nodeInstance) {
        if (!nodeInstance.isWorkflowNode()) {
            return;
        }
        RunState runState = RunState.of((Integer)nodeInstance.getRunState());
        Long wnstanceId = nodeInstance.getWnstanceId();
        this.updateWorkflowEdgeState(nodeInstance, runState.value(), RUN_STATE_TERMINABLE);
        if (runState == RunState.CANCELED) {
            this.workflowMapper.update(wnstanceId, null, RunState.CANCELED.value(), null, RUN_STATE_RUNNABLE, null);
        }
        WorkflowGraph graph = new WorkflowGraph(this.workflowMapper.findByWnstanceId(wnstanceId));
        this.updateWorkflowEndState(graph);
        if (graph.allMatch(e -> ((SchedWorkflow)e.getValue()).isTerminal())) {
            RunState state = graph.anyMatch(e -> ((SchedWorkflow)e.getValue()).isFailure()) ? RunState.CANCELED : RunState.FINISHED;
            int row = this.instanceMapper.terminate(wnstanceId, state.value(), RUN_STATE_TERMINABLE, new Date());
            TransactionUtils.assertOneAffectedRow((int)row, () -> "Terminate workflow lead instance failed: " + nodeInstance + " | " + state);
            this.afterTerminateTask(this.instanceMapper.get(wnstanceId));
            return;
        }
        if (runState == RunState.CANCELED) {
            return;
        }
        this.createWorkflowNode(this.instanceMapper.get(wnstanceId), graph, graph.successors(DAGNode.fromString((String)nodeInstance.parseAttach().getCurNode())), throwable -> {
            LOG.error("Split workflow job task error: " + nodeInstance, throwable);
            this.onCreateWorkflowNodeFailed(nodeInstance.getWnstanceId());
            return false;
        });
    }

    private void onCreateWorkflowNodeFailed(Long wnstanceId) {
        Integer canceled = RunState.CANCELED.value();
        this.workflowMapper.update(wnstanceId, null, canceled, null, RUN_STATE_RUNNABLE, null);
        WorkflowGraph graph = new WorkflowGraph(this.workflowMapper.findByWnstanceId(wnstanceId));
        this.updateWorkflowEndState(graph);
        Assert.state((boolean)graph.allMatch(e -> ((SchedWorkflow)e.getValue()).isTerminal()), (String)"Workflow not all terminal.");
        int row = this.instanceMapper.terminate(wnstanceId, canceled, RUN_STATE_TERMINABLE, new Date());
        TransactionUtils.assertOneAffectedRow((int)row, () -> "Cancel workflow failed: " + wnstanceId);
        this.afterTerminateTask(this.instanceMapper.get(wnstanceId));
    }

    private void retryJob(SchedInstance prev, LazyLoader<SchedJob> lazyJob) {
        List<Object> tasks;
        SchedJob schedJob = (SchedJob)lazyJob.orElseGet(() -> {
            LOG.error("Sched job not found {}", (Object)prev.getJobId());
            return null;
        });
        int retriedCount = prev.obtainRetriedCount();
        if (schedJob == null || !schedJob.retryable(RunState.of((Integer)prev.getRunState()), retriedCount)) {
            this.processWorkflow(prev);
            return;
        }
        long retryInstanceId = this.generateId();
        if (prev.isWorkflowNode()) {
            String curNode = prev.parseAttach().getCurNode();
            int row = this.workflowMapper.update(prev.getWnstanceId(), curNode, null, retryInstanceId, RUN_STATE_RUNNING, prev.getInstanceId());
            Assert.isTrue((row > 0 ? 1 : 0) != 0, () -> "Retry workflow node failed: " + prev);
        }
        Date now = new Date();
        long triggerTime = schedJob.computeRetryTriggerTime(++retriedCount, now);
        SchedInstance retryInstance = SchedInstance.create((long)retryInstanceId, (long)schedJob.getJobId(), (RunType)RunType.RETRY, (long)triggerTime, (int)retriedCount, (Date)now);
        retryInstance.setRnstanceId(Long.valueOf(prev.obtainRnstanceId()));
        retryInstance.setPnstanceId(prev.getInstanceId());
        retryInstance.setWnstanceId(prev.getWnstanceId());
        retryInstance.setAttach(prev.getAttach());
        RetryType retryType = RetryType.of((Integer)schedJob.getRetryType());
        if (retryType == RetryType.ALL) {
            try {
                tasks = this.splitTasks(JobHandlerParam.from((SchedJob)schedJob), retryInstance.getInstanceId(), now);
            }
            catch (Throwable t) {
                LOG.error("Split retry job error: " + schedJob + ", " + prev, t);
                this.processWorkflow(prev);
                return;
            }
        } else if (retryType == RetryType.FAILED) {
            tasks = this.taskMapper.findLargeByInstanceId(prev.getInstanceId()).stream().filter(e -> ExecuteState.of((Integer)e.getExecuteState()).isFailure()).filter(e -> !RouteStrategy.BROADCAST.equals(schedJob.getRouteStrategy()) || super.isAliveWorker(e.getWorker())).map(e -> SchedTask.create((String)e.getTaskParam(), (long)this.generateId(), (long)retryInstanceId, (int)e.getTaskNo(), (int)e.getTaskCount(), (Date)now, (String)e.getWorker())).collect(Collectors.toList());
        } else {
            throw new IllegalArgumentException("Unknown job retry type: " + schedJob.getJobId() + ", " + retryType);
        }
        Assert.notEmpty(tasks, (String)"Insert list of task cannot be empty.");
        this.instanceMapper.insert(retryInstance);
        Collects.batchProcess(tasks, this.taskMapper::batchInsert, (int)200);
        TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(schedJob, retryInstance, tasks));
    }

    private void dependJob(SchedInstance parentInstance) {
        List<SchedDepend> schedDepends = this.dependMapper.findByParentJobId(parentInstance.getJobId());
        if (CollectionUtils.isEmpty(schedDepends)) {
            return;
        }
        for (SchedDepend depend : schedDepends) {
            Runnable dispatchAction;
            SchedJob childJob = this.jobMapper.get(depend.getChildJobId());
            if (childJob == null) {
                LOG.error("Child sched job not found: {} | {}", (Object)depend.getParentJobId(), (Object)depend.getChildJobId());
                continue;
            }
            if (JobState.DISABLE.equals(childJob.getJobState()) || (dispatchAction = (Runnable)TransactionUtils.doInNestedTransaction((PlatformTransactionManager)Objects.requireNonNull(this.transactionTemplate.getTransactionManager()), () -> {
                TriggerInstanceCreator<?> creator = TriggerInstanceCreator.of(childJob.getJobType(), this);
                long triggerTime = parentInstance.getTriggerTime() / 1000L * 1000L + (long)depend.getSequence().intValue();
                Object tInstance = creator.create(childJob, RunType.DEPEND, triggerTime);
                ((TriggerInstance)tInstance).getInstance().setRnstanceId(Long.valueOf(parentInstance.obtainRnstanceId()));
                ((TriggerInstance)tInstance).getInstance().setPnstanceId(parentInstance.getInstanceId());
                this.createInstance((TriggerInstance)tInstance);
                return () -> creator.dispatch(childJob, tInstance);
            }, t -> LOG.error("Depend job instance created fail: " + parentInstance + " | " + childJob, t))) == null) continue;
            TransactionUtils.doAfterTransactionCommit((Runnable)dispatchAction);
        }
    }

    private List<ExecuteTaskParam> loadExecutingTasks(SchedInstance instance, Operations ops) {
        ArrayList<ExecuteTaskParam> executingTasks = new ArrayList<ExecuteTaskParam>();
        SchedJob schedJob = (SchedJob)LazyLoader.of(SchedJob.class, this.jobMapper::get, (Object)instance.getJobId());
        String supervisorToken = SchedGroupService.mapGroup(schedJob.getGroup()).getSupervisorToken();
        ExecuteTaskParam.Builder builder = ExecuteTaskParam.builder((SchedInstance)instance, (SchedJob)schedJob, (String)supervisorToken);
        long triggerTime = 0L;
        for (SchedTask task : this.taskMapper.findBaseByInstanceId(instance.getInstanceId())) {
            if (!ExecuteState.EXECUTING.equals(task.getExecuteState())) continue;
            Worker worker = Worker.deserialize((String)task.getWorker());
            if (super.isAliveWorker(worker)) {
                executingTasks.add(builder.build(ops, task.getTaskId().longValue(), triggerTime, worker));
                continue;
            }
            Date executeEndTime = ops.toState().isTerminal() ? new Date() : null;
            int row = this.taskMapper.terminate(task.getTaskId(), task.getWorker(), ops.toState().value(), ExecuteState.EXECUTING.value(), executeEndTime, null);
            if (!TransactionUtils.isOneAffectedRow((int)row)) {
                LOG.error("Cancel the dead task failed: {}", (Object)task);
                executingTasks.add(builder.build(ops, task.getTaskId().longValue(), triggerTime, worker));
                continue;
            }
            LOG.info("Cancel the dead task success: {}", (Object)task);
        }
        return executingTasks;
    }

    private Tuple3<SchedJob, SchedInstance, List<SchedTask>> buildDispatchParam(long instanceId, int expectTaskSize) {
        SchedInstance instance = this.instanceMapper.get(instanceId);
        SchedJob job = this.jobMapper.get(instance.getJobId());
        Assert.notNull((Object)job, (String)("Not found job: " + instance.getJobId()));
        List waitingTasks = this.taskMapper.findLargeByInstanceId(instanceId).stream().filter(e -> ExecuteState.WAITING.equals(e.getExecuteState())).collect(Collectors.toList());
        if (waitingTasks.size() != expectTaskSize) {
            throw new IllegalStateException("Invalid dispatching tasks size: expect=" + expectTaskSize + ", actual=" + waitingTasks.size());
        }
        return Tuple3.of((Object)job, (Object)instance, waitingTasks);
    }
}

