/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.scheduler.supervisor.instance;

import cn.ponfee.scheduler.common.graph.DAGEdge;
import cn.ponfee.scheduler.common.graph.DAGExpressionParser;
import cn.ponfee.scheduler.common.graph.DAGNode;
import cn.ponfee.scheduler.common.tuple.Tuple2;
import cn.ponfee.scheduler.common.util.Jsons;
import cn.ponfee.scheduler.core.enums.RunState;
import cn.ponfee.scheduler.core.enums.RunType;
import cn.ponfee.scheduler.core.exception.JobException;
import cn.ponfee.scheduler.core.graph.WorkflowGraph;
import cn.ponfee.scheduler.core.model.SchedInstance;
import cn.ponfee.scheduler.core.model.SchedJob;
import cn.ponfee.scheduler.core.model.SchedTask;
import cn.ponfee.scheduler.core.model.SchedWorkflow;
import cn.ponfee.scheduler.core.model.WorkflowAttach;
import cn.ponfee.scheduler.supervisor.instance.TriggerInstance;
import cn.ponfee.scheduler.supervisor.instance.TriggerInstanceCreator;
import cn.ponfee.scheduler.supervisor.manager.SchedulerJobManager;
import cn.ponfee.scheduler.supervisor.param.SplitJobParam;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;

/**
 * Creates and dispatches the trigger instances of a workflow job.
 *
 * <p>The job handler of the {@link SchedJob} is parsed as a DAG expression. One
 * "workflow" (lead) instance is created for the whole graph, and one sub-instance
 * is created for every edge that leaves {@link DAGNode#START}.
 */
public class WorkflowInstanceCreator extends TriggerInstanceCreator<WorkflowInstanceCreator.WorkflowInstance> {

    public WorkflowInstanceCreator(SchedulerJobManager manager) {
        super(manager);
    }

    /**
     * Builds the workflow lead instance, the workflow edge records and the
     * sub-instances (with their split tasks) for the successors of the start node.
     *
     * @param job         the job whose handler holds the DAG expression
     * @param runType     the run type given to the lead instance and every sub-instance
     * @param triggerTime the trigger time of the lead instance; each sub-instance uses
     *                    this value plus the sequence of its workflow edge
     * @return the lead instance bundled with the workflow edges and the sub-instances
     * @throws JobException declared by the overridden method; presumably raised while
     *                      splitting tasks (not verifiable from this file)
     */
    @Override
    public WorkflowInstance create(SchedJob job, RunType runType, long triggerTime) throws JobException {
        Date now = new Date();
        long workflowInstanceId = this.manager.generateId();

        // The lead instance is created already RUNNING and references itself as its workflow instance.
        SchedInstance workflowInstance = SchedInstance.create(workflowInstanceId, job.getJobId(), runType, triggerTime, 0, now);
        workflowInstance.setRunState(RunState.RUNNING.value());
        workflowInstance.setRunStartTime(now);
        workflowInstance.setWorkflowInstanceId(workflowInstanceId);

        // One SchedWorkflow per DAG edge; sequence numbers start at 1 and follow the edge iteration order.
        MutableInt sequence = new MutableInt(1);
        List<SchedWorkflow> workflows = new DAGExpressionParser(job.getJobHandler())
            .parse()
            .edges()
            .stream()
            .map(e -> new SchedWorkflow(workflowInstanceId, e.target().toString(), e.source().toString(), sequence.getAndIncrement()))
            .collect(Collectors.toList());

        List<Tuple2<SchedInstance, List<SchedTask>>> subInstances = new ArrayList<>();
        for (Map.Entry<DAGEdge, SchedWorkflow> each : new WorkflowGraph(workflows).successors(DAGNode.START).entrySet()) {
            DAGEdge edge = each.getKey();
            SchedWorkflow workflow = each.getValue();
            DAGNode target = edge.getTarget();

            // Sequences are distinct, so every sub-instance gets a trigger time that differs from the
            // lead instance and from its siblings.
            // NOTE(review): presumably needed for a uniqueness constraint on trigger time - confirm.
            long subTriggerTime = triggerTime + workflow.getSequence();
            long subInstanceId = this.manager.generateId();
            workflow.setInstanceId(subInstanceId);

            SchedInstance subInstance = SchedInstance.create(subInstanceId, job.getJobId(), runType, subTriggerTime, 0, now);
            subInstance.setRootInstanceId(workflowInstanceId);
            subInstance.setParentInstanceId(workflowInstanceId);
            subInstance.setWorkflowInstanceId(workflowInstanceId);
            subInstance.setAttach(Jsons.toJson(WorkflowAttach.of(target)));

            // Tasks are split using the name of the edge's target node.
            SplitJobParam param = SplitJobParam.from(job, target.getName());
            List<SchedTask> tasks = this.manager.splitTasks(param, subInstance.getInstanceId(), now);
            subInstances.add(Tuple2.of(subInstance, tasks));
        }

        return new WorkflowInstance(workflowInstance, workflows, subInstances);
    }

    /**
     * Dispatches every sub-instance of the given workflow instance, together with
     * its tasks, through the manager.
     *
     * @param job      the job the instances belong to
     * @param instance the workflow instance returned by {@link #create}
     */
    @Override
    public void dispatch(SchedJob job, WorkflowInstance instance) {
        for (Tuple2<SchedInstance, List<SchedTask>> subInstance : instance.getSubInstances()) {
            this.manager.dispatch(job, subInstance.a, subInstance.b);
        }
    }

    /**
     * Result holder: the workflow lead instance (kept by the superclass) plus the
     * workflow edge records and the sub-instances with their tasks.
     */
    public static class WorkflowInstance extends TriggerInstance {
        private final List<SchedWorkflow> workflows;
        private final List<Tuple2<SchedInstance, List<SchedTask>>> subInstances;

        /**
         * @param instance     the workflow lead instance
         * @param workflows    the workflow edge records
         * @param subInstances pairs of sub-instance and its tasks
         */
        public WorkflowInstance(SchedInstance instance,
                                List<SchedWorkflow> workflows,
                                List<Tuple2<SchedInstance, List<SchedTask>>> subInstances) {
            super(instance);
            this.workflows = workflows;
            this.subInstances = subInstances;
        }

        /** @return the workflow edge records passed at construction (same list reference) */
        public List<SchedWorkflow> getWorkflows() {
            return this.workflows;
        }

        /** @return the sub-instance/task pairs passed at construction (same list reference) */
        public List<Tuple2<SchedInstance, List<SchedTask>>> getSubInstances() {
            return this.subInstances;
        }
    }
}

