/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.supervisor.manager;

import cn.ponfee.disjob.common.base.IdGenerator;
import cn.ponfee.disjob.common.base.LazyLoader;
import cn.ponfee.disjob.common.dag.DAGEdge;
import cn.ponfee.disjob.common.dag.DAGNode;
import cn.ponfee.disjob.common.spring.TransactionUtils;
import cn.ponfee.disjob.common.tuple.Tuple2;
import cn.ponfee.disjob.common.tuple.Tuple3;
import cn.ponfee.disjob.common.util.Collects;
import cn.ponfee.disjob.common.util.Jsons;
import cn.ponfee.disjob.core.base.Worker;
import cn.ponfee.disjob.core.dag.WorkflowGraph;
import cn.ponfee.disjob.core.enums.ExecuteState;
import cn.ponfee.disjob.core.enums.JobState;
import cn.ponfee.disjob.core.enums.Operations;
import cn.ponfee.disjob.core.enums.RetryType;
import cn.ponfee.disjob.core.enums.RouteStrategy;
import cn.ponfee.disjob.core.enums.RunState;
import cn.ponfee.disjob.core.enums.RunType;
import cn.ponfee.disjob.core.enums.TriggerType;
import cn.ponfee.disjob.core.exception.JobException;
import cn.ponfee.disjob.core.model.InstanceAttach;
import cn.ponfee.disjob.core.model.SchedDepend;
import cn.ponfee.disjob.core.model.SchedInstance;
import cn.ponfee.disjob.core.model.SchedJob;
import cn.ponfee.disjob.core.model.SchedTask;
import cn.ponfee.disjob.core.model.SchedWorkflow;
import cn.ponfee.disjob.core.param.ExecuteTaskParam;
import cn.ponfee.disjob.core.param.ExecuteTaskParamBuilder;
import cn.ponfee.disjob.core.param.JobHandlerParam;
import cn.ponfee.disjob.core.param.StartTaskParam;
import cn.ponfee.disjob.core.param.TaskWorkerParam;
import cn.ponfee.disjob.core.param.TerminateTaskParam;
import cn.ponfee.disjob.dispatch.TaskDispatcher;
import cn.ponfee.disjob.registry.SupervisorRegistry;
import cn.ponfee.disjob.supervisor.base.WorkerServiceClient;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedDependMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedInstanceMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedJobMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedTaskMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedWorkflowMapper;
import cn.ponfee.disjob.supervisor.instance.NormalInstanceCreator;
import cn.ponfee.disjob.supervisor.instance.TriggerInstance;
import cn.ponfee.disjob.supervisor.instance.TriggerInstanceCreator;
import cn.ponfee.disjob.supervisor.instance.WorkflowInstanceCreator;
import cn.ponfee.disjob.supervisor.manager.AbstractJobManager;
import com.google.common.base.Joiner;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

@Component
public class DistributedJobManager
extends AbstractJobManager {
    // Bean name of the disjob transaction manager; the @Transactional annotations in this class spell out the same literal.
    private static final String TX_MANAGER_NAME = "disjobTransactionManager";
    // NOTE(review): presumably the row count expected from a single-row update; the methods below compare against the literal 1 - confirm.
    private static final int AFFECTED_ONE_ROW = 1;
    // Weak interner that yields one canonical Long per lock id; the canonical object is used as a monitor in doTransactionLockInSynchronized.
    private static final Interner<Long> INTERNER_POOL = Interners.newWeakInterner();
    // Integer values of RunState groups, precomputed so they can be handed to mappers and checked with contains().
    private static final List<Integer> RUN_STATE_TERMINABLE = Collects.convert((List)RunState.TERMINABLE_LIST, RunState::value);
    private static final List<Integer> RUN_STATE_RUNNABLE = Collects.convert((List)RunState.RUNNABLE_LIST, RunState::value);
    private static final List<Integer> RUN_STATE_PAUSABLE = Collects.convert((List)RunState.PAUSABLE_LIST, RunState::value);
    private static final List<Integer> RUN_STATE_WAITING = Collections.singletonList(RunState.WAITING.value());
    private static final List<Integer> RUN_STATE_RUNNING = Collections.singletonList(RunState.RUNNING.value());
    private static final List<Integer> RUN_STATE_PAUSED = Collections.singletonList(RunState.PAUSED.value());
    // Integer values of ExecuteState groups, precomputed for the same purpose on the task side.
    private static final List<Integer> EXECUTE_STATE_EXECUTABLE = Collects.convert((List)ExecuteState.EXECUTABLE_LIST, ExecuteState::value);
    private static final List<Integer> EXECUTE_STATE_PAUSABLE = Collects.convert((List)ExecuteState.PAUSABLE_LIST, ExecuteState::value);
    private static final List<Integer> EXECUTE_STATE_WAITING = Collections.singletonList(ExecuteState.WAITING.value());
    private static final List<Integer> EXECUTE_STATE_PAUSED = Collections.singletonList(ExecuteState.PAUSED.value());
    // Programmatic transaction entry point used by doTransactionLockInSynchronized.
    private final TransactionTemplate transactionTemplate;
    // Persistence mappers for jobs, instances, tasks, job dependencies and workflow edges.
    private final SchedJobMapper jobMapper;
    private final SchedInstanceMapper instanceMapper;
    private final SchedTaskMapper taskMapper;
    private final SchedDependMapper dependMapper;
    private final SchedWorkflowMapper workflowMapper;

    /**
     * Creates the manager and stores its collaborators.
     * <p>
     * The first four arguments are forwarded to the {@link AbstractJobManager} constructor;
     * the transaction template and the mappers are kept as fields of this class.
     */
    public DistributedJobManager(IdGenerator idGenerator,
                                 SupervisorRegistry discoveryWorker,
                                 TaskDispatcher taskDispatcher,
                                 WorkerServiceClient workerServiceClient,
                                 @Qualifier("disjobTransactionTemplate") TransactionTemplate transactionTemplate,
                                 SchedJobMapper jobMapper,
                                 SchedInstanceMapper instanceMapper,
                                 SchedTaskMapper taskMapper,
                                 SchedDependMapper dependMapper,
                                 SchedWorkflowMapper workflowMapper) {
        super(idGenerator, discoveryWorker, taskDispatcher, workerServiceClient);
        // Mappers first, transaction template last; the assignments are independent of each other.
        this.jobMapper = jobMapper;
        this.instanceMapper = instanceMapper;
        this.taskMapper = taskMapper;
        this.dependMapper = dependMapper;
        this.workflowMapper = workflowMapper;
        this.transactionTemplate = transactionTemplate;
    }

    /**
     * Looks a job up through the job mapper.
     *
     * @param jobId the job id
     * @return the result of {@code jobMapper.getByJobId} for that id
     */
    public SchedJob getJob(long jobId) {
        SchedJob found = jobMapper.getByJobId(jobId);
        return found;
    }

    /**
     * Looks an instance up through the instance mapper.
     *
     * @param instanceId the instance id
     * @return the result of {@code instanceMapper.getByInstanceId} for that id
     */
    public SchedInstance getInstance(long instanceId) {
        SchedInstance found = instanceMapper.getByInstanceId(instanceId);
        return found;
    }

    /**
     * Looks an instance up by job id, trigger time and run type.
     *
     * @param jobId       the job id
     * @param triggerTime the trigger time passed on to the mapper
     * @param runType     the run type value passed on to the mapper
     * @return the result of {@code instanceMapper.getByJobIdAndTriggerTimeAndRunType}
     */
    public SchedInstance getInstance(long jobId, long triggerTime, int runType) {
        SchedInstance found = instanceMapper.getByJobIdAndTriggerTimeAndRunType(jobId, triggerTime, runType);
        return found;
    }

    /**
     * Delegates to {@code jobMapper.findBeTriggering}.
     *
     * @param maxNextTriggerTime upper bound handed to the mapper
     * @param size               size argument handed to the mapper
     * @return the jobs returned by the mapper
     */
    public List<SchedJob> findBeTriggeringJob(long maxNextTriggerTime, int size) {
        List<SchedJob> jobs = jobMapper.findBeTriggering(maxNextTriggerTime, size);
        return jobs;
    }

    /**
     * Queries {@code instanceMapper.findExpireState} for the {@code WAITING} run state.
     *
     * @param expireTime passed to the mapper both as epoch milliseconds and as a {@link Date}
     * @param size       size argument handed to the mapper
     * @return the instances returned by the mapper
     */
    public List<SchedInstance> findExpireWaitingInstance(Date expireTime, int size) {
        return instanceMapper.findExpireState(
            RunState.WAITING.value(),
            expireTime.getTime(),
            expireTime,
            size
        );
    }

    /**
     * Queries {@code instanceMapper.findExpireState} for the {@code RUNNING} run state.
     *
     * @param expireTime passed to the mapper both as epoch milliseconds and as a {@link Date}
     * @param size       size argument handed to the mapper
     * @return the instances returned by the mapper
     */
    public List<SchedInstance> findExpireRunningInstance(Date expireTime, int size) {
        return instanceMapper.findExpireState(
            RunState.RUNNING.value(),
            expireTime.getTime(),
            expireTime,
            size
        );
    }

    /**
     * Delegates to {@code instanceMapper.findUnterminatedRetry}.
     *
     * @param rnstanceId id handed to the mapper unchanged (presumably the root instance id - confirm)
     * @return the instances returned by the mapper
     */
    public List<SchedInstance> findUnterminatedRetryInstance(long rnstanceId) {
        List<SchedInstance> retries = instanceMapper.findUnterminatedRetry(rnstanceId);
        return retries;
    }

    /**
     * Delegates to {@code taskMapper.findBaseByInstanceId}.
     *
     * @param instanceId the instance id
     * @return the tasks returned by the mapper
     */
    public List<SchedTask> findBaseInstanceTasks(long instanceId) {
        List<SchedTask> tasks = taskMapper.findBaseByInstanceId(instanceId);
        return tasks;
    }

    /**
     * Calls {@code jobMapper.stop} for the given job.
     *
     * @param job the job to stop
     * @return {@code true} when the mapper reports exactly one affected row
     */
    public boolean stopJob(SchedJob job) {
        int affectedRows = jobMapper.stop(job);
        return affectedRows == 1;
    }

    /**
     * Switches a job to the given state through {@code jobMapper.updateState}.
     *
     * @param jobId the job id
     * @param to    the target state
     * @return {@code true} when the mapper reports exactly one affected row
     */
    public boolean changeJobState(long jobId, JobState to) {
        // The third argument is the target value XOR 1, i.e. the other value of a 0/1 pair
        // (assumes JobState values are 0 and 1 - TODO confirm).
        int affectedRows = jobMapper.updateState(jobId, to.value(), 1 ^ to.value());
        return affectedRows == 1;
    }

    /**
     * Calls {@code jobMapper.updateNextTriggerTime} for the given job.
     *
     * @param job the job handed to the mapper
     * @return {@code true} when the mapper reports exactly one affected row
     */
    public boolean updateJobNextTriggerTime(SchedJob job) {
        int affectedRows = jobMapper.updateNextTriggerTime(job);
        return affectedRows == 1;
    }

    /**
     * Calls {@code jobMapper.updateNextScanTime} for the given job.
     *
     * @param schedJob the job handed to the mapper
     * @return {@code true} when the mapper reports exactly one affected row
     */
    public boolean updateJobNextScanTime(SchedJob schedJob) {
        int affectedRows = jobMapper.updateNextScanTime(schedJob);
        return affectedRows == 1;
    }

    /**
     * Calls {@code instanceMapper.renewUpdateTime} with the instance's id and version.
     *
     * @param instance   supplies the instance id and the version handed to the mapper
     * @param updateTime the update time handed to the mapper
     * @return {@code true} when the mapper reports exactly one affected row
     */
    public boolean renewInstanceUpdateTime(SchedInstance instance, Date updateTime) {
        int affectedRows = instanceMapper.renewUpdateTime(
            instance.getInstanceId(),
            updateTime,
            instance.getVersion()
        );
        return affectedRows == 1;
    }

    /**
     * Moves a task from {@code WAITING} to {@code WAITING_CANCELED} via {@code taskMapper.terminate},
     * passing {@code null} for the two trailing arguments.
     *
     * @param taskId the task id
     * @return {@code true} when the mapper reports exactly one affected row
     */
    @Override
    protected boolean cancelWaitingTask(long taskId) {
        int affectedRows = taskMapper.terminate(
            taskId,
            ExecuteState.WAITING_CANCELED.value(),
            ExecuteState.WAITING.value(),
            null,
            null
        );
        return affectedRows == 1;
    }

    /**
     * Stores an execution snapshot for a task through {@code taskMapper.checkpoint}.
     *
     * @param taskId          the task id
     * @param executeSnapshot the snapshot text handed to the mapper
     * @return {@code true} when the mapper reports exactly one affected row
     */
    public boolean checkpoint(long taskId, String executeSnapshot) {
        int affectedRows = taskMapper.checkpoint(taskId, executeSnapshot);
        return affectedRows == 1;
    }

    /**
     * Validates a new job, assigns it a generated id, parses its trigger configuration and inserts it.
     *
     * @param job the job to add; its {@code updatedBy} is set from its {@code createdBy}
     * @throws JobException declared by this method; raised by the verification/parsing steps it calls
     */
    @Transactional(transactionManager = "disjobTransactionManager", rollbackFor = Exception.class)
    public void addJob(SchedJob job) throws JobException {
        job.setUpdatedBy(job.getCreatedBy());

        // Validation happens before an id is generated.
        job.verifyBeforeAdd();
        job.checkAndDefaultSetting();
        super.verifyJob(job);

        long newJobId = this.generateId();
        job.setJobId(Long.valueOf(newJobId));
        this.parseTriggerConfig(job);
        jobMapper.insert(job);
    }

    /**
     * Validates and persists changes to an existing job.
     * <p>
     * When no job handler is supplied the job param must be empty as well; otherwise the job is
     * verified via {@code verifyJob}. When a trigger type is supplied, the job's child-side
     * dependency rows are deleted and the trigger configuration is parsed again; when it is not,
     * the trigger value must be {@code null}.
     *
     * @param job the job carrying the new values; its next trigger time is first copied from the stored job
     * @throws JobException             declared by this method; raised by the verification/parsing steps it calls
     * @throws IllegalArgumentException if an argument check fails or the job id is unknown
     * @throws IllegalStateException    if the update does not affect exactly one row
     */
    @Transactional(transactionManager="disjobTransactionManager", rollbackFor={Exception.class})
    public void updateJob(SchedJob job) throws JobException {
        job.verifyBeforeUpdate();
        job.checkAndDefaultSetting();
        if (StringUtils.isEmpty(job.getJobHandler())) {
            // Fixed: the original asserted hasText(jobParam), the opposite of what the message
            // states and of the parallel trigger-type/trigger-value check below.
            Assert.isTrue(StringUtils.isEmpty(job.getJobParam()), "Job param must be null if not set job handler.");
        } else {
            super.verifyJob(job);
        }

        SchedJob dbJob = this.jobMapper.getByJobId(job.getJobId());
        Assert.notNull(dbJob, () -> "Sched job id not found " + job.getJobId());
        // Start from the stored next trigger time.
        job.setNextTriggerTime(dbJob.getNextTriggerTime());

        if (job.getTriggerType() == null) {
            Assert.isNull(job.getTriggerValue(), "Trigger value must be null if not set trigger type.");
        } else {
            Assert.notNull(job.getTriggerValue(), "Trigger value cannot be null if has set trigger type.");
            // Drop the old child-side dependencies before the trigger config is parsed again.
            this.dependMapper.deleteByChildJobId(job.getJobId());
            this.parseTriggerConfig(job);
        }

        job.setUpdatedAt(new Date());
        Assert.state(this.jobMapper.updateByJobId(job) == 1, "Update sched job fail or conflict.");
    }

    /**
     * Deletes a job and the dependency rows in which it appears as parent or child.
     *
     * @param jobId the job id
     * @throws IllegalArgumentException if the job does not exist or the delete does not affect exactly one row
     * @throws IllegalStateException    if the job is still in the {@code ENABLE} state
     */
    @Transactional(transactionManager = "disjobTransactionManager", rollbackFor = Exception.class)
    public void deleteJob(long jobId) {
        SchedJob existing = jobMapper.getByJobId(jobId);
        Assert.notNull(existing, "Job id not found: " + jobId);

        boolean enabled = JobState.ENABLE.equals(existing.getJobState());
        if (enabled) {
            throw new IllegalStateException("Please disable job before delete this job.");
        }

        int deletedRows = jobMapper.deleteByJobId(jobId);
        Assert.isTrue(deletedRows == 1, "Delete sched job fail or conflict.");

        dependMapper.deleteByParentJobId(jobId);
        dependMapper.deleteByChildJobId(jobId);
    }

    /**
     * Manually triggers a job: builds a {@code MANUAL} trigger instance stamped with the current
     * time, persists it, and registers its dispatch to run after the transaction commits.
     *
     * @param jobId the job id
     * @throws JobException             declared by this method; raised by the creation steps it calls
     * @throws IllegalArgumentException if the job does not exist
     */
    @Transactional(transactionManager = "disjobTransactionManager", rollbackFor = Exception.class)
    public void triggerJob(long jobId) throws JobException {
        SchedJob job = jobMapper.getByJobId(jobId);
        Assert.notNull(job, () -> "Sched job not found: " + jobId);

        TriggerInstanceCreator<?> creator = TriggerInstanceCreator.of(job.getJobType(), this);
        long triggerTime = System.currentTimeMillis();
        Object tInstance = creator.create(job, RunType.MANUAL, triggerTime);
        this.createInstance((TriggerInstance) tInstance);

        // Dispatch is deferred until after commit.
        TransactionUtils.doAfterTransactionCommit(() -> creator.dispatch(job, tInstance));
    }

    /**
     * Updates the job's next trigger time and, only if that update affected a row, persists the
     * trigger instance.
     *
     * @param job             the job whose next trigger time is updated
     * @param triggerInstance the instance data to persist
     * @return {@code false} when {@code jobMapper.updateNextTriggerTime} affected no row, {@code true} otherwise
     */
    @Transactional(transactionManager = "disjobTransactionManager", rollbackFor = Exception.class)
    public boolean createInstance(SchedJob job, TriggerInstance triggerInstance) {
        boolean triggerTimeUpdated = jobMapper.updateNextTriggerTime(job) != 0;
        if (triggerTimeUpdated) {
            this.createInstance(triggerInstance);
        }
        return triggerTimeUpdated;
    }

    /**
     * Updates the worker of the given tasks in batches of 200.
     * <p>
     * The list is sorted in place by task id before the batches are sent to
     * {@code taskMapper.batchUpdateWorker}. A {@code null} or empty list is a no-op.
     *
     * @param params the task/worker pairs; mutated (sorted) by this call
     */
    @Transactional(transactionManager = "disjobTransactionManager", rollbackFor = Exception.class)
    public void updateTaskWorker(List<TaskWorkerParam> params) {
        if (CollectionUtils.isEmpty(params)) {
            return;
        }
        params.sort(Comparator.comparing(TaskWorkerParam::getTaskId));
        Collects.batchProcess(params, this.taskMapper::batchUpdateWorker, 200);
    }

    /**
     * Marks a task as started; if the owning instance is still {@code WAITING} the instance is
     * started first.
     *
     * @param param identifies the instance, the task and the worker
     * @return {@code false} when the instance's run state is not in {@code RUN_STATE_PAUSABLE}
     *         or the task row was not updated, {@code true} otherwise
     * @throws IllegalArgumentException if the instance does not exist
     * @throws IllegalStateException    if the instance row was updated but the task row was not
     */
    @Transactional(transactionManager = "disjobTransactionManager", rollbackFor = Exception.class)
    public boolean startTask(StartTaskParam param) {
        SchedInstance instance = instanceMapper.getByInstanceId(param.getInstanceId());
        Assert.notNull(instance, () -> "Sched instance not found: " + param);
        if (!RUN_STATE_PAUSABLE.contains(instance.getRunState())) {
            return false;
        }

        Date now = new Date();
        int instanceRows = RunState.WAITING.equals(instance.getRunState())
            ? instanceMapper.start(param.getInstanceId(), now)
            : 0;

        boolean taskStarted = taskMapper.start(param.getTaskId(), param.getWorker(), now) != 0;
        if (!taskStarted) {
            // Raising here when the instance row was already changed triggers the declared rollback.
            Assert.state(instanceRows == 0, () -> "Start task failed: " + param);
        }
        return taskStarted;
    }

    /**
     * Forces a non-workflow instance and its tasks into the states derived from {@code toExecuteState}.
     * <p>
     * When the target is {@code WAITING}, dispatch parameters are built and the dispatch is
     * registered to run after the transaction commits.
     *
     * @param instanceId     the instance id
     * @param toExecuteState the target task state; must not be {@code EXECUTING}
     * @throws IllegalArgumentException if the target is {@code EXECUTING}, the instance is missing, or it is a workflow instance
     * @throws IllegalStateException    if neither the instance nor any task row was changed
     */
    public void changeInstanceState(long instanceId, ExecuteState toExecuteState) {
        RunState toRunState = toExecuteState.runState();
        Assert.isTrue(toExecuteState != ExecuteState.EXECUTING, "Cannot force update state to EXECUTING");

        this.doTransactionLockInSynchronized(instanceId, null, (SchedInstance instance) -> {
            Assert.notNull(instance, () -> "Sched instance not found: " + instanceId);
            Assert.isTrue(!instance.isWorkflow(), () -> "Unsupported force change workflow instance state: " + instanceId);

            int instRow = instanceMapper.changeState(instanceId, toRunState.value());
            int taskRow = taskMapper.changeState(instanceId, toExecuteState.value());
            boolean nothingChanged = instRow == 0 && taskRow == 0;
            if (nothingChanged) {
                throw new IllegalStateException("Force update instance state failed: " + instanceId);
            }

            if (toExecuteState == ExecuteState.WAITING) {
                Tuple3<SchedJob, SchedInstance, List<SchedTask>> params = this.buildDispatchParams(instanceId, taskRow);
                TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(params.a, params.b, params.c));
            }
            this.log.info("Force change state success {} | {}", instanceId, toExecuteState);
        });
    }

    /**
     * Deletes a terminal instance together with its tasks.
     * <p>
     * For a workflow only the lead instance may be deleted; doing so also removes the tasks of
     * every node instance, the node instances themselves and the workflow rows.
     *
     * @param instanceId the instance id
     * @throws IllegalArgumentException if the instance is missing, not terminal, a workflow node,
     *                                  or any delete affects fewer rows than expected
     */
    public void deleteInstance(long instanceId) {
        Long wnstanceId = instanceMapper.getWnstanceId(instanceId);
        this.doTransactionLockInSynchronized(instanceId, wnstanceId, (SchedInstance instance) -> {
            Assert.notNull(instance, () -> "Sched instance not found: " + instanceId);
            Assert.isTrue(RunState.of(instance.getRunState()).isTerminal(), () -> "Deleting instance must be terminal: " + instance);

            if (!instance.isWorkflow()) {
                // Plain instance: the instance row, then its tasks.
                int instanceRows = instanceMapper.deleteByInstanceId(instanceId);
                Assert.isTrue(instanceRows == 1, () -> "Delete sched instance conflict: " + instanceId);
                int taskRows = taskMapper.deleteByInstanceId(instanceId);
                Assert.isTrue(taskRows >= 1, () -> "Delete sched task conflict: " + instanceId);
            } else {
                Assert.isTrue(instance.isWorkflowLead(), () -> "Cannot delete workflow node instance: " + instanceId);
                int leadRows = instanceMapper.deleteByInstanceId(instanceId);
                Assert.isTrue(leadRows == 1, () -> "Delete workflow lead instance conflict: " + instanceId);

                // Tasks of every node instance go first, then the node instances, then the workflow rows.
                for (SchedInstance node : instanceMapper.findWorkflowNode(instance.getWnstanceId())) {
                    int nodeTaskRows = taskMapper.deleteByInstanceId(node.getInstanceId());
                    Assert.isTrue(nodeTaskRows >= 1, () -> "Delete sched task conflict: " + instanceId);
                }
                int nodeRows = instanceMapper.deleteByWnstanceId(instanceId);
                Assert.isTrue(nodeRows >= 1, () -> "Delete workflow node instance conflict: " + instanceId);
                int workflowRows = workflowMapper.deleteByWnstanceId(instanceId);
                Assert.isTrue(workflowRows >= 1, () -> "Delete sched workflow conflict: " + instanceId);
            }
            this.log.info("Delete sched instance success {}", instanceId);
        });
    }

    /**
     * Moves an {@code EXECUTING} task to the requested state and, when the instance's tasks allow
     * it, terminates the owning instance as well.
     *
     * @param param carries the task id, instance id, workflow instance id, target state,
     *              operation and error message
     * @return {@code false} when the instance is already terminal or the task row was not updated;
     *         {@code true} once the task row has been updated
     * @throws IllegalArgumentException if the target state is in {@code ExecuteState.PAUSABLE_LIST},
     *                                  the instance is missing, or it is a workflow lead
     */
    public boolean terminateTask(TerminateTaskParam param) {
        ExecuteState toState = param.getToState();
        long instanceId = param.getInstanceId();
        Assert.isTrue(!ExecuteState.PAUSABLE_LIST.contains(toState), () -> "Stop executing invalid to state " + toState);

        return this.doTransactionLockInSynchronized(instanceId, param.getWnstanceId(), (SchedInstance instance) -> {
            Assert.notNull(instance, () -> "Terminate executing task failed, instance not found: " + instanceId);
            Assert.isTrue(!instance.isWorkflowLead(), () -> "Cannot direct terminate workflow lead instance: " + instance);
            if (RunState.of(instance.getRunState()).isTerminal()) {
                return false;
            }

            // An end time is recorded only when the target task state is terminal.
            Date executeEndTime = toState.isTerminal() ? new Date() : null;
            int taskRows = taskMapper.terminate(param.getTaskId(), toState.value(), ExecuteState.EXECUTING.value(), executeEndTime, param.getErrorMsg());
            if (taskRows != 1) {
                this.log.warn("Conflict terminate executing task: {} | {}", param.getTaskId(), toState);
                return false;
            }

            Tuple2<RunState, Date> tuple = this.obtainRunState(taskMapper.findBaseByInstanceId(instanceId));
            if (tuple == null) {
                // No instance-level state can be derived yet.
                return true;
            }
            boolean instanceTerminated = instanceMapper.terminate(instanceId, tuple.a.value(), RUN_STATE_TERMINABLE, tuple.b) > 0;
            if (!instanceTerminated) {
                return true;
            }

            if (param.getOperation().isTrigger()) {
                instance.setRunState(Integer.valueOf(tuple.a.value()));
                this.afterTerminateTask(instance);
            } else if (instance.isWorkflowNode()) {
                this.updateWorkflowEdgeState(instance, tuple.a.value(), RUN_STATE_TERMINABLE);
                this.updateWorkflowLeadState(instanceMapper.getByInstanceId(param.getWnstanceId()));
            }
            return true;
        });
    }

    /**
     * Force-terminates an instance that has no waiting task and no alive executing task.
     * <p>
     * The instance goes to the state derived from its tasks, or to {@code CANCELED} when none can
     * be derived. Tasks still in a pausable state are then marked {@code EXECUTE_TIMEOUT}.
     *
     * @param inst supplies the instance id and workflow instance id
     * @return {@code true} when the instance was terminated; {@code false} when a precondition
     *         failed or the instance row was not updated
     * @throws IllegalArgumentException if the instance is missing, is a workflow lead, or the
     *                                  derived state is not terminal
     */
    public boolean purgeInstance(SchedInstance inst) {
        Long instanceId = inst.getInstanceId();
        return this.doTransactionLockInSynchronized((long) instanceId, inst.getWnstanceId(), (SchedInstance instance) -> {
            Assert.notNull(instance, () -> "Purge instance not found: " + instanceId);
            Assert.isTrue(!instance.isWorkflowLead(), () -> "Cannot purge workflow lead instance: " + instance);
            if (!RUN_STATE_PAUSABLE.contains(instance.getRunState())) {
                return false;
            }

            List<SchedTask> tasks = taskMapper.findBaseByInstanceId(instanceId);
            boolean hasWaiting = tasks.stream().anyMatch(e -> ExecuteState.WAITING.equals(e.getExecuteState()));
            if (hasWaiting) {
                this.log.warn("Purge instance failed, has waiting task: {}", tasks);
                return false;
            }
            if (this.hasAliveExecuting(tasks)) {
                this.log.warn("Purge instance failed, has alive executing task: {}", tasks);
                return false;
            }

            Tuple2<RunState, Date> derived = this.obtainRunState(tasks);
            Tuple2<RunState, Date> tuple;
            if (derived == null) {
                // No state derivable from the tasks: fall back to CANCELED, stamped now.
                tuple = Tuple2.of(RunState.CANCELED, new Date());
            } else {
                Assert.isTrue(derived.a.isTerminal(), () -> "Purge instance state must be terminal state: " + instance);
                tuple = derived;
            }

            if (instanceMapper.terminate(instanceId, tuple.a.value(), RUN_STATE_TERMINABLE, tuple.b) != 1) {
                return false;
            }

            for (SchedTask task : tasks) {
                if (EXECUTE_STATE_PAUSABLE.contains(task.getExecuteState())) {
                    taskMapper.terminate(task.getTaskId(), ExecuteState.EXECUTE_TIMEOUT.value(), task.getExecuteState(), new Date(), null);
                }
            }

            instance.setRunState(Integer.valueOf(tuple.a.value()));
            this.afterTerminateTask(instance);
            this.log.warn("Purge instance {} to state {}", instanceId, tuple.a);
            return true;
        });
    }

    /**
     * Pauses an instance after looking up its workflow instance id through the instance mapper.
     *
     * @param instanceId the instance id
     * @return the result of {@link #pauseInstance(long, Long)}
     */
    public boolean pauseInstance(long instanceId) {
        Long wnstanceId = instanceMapper.getWnstanceId(instanceId);
        return this.pauseInstance(instanceId, wnstanceId);
    }

    /**
     * Pauses an instance. For a workflow, the instance must be the lead: waiting workflow rows are
     * set to {@code PAUSED}, every pausable node instance is paused, and the lead state is recomputed.
     *
     * @param instanceId the instance id
     * @param wnstanceId the workflow instance id, or {@code null}
     * @return {@code false} when the instance's run state is not in {@code RUN_STATE_PAUSABLE}, {@code true} otherwise
     * @throws IllegalArgumentException if the instance is missing or is a workflow node
     */
    public boolean pauseInstance(long instanceId, Long wnstanceId) {
        return this.doTransactionLockInSynchronized(instanceId, wnstanceId, (SchedInstance instance) -> {
            Assert.notNull(instance, () -> "Pause instance not found: " + instanceId);
            if (!RUN_STATE_PAUSABLE.contains(instance.getRunState())) {
                return false;
            }

            if (!instance.isWorkflow()) {
                this.pauseInstance(instance);
                return true;
            }

            Assert.isTrue(instance.isWorkflowLead(), () -> "Cannot pause workflow node instance: " + instanceId);
            workflowMapper.update(instanceId, null, RunState.PAUSED.value(), null, RUN_STATE_WAITING, null);
            for (SchedInstance node : instanceMapper.findWorkflowNode(instanceId)) {
                if (RUN_STATE_PAUSABLE.contains(node.getRunState())) {
                    this.pauseInstance(node);
                }
            }
            this.updateWorkflowLeadState(instance);
            return true;
        });
    }

    /**
     * Cancels an instance after looking up its workflow instance id through the instance mapper.
     *
     * @param instanceId the instance id
     * @param ops        the cancelling operation
     * @return the result of {@link #cancelInstance(long, Long, Operations)}
     */
    public boolean cancelInstance(long instanceId, Operations ops) {
        Long wnstanceId = instanceMapper.getWnstanceId(instanceId);
        return this.cancelInstance(instanceId, wnstanceId, ops);
    }

    /**
     * Cancels an instance. For a workflow, the instance must be the lead: waiting workflow rows
     * are set to {@code CANCELED}, every non-terminal node instance is cancelled, and the lead
     * state is recomputed.
     *
     * @param instanceId the instance id
     * @param wnstanceId the workflow instance id, or {@code null}
     * @param ops        the cancelling operation; its target state must be a failure state
     * @return {@code false} when the instance is already terminal, {@code true} otherwise
     * @throws IllegalArgumentException if the operation is invalid, the instance is missing, or it is a workflow node
     */
    public boolean cancelInstance(long instanceId, Long wnstanceId, Operations ops) {
        Assert.isTrue(ops.toState().isFailure(), () -> "Cancel instance operation invalid: " + ops);
        return this.doTransactionLockInSynchronized(instanceId, wnstanceId, (SchedInstance instance) -> {
            Assert.notNull(instance, () -> "Cancel instance not found: " + instanceId);
            if (RunState.of(instance.getRunState()).isTerminal()) {
                return false;
            }

            if (!instance.isWorkflow()) {
                this.cancelInstance(instance, ops);
                return true;
            }

            Assert.isTrue(instance.isWorkflowLead(), () -> "Cannot cancel workflow node instance: " + instanceId);
            workflowMapper.update(instanceId, null, RunState.CANCELED.value(), null, RUN_STATE_WAITING, null);
            for (SchedInstance node : instanceMapper.findWorkflowNode(instanceId)) {
                if (!RunState.of(node.getRunState()).isTerminal()) {
                    this.cancelInstance(node, ops);
                }
            }
            this.updateWorkflowLeadState(instance);
            return true;
        });
    }

    /**
     * Resumes a paused instance.
     * <p>
     * For a workflow the instance must be the lead: the lead goes from {@code PAUSED} to
     * {@code RUNNING}, waiting workflow rows are resumed, every paused node instance is resumed
     * and its workflow edge set to {@code RUNNING}, and {@code createWorkflowNode} is then called
     * with the freshly loaded workflow graph.
     *
     * @param instanceId the instance id
     * @return {@code false} when the instance is not {@code PAUSED}, {@code true} otherwise
     * @throws IllegalArgumentException if the instance is missing or is a workflow node
     * @throws IllegalStateException    if the lead instance row could not be updated
     */
    public boolean resumeInstance(long instanceId) {
        Long wnstanceId = this.instanceMapper.getWnstanceId(instanceId);
        return this.doTransactionLockInSynchronized(instanceId, wnstanceId, (SchedInstance instance) -> {
            // Fixed: the message previously said "Cancel failed", copied from the cancel operation.
            Assert.notNull(instance, () -> "Resume failed, instance_id not found: " + instanceId);
            if (!RunState.PAUSED.equals(instance.getRunState())) {
                return false;
            }
            if (instance.isWorkflow()) {
                Assert.isTrue(instance.isWorkflowLead(), () -> "Cannot resume workflow node instance: " + instanceId);
                int row = this.instanceMapper.updateState(instanceId, RunState.RUNNING.value(), RunState.PAUSED.value());
                Assert.state(row == 1, () -> "Resume workflow lead instance failed: " + instanceId);
                this.workflowMapper.resumeWaiting(instanceId);
                for (SchedInstance nodeInstance : this.instanceMapper.findWorkflowNode(instanceId)) {
                    if (!RunState.PAUSED.equals(nodeInstance.getRunState())) {
                        continue;
                    }
                    this.resumeInstance(nodeInstance);
                    this.updateWorkflowEdgeState(nodeInstance, RunState.RUNNING.value(), RUN_STATE_PAUSED);
                }
                // The graph is loaded after the state changes above.
                WorkflowGraph graph = new WorkflowGraph(this.workflowMapper.findByWnstanceId(wnstanceId));
                this.createWorkflowNode(instance, graph, graph.map(), ExceptionUtils::rethrow);
            } else {
                this.resumeInstance(instance);
            }
            return true;
        });
    }

    /**
     * Variant for actions without a result: wraps {@code action} in a function that always
     * returns {@code Boolean.TRUE} and delegates to the {@link Function}-based overload,
     * discarding its return value.
     *
     * @param instanceId the instance id
     * @param wnstanceId the workflow instance id, or {@code null}
     * @param action     the work to run with the loaded instance
     */
    private void doTransactionLockInSynchronized(long instanceId, Long wnstanceId, Consumer<SchedInstance> action) {
        Function<SchedInstance, Boolean> adapter = (SchedInstance instance) -> {
            action.accept(instance);
            return Boolean.TRUE;
        };
        this.doTransactionLockInSynchronized(instanceId, wnstanceId, adapter);
    }

    /**
     * Runs {@code action} inside a transaction while holding a monitor on the interned lock id.
     * <p>
     * The lock id is {@code wnstanceId} when present, otherwise {@code instanceId}. Inside the
     * transaction the lock instance is loaded via {@code instanceMapper.lock} (presumably a row
     * lock - confirm against the mapper). The target instance is the lock instance itself when
     * the ids match, otherwise it is loaded separately. The instance's own workflow instance id
     * must equal {@code wnstanceId}.
     *
     * @param instanceId the instance the action operates on
     * @param wnstanceId the workflow instance id, or {@code null}
     * @param action     the work to run with the loaded instance
     * @return {@code true} only when the action returned {@code Boolean.TRUE}
     * @throws IllegalArgumentException if either instance is missing or the workflow instance id does not match
     */
    private boolean doTransactionLockInSynchronized(long instanceId, Long wnstanceId, Function<SchedInstance, Boolean> action) {
        Long lockInstanceId = wnstanceId == null ? instanceId : wnstanceId;
        // Interning yields one canonical Long per id, so equal ids synchronize on the same object.
        Long monitor = INTERNER_POOL.intern(lockInstanceId);
        synchronized (monitor) {
            Boolean result = this.transactionTemplate.execute(status -> {
                SchedInstance lockInstance = this.instanceMapper.lock(lockInstanceId);
                Assert.notNull(lockInstance, () -> "Lock instance not found: " + lockInstanceId);
                SchedInstance instance = instanceId == lockInstanceId ? lockInstance : this.instanceMapper.getByInstanceId(instanceId);
                // Fixed: the message used to concatenate the (null) instance; report the id instead.
                Assert.notNull(instance, () -> "Instance not found: " + instanceId);
                if (!Objects.equals(instance.getWnstanceId(), wnstanceId)) {
                    throw new IllegalArgumentException("Invalid workflow instance id: " + wnstanceId + ", " + instance);
                }
                return action.apply(instance);
            });
            return Boolean.TRUE.equals(result);
        }
    }

    /**
     * Derives an instance-level run state from the execute states of its tasks.
     * <ul>
     *   <li>All tasks terminal: {@code CANCELED} if any task failed, else {@code FINISHED}, paired
     *       with the latest non-null task end time (or the current time when there is none).</li>
     *   <li>Otherwise, if any task state is in {@code ExecuteState.PAUSABLE_LIST}: {@code null}.</li>
     *   <li>Otherwise: {@code PAUSED} with a {@code null} date.</li>
     * </ul>
     *
     * @param tasks the tasks of one instance
     * @return the derived state and date, or {@code null} when no state can be derived yet
     */
    private Tuple2<RunState, Date> obtainRunState(List<SchedTask> tasks) {
        List<ExecuteState> states = tasks.stream()
            .map(SchedTask::getExecuteState)
            .map(ExecuteState::of)
            .collect(Collectors.toList());

        boolean allTerminal = states.stream().allMatch(ExecuteState::isTerminal);
        if (allTerminal) {
            RunState runState = states.stream().anyMatch(ExecuteState::isFailure) ? RunState.CANCELED : RunState.FINISHED;
            Date endTime = tasks.stream()
                .map(SchedTask::getExecuteEndTime)
                .filter(Objects::nonNull)
                .max(Comparator.naturalOrder())
                .orElseGet(Date::new);
            return Tuple2.of(runState, endTime);
        }

        boolean anyPausable = states.stream().anyMatch(ExecuteState.PAUSABLE_LIST::contains);
        if (anyPausable) {
            return null;
        }
        return Tuple2.of(RunState.PAUSED, null);
    }

    /**
     * Persists a trigger instance and the rows that belong to it.
     * <p>
     * The instance row is always inserted first. A normal instance then has its tasks inserted in
     * batches of 200. A workflow instance has its workflow rows inserted in batches of 200,
     * followed by each node instance and that node's tasks.
     *
     * @param tInstance the trigger instance to persist
     * @throws UnsupportedOperationException if the concrete type is neither a normal nor a workflow instance
     */
    private void createInstance(TriggerInstance tInstance) {
        instanceMapper.insert(tInstance.getInstance());

        if (tInstance instanceof NormalInstanceCreator.NormalInstance) {
            NormalInstanceCreator.NormalInstance normal = (NormalInstanceCreator.NormalInstance) tInstance;
            Collects.batchProcess(normal.getTasks(), this.taskMapper::batchInsert, 200);
            return;
        }

        if (tInstance instanceof WorkflowInstanceCreator.WorkflowInstance) {
            WorkflowInstanceCreator.WorkflowInstance workflow = (WorkflowInstanceCreator.WorkflowInstance) tInstance;
            Collects.batchProcess(workflow.getWorkflows(), this.workflowMapper::batchInsert, 200);
            for (Tuple2<SchedInstance, List<SchedTask>> node : workflow.getNodeInstances()) {
                instanceMapper.insert(node.a);
                Collects.batchProcess(node.b, this.taskMapper::batchInsert, 200);
            }
            return;
        }

        throw new UnsupportedOperationException("Unknown instance creator type: " + tInstance.getClass());
    }

    /**
     * Pauses one instance: waiting tasks are moved to the pause operation's target state, and
     * executing tasks (if any) are dispatched after commit. When nothing is executing, the
     * instance is moved straight to the state derived from its tasks.
     *
     * @param instance the instance to pause; its run state must be in {@code RUN_STATE_PAUSABLE}
     * @throws IllegalArgumentException if the state is not pausable, no state can be derived,
     *                                  or the instance row was not updated
     */
    private void pauseInstance(SchedInstance instance) {
        Assert.isTrue(RUN_STATE_PAUSABLE.contains(instance.getRunState()), () -> "Invalid pause instance state: " + instance);
        long instanceId = instance.getInstanceId();
        Operations ops = Operations.PAUSE;

        taskMapper.updateStateByInstanceId(instanceId, ops.toState().value(), EXECUTE_STATE_WAITING, null);
        List<ExecuteTaskParam> executingTasks = this.loadExecutingTasks(instance, ops);
        if (!executingTasks.isEmpty()) {
            // Executing tasks are handled by dispatching the operation after commit.
            TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(executingTasks));
            return;
        }

        Tuple2<RunState, Date> tuple = this.obtainRunState(taskMapper.findBaseByInstanceId(instanceId));
        Assert.notNull(tuple, () -> "Pause instance failed: " + instanceId);
        int updatedRows = instanceMapper.terminate(instanceId, tuple.a.value(), RUN_STATE_PAUSABLE, tuple.b);
        Assert.isTrue(updatedRows == 1, () -> "Pause instance failed: " + instance + " | " + tuple.a);
        if (instance.isWorkflowNode()) {
            this.updateWorkflowEdgeState(instance, tuple.a.value(), RUN_STATE_PAUSABLE);
        }
    }

    /**
     * Cancels one instance: executable tasks are moved to the operation's target state, and
     * executing tasks (if any) are dispatched after commit. When nothing is executing, the
     * instance is terminated with the state derived from its tasks; a derived {@code PAUSED}
     * is replaced by {@code CANCELED} stamped with the current time.
     *
     * @param instance the instance to cancel
     * @param ops      the cancelling operation
     * @throws IllegalArgumentException if no state can be derived or the instance row was not updated
     */
    private void cancelInstance(SchedInstance instance, Operations ops) {
        long instanceId = instance.getInstanceId();
        taskMapper.updateStateByInstanceId(instanceId, ops.toState().value(), EXECUTE_STATE_EXECUTABLE, new Date());

        List<ExecuteTaskParam> executingTasks = this.loadExecutingTasks(instance, ops);
        if (!executingTasks.isEmpty()) {
            TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(executingTasks));
            return;
        }

        Tuple2<RunState, Date> derived = this.obtainRunState(taskMapper.findBaseByInstanceId(instanceId));
        Assert.notNull(derived, () -> "Cancel instance failed: " + instanceId);
        Tuple2<RunState, Date> tuple = derived.a == RunState.PAUSED
            ? Tuple2.of(RunState.CANCELED, new Date())
            : derived;

        RunState toState = tuple.a;
        int updatedRows = instanceMapper.terminate(instanceId, toState.value(), RUN_STATE_TERMINABLE, tuple.b);
        Assert.isTrue(updatedRows == 1, () -> "Cancel instance failed: " + instance + " | " + toState);
        if (instance.isWorkflowNode()) {
            this.updateWorkflowEdgeState(instance, toState.value(), RUN_STATE_TERMINABLE);
        }
    }

    /**
     * Resumes one paused instance: the instance goes from {@code PAUSED} to {@code WAITING},
     * its paused tasks go to {@code WAITING}, and a dispatch is registered to run after commit.
     *
     * @param instance the instance to resume
     * @throws IllegalStateException if the instance row or no task row was updated
     */
    private void resumeInstance(SchedInstance instance) {
        long instanceId = instance.getInstanceId();

        int instanceRows = instanceMapper.updateState(instanceId, RunState.WAITING.value(), RunState.PAUSED.value());
        Assert.state(instanceRows == 1, "Resume sched instance failed.");

        int taskRows = taskMapper.updateStateByInstanceId(instanceId, ExecuteState.WAITING.value(), EXECUTE_STATE_PAUSED, null);
        Assert.state(taskRows >= 1, "Resume sched task failed.");

        // The number of resumed task rows is handed to buildDispatchParams.
        Tuple3<SchedJob, SchedInstance, List<SchedTask>> params = this.buildDispatchParams(instanceId, taskRows);
        TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(params.a, params.b, params.c));
    }

    /**
     * Recomputes the state of a workflow lead instance from its workflow rows.
     * <p>
     * The workflow end state is refreshed first (on a graph built from the rows loaded before
     * that refresh). If every graph entry is terminal the lead is terminated as {@code CANCELED}
     * (any failure) or {@code FINISHED}. Otherwise, if no loaded row is {@code RUNNING}, the lead
     * is set to {@code PAUSED}.
     *
     * @param instance the workflow lead instance
     * @throws IllegalArgumentException if the instance is not a workflow lead or its row was not updated
     */
    private void updateWorkflowLeadState(SchedInstance instance) {
        Assert.isTrue(instance.isWorkflowLead(), () -> "Must terminate workflow lead instance: " + instance);
        long wnstanceId = instance.getWnstanceId();
        List<SchedWorkflow> workflows = workflowMapper.findByWnstanceId(wnstanceId);
        WorkflowGraph graph = new WorkflowGraph(workflows);
        this.updateWorkflowEndState(graph);

        boolean allTerminal = graph.allMatch(e -> ((SchedWorkflow) e.getValue()).isTerminal());
        if (allTerminal) {
            boolean anyFailure = graph.anyMatch(e -> ((SchedWorkflow) e.getValue()).isFailure());
            RunState state = anyFailure ? RunState.CANCELED : RunState.FINISHED;
            int updatedRows = instanceMapper.terminate(instance.getWnstanceId(), state.value(), RUN_STATE_TERMINABLE, new Date());
            Assert.isTrue(updatedRows == 1, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
        } else if (workflows.stream().noneMatch(e -> RunState.RUNNING.equals(e.getRunState()))) {
            RunState state = RunState.PAUSED;
            int updatedRows = instanceMapper.updateState(instance.getWnstanceId(), state.value(), instance.getRunState());
            Assert.isTrue(updatedRows == 1, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
        }
    }

    /**
     * Updates the state of the workflow row that belongs to the given node instance.
     *
     * <p>The row is addressed by the instance's workflow instance id, the current node read from
     * the instance attach, and the instance id; at least one row must be affected.
     *
     * @param instance   the workflow node instance
     * @param toState    the run state value to set
     * @param fromStates the run state values passed to the mapper as the allowed source states
     */
    private void updateWorkflowEdgeState(SchedInstance instance, Integer toState, List<Integer> fromStates) {
        String currentNode = instance.parseAttach().getCurNode();
        int affected = this.workflowMapper.update(
            instance.getWnstanceId(), currentNode, toState, null, fromStates, instance.getInstanceId()
        );
        Assert.isTrue(affected > 0, () -> "Update workflow state failed: " + instance + " | " + toState);
    }

    /**
     * Settles the state of the workflow end node once everything leading to it is terminal.
     *
     * <p>Nothing happens unless the graph still contains an entry whose edge targets the end node
     * and whose workflow is not terminal, and all workflows returned by
     * {@code graph.predecessors(DAGNode.END)} are terminal. In that case the end node rows are
     * updated to CANCELED (any predecessor failed) or FINISHED, and the same state is written
     * into the in-memory graph entries so later checks on {@code graph} see it.
     *
     * @param graph the workflow graph; must contain at least one entry, because the workflow
     *              instance id is read from its first value
     */
    private void updateWorkflowEndState(WorkflowGraph graph) {
        long wnstanceId = ((SchedWorkflow) Collects.getFirst(graph.map().values())).getWnstanceId();

        boolean endNodePending = graph.anyMatch(
            e -> ((DAGEdge) e.getKey()).getTarget().isEnd() && !((SchedWorkflow) e.getValue()).isTerminal()
        );
        if (!endNodePending) {
            return;
        }

        // Typed map (instead of a raw Map) so the method references and the forEach lambda type-check.
        Map<DAGEdge, SchedWorkflow> ends = graph.predecessors(DAGNode.END);
        if (!ends.values().stream().allMatch(SchedWorkflow::isTerminal)) {
            return;
        }

        RunState endState = ends.values().stream().anyMatch(SchedWorkflow::isFailure) ? RunState.CANCELED : RunState.FINISHED;
        int row = this.workflowMapper.update(wnstanceId, DAGNode.END.toString(), endState.value(), null, RUN_STATE_TERMINABLE, null);
        Assert.isTrue(row > 0, () -> "Update workflow end node failed: " + wnstanceId + " | " + endState);

        // Keep the in-memory graph consistent with the rows just updated.
        ends.forEach((edge, workflow) -> graph.get(edge.getTarget(), DAGNode.END).setRunState(endState.value()));
    }

    /**
     * Starts the workflow nodes in {@code map} that are ready to run.
     *
     * <p>For each entry, the target node is skipped when it is the end node, when its workflow is
     * not WAITING, when the same target was already handled in this call, or when any of its
     * predecessors is not yet terminal. If any predecessor failed, the node's workflow row is
     * marked CANCELED. Otherwise a new instance and its tasks are created for the node, the
     * workflow row is switched from waiting to RUNNING with the new instance id, and the
     * dispatch is scheduled for after the transaction commit.
     *
     * @param leadInstance the workflow lead instance
     * @param graph        the workflow graph used to look up predecessors
     * @param map          the candidate edges and their workflow rows
     * @param failHandler  invoked with any {@code Throwable} raised while starting a node; when
     *                     it returns {@code Boolean.FALSE} processing stops, otherwise the loop
     *                     continues with the next entry
     */
    private void createWorkflowNode(SchedInstance leadInstance, WorkflowGraph graph, Map<DAGEdge, SchedWorkflow> map, Function<Throwable, Boolean> failHandler) {
        SchedJob job = LazyLoader.of(SchedJob.class, this.jobMapper::getByJobId, leadInstance.getJobId());
        long wnstanceId = leadInstance.getWnstanceId();
        Date now = new Date();
        HashSet<DAGNode> duplicates = new HashSet<>();

        for (Map.Entry<DAGEdge, SchedWorkflow> entry : map.entrySet()) {
            DAGNode target = entry.getKey().getTarget();
            SchedWorkflow workflow = entry.getValue();

            // Order matters: a target is only recorded as handled once it is a waiting non-end node.
            if (target.isEnd() || !RunState.WAITING.equals(workflow.getRunState()) || !duplicates.add(target)) {
                continue;
            }

            // Typed map (instead of a raw Map) so the stream lambdas see SchedWorkflow values.
            Map<DAGEdge, SchedWorkflow> predecessors = graph.predecessors(target);
            if (predecessors.values().stream().anyMatch(e -> !RunState.of(e.getRunState()).isTerminal())) {
                continue;
            }

            if (predecessors.values().stream().anyMatch(e -> RunState.of(e.getRunState()).isFailure())) {
                RunState state = RunState.CANCELED;
                int row = this.workflowMapper.update(wnstanceId, workflow.getCurNode(), state.value(), null, RUN_STATE_TERMINABLE, null);
                Assert.isTrue(row > 0, () -> "Update workflow cur node state failed: " + workflow + " | " + state);
                continue;
            }

            try {
                long nextInstanceId = this.generateId();
                List<SchedTask> tasks = this.splitTasks(JobHandlerParam.from(job, target.getName()), nextInstanceId, new Date());
                long triggerTime = leadInstance.getTriggerTime() + workflow.getSequence();

                SchedInstance nextInstance = SchedInstance.create(nextInstanceId, job.getJobId(), RunType.of(leadInstance.getRunType()), triggerTime, 0, now);
                nextInstance.setRnstanceId(wnstanceId);
                nextInstance.setPnstanceId(predecessors.isEmpty() ? null : Collects.getFirst(predecessors.values()).getInstanceId());
                nextInstance.setWnstanceId(wnstanceId);
                nextInstance.setAttach(Jsons.toJson(new InstanceAttach(workflow.getCurNode())));

                int row = this.workflowMapper.update(wnstanceId, workflow.getCurNode(), RunState.RUNNING.value(), nextInstanceId, RUN_STATE_WAITING, null);
                Assert.isTrue(row > 0, () -> "Start workflow node failed: " + workflow);

                this.instanceMapper.insert(nextInstance);
                Collects.batchProcess(tasks, this.taskMapper::batchInsert, 200);
                TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(job, nextInstance, tasks));
            } catch (Throwable t) {
                // Only an explicit FALSE from the handler aborts the remaining nodes.
                if (Boolean.FALSE.equals(failHandler.apply(t))) {
                    return;
                }
            }
        }
    }

    /**
     * Runs the follow-up action for an instance that has reached a terminal run state.
     *
     * <p>CANCELED instances go through the retry path. FINISHED instances continue the workflow
     * when they are workflow nodes, otherwise their dependent jobs are handled. Any other run
     * state is only logged as an error.
     *
     * @param instance the terminated instance
     */
    private void afterTerminateTask(SchedInstance instance) {
        RunState runState = RunState.of(instance.getRunState());

        if (runState == RunState.CANCELED) {
            this.retryJob(instance);
            return;
        }

        if (runState != RunState.FINISHED) {
            this.log.error("Unknown terminate run state " + runState);
            return;
        }

        if (instance.isWorkflowNode()) {
            this.processWorkflow(instance);
        } else {
            this.dependJob(instance);
        }
    }

    /**
     * Advances a workflow after one of its node instances has terminated.
     *
     * <p>Does nothing for instances that are not workflow nodes. Otherwise the node's workflow
     * row is set to the instance's run state; on CANCELED, a further update to CANCELED is issued
     * for the whole workflow instance using {@code RUN_STATE_RUNNABLE} as source states. The
     * graph is then reloaded and the end node refreshed. If every entry is terminal, the lead
     * instance is terminated (CANCELED when any entry failed, else FINISHED) and its own
     * after-terminate handling runs. If not, and the node was not canceled, the successors of the
     * current node are started.
     *
     * @param nodeInstance the terminated workflow node instance
     */
    private void processWorkflow(SchedInstance nodeInstance) {
        if (!nodeInstance.isWorkflowNode()) {
            return;
        }

        RunState runState = RunState.of(nodeInstance.getRunState());
        Long wnstanceId = nodeInstance.getWnstanceId();
        boolean canceled = (runState == RunState.CANCELED);

        this.updateWorkflowEdgeState(nodeInstance, runState.value(), RUN_STATE_TERMINABLE);
        if (canceled) {
            this.workflowMapper.update(wnstanceId, null, RunState.CANCELED.value(), null, RUN_STATE_RUNNABLE, null);
        }

        WorkflowGraph graph = new WorkflowGraph(this.workflowMapper.findByWnstanceId(wnstanceId));
        this.updateWorkflowEndState(graph);

        boolean allTerminal = graph.allMatch(e -> ((SchedWorkflow) e.getValue()).isTerminal());
        if (allTerminal) {
            boolean anyFailure = graph.anyMatch(e -> ((SchedWorkflow) e.getValue()).isFailure());
            RunState state = anyFailure ? RunState.CANCELED : RunState.FINISHED;
            int row = this.instanceMapper.terminate(wnstanceId, state.value(), RUN_STATE_TERMINABLE, new Date());
            Assert.isTrue(row == 1, () -> "Terminate workflow lead instance failed: " + nodeInstance + " | " + state);
            this.afterTerminateTask(this.instanceMapper.getByInstanceId(wnstanceId));
            return;
        }

        if (canceled) {
            return;
        }

        SchedInstance leadInstance = this.instanceMapper.getByInstanceId(wnstanceId);
        DAGNode currentNode = DAGNode.fromString(nodeInstance.parseAttach().getCurNode());
        Map<DAGEdge, SchedWorkflow> successors = graph.successors(currentNode);
        this.createWorkflowNode(leadInstance, graph, successors, throwable -> {
            this.log.error("Split workflow job task error: " + nodeInstance, throwable);
            // Re-enter as a canceled node, then tell the caller to stop creating further nodes.
            nodeInstance.setRunState(RunState.CANCELED.value());
            this.processWorkflow(nodeInstance);
            return false;
        });
    }

    /**
     * Creates and dispatches a retry instance for a canceled instance, when the job allows it.
     *
     * <p>No retry is created (and {@code processWorkflow(prev)} is invoked instead) when the job
     * no longer exists, its retry type is NONE, its retry count is below 1, the instance already
     * used up the configured retries, splitting tasks fails, or the retry type is unknown.
     *
     * <p>With retry type ALL the job is split into a fresh task list; with FAILED only the
     * previous instance's failed tasks are recreated (for broadcast jobs, only those whose worker
     * is still alive). The retry instance and tasks are inserted, and the dispatch is scheduled
     * for after the transaction commit.
     *
     * @param prev the canceled instance to retry
     */
    private void retryJob(SchedInstance prev) {
        SchedJob schedJob = this.jobMapper.getByJobId(prev.getJobId());
        if (schedJob == null) {
            this.log.error("Sched job not found {}", prev.getJobId());
            this.processWorkflow(prev);
            return;
        }

        RetryType retryType = RetryType.of(schedJob.getRetryType());
        if (retryType == RetryType.NONE || schedJob.getRetryCount() < 1) {
            this.processWorkflow(prev);
            return;
        }

        int retriedCount = Optional.ofNullable(prev.getRetriedCount()).orElse(0);
        if (retriedCount >= schedJob.getRetryCount()) {
            this.processWorkflow(prev);
            return;
        }

        long retryInstanceId = this.generateId();
        if (prev.isWorkflowNode()) {
            // NOTE(review): the workflow row is re-pointed to the retry instance before the retry
            // tasks are built; confirm the processWorkflow(prev) fallbacks below still match it.
            String curNode = prev.parseAttach().getCurNode();
            int row = this.workflowMapper.update(prev.getWnstanceId(), curNode, null, retryInstanceId, RUN_STATE_RUNNING, prev.getInstanceId());
            Assert.isTrue(row > 0, () -> "Retry workflow node failed: " + prev);
        }

        Date now = new Date();
        int nextRetriedCount = retriedCount + 1;
        long triggerTime = schedJob.computeRetryTriggerTime(nextRetriedCount, now);
        SchedInstance retryInstance = SchedInstance.create(retryInstanceId, schedJob.getJobId(), RunType.RETRY, triggerTime, nextRetriedCount, now);
        retryInstance.setRnstanceId(prev.obtainRnstanceId());
        retryInstance.setPnstanceId(prev.getInstanceId());
        retryInstance.setWnstanceId(prev.getWnstanceId());
        retryInstance.setAttach(prev.getAttach());

        List<SchedTask> tasks;
        if (retryType == RetryType.ALL) {
            try {
                tasks = this.splitTasks(JobHandlerParam.from(schedJob), retryInstance.getInstanceId(), now);
            } catch (Throwable t) {
                this.log.error("Split retry job error: " + schedJob + ", " + prev, t);
                this.processWorkflow(prev);
                return;
            }
        } else if (retryType == RetryType.FAILED) {
            // Previous tasks are only needed here, so they are loaded lazily instead of up front.
            List<SchedTask> prevTasks = this.taskMapper.findLargeByInstanceId(prev.getInstanceId());
            boolean broadcast = RouteStrategy.BROADCAST.equals(schedJob.getRouteStrategy());
            tasks = prevTasks.stream()
                .filter(e -> ExecuteState.of(e.getExecuteState()).isFailure())
                // A broadcast task is bound to its worker: skip it when that worker is gone.
                .filter(e -> !broadcast || super.isAliveWorker(e.getWorker()))
                .map(e -> SchedTask.create(e.getTaskParam(), this.generateId(), retryInstanceId, e.getTaskNo(), e.getTaskCount(), now, e.getWorker()))
                .collect(Collectors.toList());
        } else {
            this.log.error("Unknown job retry type {}", schedJob);
            this.processWorkflow(prev);
            return;
        }

        Assert.notEmpty(tasks, "Insert list of task cannot be empty.");
        this.instanceMapper.insert(retryInstance);
        Collects.batchProcess(tasks, this.taskMapper::batchInsert, 200);
        TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(schedJob, retryInstance, tasks));
    }

    /**
     * Triggers the jobs that depend on the job of the given (parent) instance.
     *
     * <p>For every dependency whose child job exists and is not disabled, a DEPEND instance is
     * created inside a nested transaction. Its trigger time is the parent's trigger time
     * truncated to whole seconds plus the dependency sequence. When the nested transaction
     * yields a dispatch action, it is scheduled to run after the outer transaction commits;
     * creation errors are handed to the error callback, which logs them.
     *
     * @param parentInstance the parent instance whose dependents should be triggered
     */
    private void dependJob(SchedInstance parentInstance) {
        List<SchedDepend> schedDepends = this.dependMapper.findByParentJobId(parentInstance.getJobId());
        if (CollectionUtils.isEmpty(schedDepends)) {
            return;
        }

        for (SchedDepend depend : schedDepends) {
            SchedJob childJob = this.jobMapper.getByJobId(depend.getChildJobId());
            if (childJob == null) {
                this.log.error("Child sched job not found: {} | {}", depend.getParentJobId(), depend.getChildJobId());
                continue;
            }
            if (JobState.DISABLE.equals(childJob.getJobState())) {
                continue;
            }

            PlatformTransactionManager txManager = Objects.requireNonNull(this.transactionTemplate.getTransactionManager());
            Runnable dispatchAction = TransactionUtils.doInNestedTransaction(
                txManager,
                () -> {
                    TriggerInstanceCreator<?> creator = TriggerInstanceCreator.of(childJob.getJobType(), this);
                    long triggerTime = parentInstance.getTriggerTime() / 1000L * 1000L + (long) depend.getSequence().intValue();
                    Object tInstance = creator.create(childJob, RunType.DEPEND, triggerTime);
                    ((TriggerInstance) tInstance).getInstance().setRnstanceId(parentInstance.obtainRnstanceId());
                    ((TriggerInstance) tInstance).getInstance().setPnstanceId(parentInstance.getInstanceId());
                    this.createInstance((TriggerInstance) tInstance);
                    return () -> creator.dispatch(childJob, tInstance);
                },
                t -> this.log.error("Depend job instance created fail: " + parentInstance + " | " + childJob, t)
            );

            // No action means nothing to dispatch for this child.
            if (dispatchAction != null) {
                TransactionUtils.doAfterTransactionCommit(dispatchAction);
            }
        }
    }

    /**
     * Builds the operation params for the instance's tasks that are currently EXECUTING.
     *
     * <p>Tasks on an alive worker are returned for dispatch. Tasks whose worker is not alive are
     * moved directly to {@code ops.toState()} in storage; they are only added to the result (and
     * an error is logged) when that update does not affect exactly one row.
     *
     * @param instance the instance whose tasks are inspected
     * @param ops      the operation to build params for; its target state is applied to tasks on
     *                 dead workers
     * @return the params of the tasks that still have to be sent to a worker
     */
    private List<ExecuteTaskParam> loadExecutingTasks(SchedInstance instance, Operations ops) {
        List<ExecuteTaskParam> executingTasks = new ArrayList<>();
        ExecuteTaskParamBuilder builder = ExecuteTaskParam.builder(instance, this.jobMapper::getByJobId);
        long triggerTime = 0L;

        for (SchedTask task : this.taskMapper.findBaseByInstanceId(instance.getInstanceId())) {
            if (!ExecuteState.EXECUTING.equals(task.getExecuteState())) {
                continue;
            }

            Worker worker = Worker.deserialize(task.getWorker());
            boolean needsDispatch = super.isAliveWorker(worker);
            if (!needsDispatch) {
                // The worker is gone: settle the task state here instead of dispatching.
                Date executeEndTime = ops.toState().isTerminal() ? new Date() : null;
                int row = this.taskMapper.terminate(task.getTaskId(), ops.toState().value(), ExecuteState.EXECUTING.value(), executeEndTime, null);
                if (row == 1) {
                    this.log.info("Cancel the dead task success: {}", task);
                } else {
                    this.log.error("Cancel the dead task failed: {}", task);
                    needsDispatch = true;
                }
            }

            if (needsDispatch) {
                executingTasks.add(builder.build(ops, task.getTaskId().longValue(), triggerTime, worker));
            }
        }
        return executingTasks;
    }

    /**
     * Loads everything needed to dispatch the waiting tasks of an instance.
     *
     * @param instanceId     the instance id
     * @param expectTaskSize the number of WAITING tasks the caller expects to dispatch
     * @return a tuple of the job, the instance and the instance's WAITING tasks
     * @throws IllegalStateException if the number of WAITING tasks differs from
     *                               {@code expectTaskSize}
     */
    private Tuple3<SchedJob, SchedInstance, List<SchedTask>> buildDispatchParams(long instanceId, int expectTaskSize) {
        SchedInstance instance = this.instanceMapper.getByInstanceId(instanceId);
        SchedJob job = this.jobMapper.getByJobId(instance.getJobId());

        // Typed list and uncast arguments keep the tuple's element types intact.
        List<SchedTask> waitingTasks = this.taskMapper.findLargeByInstanceId(instanceId)
            .stream()
            .filter(e -> ExecuteState.WAITING.equals(e.getExecuteState()))
            .collect(Collectors.toList());

        if (waitingTasks.size() != expectTaskSize) {
            throw new IllegalStateException("Invalid dispatching tasks size: expect=" + expectTaskSize + ", actual=" + waitingTasks.size());
        }
        return Tuple3.of(job, instance, waitingTasks);
    }

    /**
     * Validates the job's trigger configuration and derives the fields that follow from it.
     *
     * <p>For a DEPEND trigger, the trigger value is read as a comma-separated list of parent job
     * ids (blank items ignored, duplicates removed). Every parent must exist and belong to the
     * same job group as {@code job}. One dependency row per parent is inserted with a 1-based
     * sequence, the trigger value is rewritten to the normalized id list, and the next trigger
     * time is cleared.
     *
     * <p>For any other trigger type, the next trigger time is computed from the trigger value and
     * the current time, and must not be null.
     *
     * @param job the job whose trigger configuration is parsed; it is modified in place
     */
    private void parseTriggerConfig(SchedJob job) {
        TriggerType triggerType = TriggerType.of(job.getTriggerType());

        if (triggerType != TriggerType.DEPEND) {
            Date nextTriggerTime = triggerType.computeNextFireTime(job.getTriggerValue(), new Date());
            Assert.notNull(nextTriggerTime, () -> "Has not next trigger time " + job.getTriggerValue());
            job.setNextTriggerTime(nextTriggerTime.getTime());
            return;
        }

        List<Long> parentJobIds = Arrays.stream(job.getTriggerValue().split(","))
            .filter(StringUtils::isNotBlank)
            .map(String::trim)
            .map(Long::parseLong)
            .distinct()
            .collect(Collectors.toList());
        Assert.notEmpty(parentJobIds, () -> "Invalid dependency parent job id config: " + job.getTriggerValue());

        Map<Long, SchedJob> parentJobMap = this.jobMapper.findByJobIds(parentJobIds)
            .stream()
            .collect(Collectors.toMap(SchedJob::getJobId, Function.identity()));
        for (Long parentJobId : parentJobIds) {
            SchedJob parentJob = parentJobMap.get(parentJobId);
            Assert.notNull(parentJob, () -> "Parent job id not found: " + parentJobId);
            if (!job.getJobGroup().equals(parentJob.getJobGroup())) {
                throw new IllegalArgumentException("Invalid group: parent=" + parentJob.getJobGroup() + ", child=" + job.getJobGroup());
            }
        }

        // Sequence is the 1-based position of the parent in the configured list.
        List<SchedDepend> depends = new ArrayList<>(parentJobIds.size());
        int sequence = 0;
        for (Long parentJobId : parentJobIds) {
            depends.add(new SchedDepend(parentJobId, job.getJobId(), ++sequence));
        }
        this.dependMapper.batchInsert(depends);

        job.setTriggerValue(Joiner.on(",").join(parentJobIds));
        job.setNextTriggerTime(null);
    }
}

