/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.supervisor.instance;

import cn.ponfee.disjob.common.dag.DAGEdge;
import cn.ponfee.disjob.common.dag.DAGExpressionParser;
import cn.ponfee.disjob.common.dag.DAGNode;
import cn.ponfee.disjob.common.date.Dates;
import cn.ponfee.disjob.common.tuple.Tuple2;
import cn.ponfee.disjob.common.util.Jsons;
import cn.ponfee.disjob.core.enums.RunState;
import cn.ponfee.disjob.core.enums.RunType;
import cn.ponfee.disjob.core.exception.JobException;
import cn.ponfee.disjob.core.model.InstanceAttach;
import cn.ponfee.disjob.core.model.SchedInstance;
import cn.ponfee.disjob.core.model.SchedJob;
import cn.ponfee.disjob.core.model.SchedTask;
import cn.ponfee.disjob.core.model.SchedWorkflow;
import cn.ponfee.disjob.core.param.worker.JobHandlerParam;
import cn.ponfee.disjob.supervisor.dag.WorkflowGraph;
import cn.ponfee.disjob.supervisor.instance.TriggerInstance;
import cn.ponfee.disjob.supervisor.instance.TriggerInstanceCreator;
import cn.ponfee.disjob.supervisor.service.DistributedJobManager;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;

/**
 * Creates and dispatches the trigger instances of a workflow job, i.e. a job whose
 * handler string is parsed as a DAG expression by {@link DAGExpressionParser}.
 *
 * <p>A created {@link WorkflowInstance} bundles the lead instance, one {@link SchedWorkflow}
 * per DAG edge, and the node instances (with their tasks) for every direct successor of
 * {@link DAGNode#START}.
 */
public class WorkflowInstanceCreator
extends TriggerInstanceCreator<WorkflowInstanceCreator.WorkflowInstance> {

    /**
     * @param jobManager the manager used to generate ids, split tasks and dispatch them
     */
    public WorkflowInstanceCreator(DistributedJobManager jobManager) {
        super(jobManager);
    }

    /**
     * Builds the lead instance, the workflow edges and the first node instances for a job.
     *
     * @param job         the job whose handler holds the DAG expression
     * @param runType     the run type recorded on every created instance
     * @param triggerTime the trigger time (epoch milliseconds) of the lead instance
     * @return the lead instance together with its workflows and first node instances
     * @throws JobException declared by the overridden contract; NOTE(review): presumably
     *                      raised while splitting tasks - confirm against the job manager
     */
    @Override
    public WorkflowInstance create(SchedJob job, RunType runType, long triggerTime) throws JobException {
        Date now = new Date();
        long wnstanceId = jobManager.generateId();

        // The lead instance is created directly in RUNNING state and references itself
        // as its own workflow instance.
        SchedInstance leadInstance = SchedInstance.create(wnstanceId, job.getJobId(), runType, triggerTime, 0, now);
        leadInstance.setRunState(RunState.RUNNING.value());
        leadInstance.setRunStartTime(Dates.max(now, new Date(triggerTime)));
        leadInstance.setWnstanceId(wnstanceId);

        // One SchedWorkflow per DAG edge, numbered from 1 in the order the edges are streamed.
        MutableInt sequence = new MutableInt(1);
        List<SchedWorkflow> workflows = new DAGExpressionParser(job.getJobHandler())
            .parse()
            .edges()
            .stream()
            .map(e -> new SchedWorkflow(wnstanceId, e.target().toString(), e.source().toString(), sequence.getAndIncrement()))
            .collect(Collectors.toList());

        // Only the direct successors of the START node get an instance at creation time.
        List<Tuple2<SchedInstance, List<SchedTask>>> nodeInstances = new ArrayList<>();
        for (Map.Entry<DAGEdge, SchedWorkflow> entry : new WorkflowGraph(workflows).successors(DAGNode.START).entrySet()) {
            DAGNode node = entry.getKey().getTarget();
            SchedWorkflow workflow = entry.getValue();

            long nodeInstanceId = jobManager.generateId();
            // NOTE(review): the sequence offset presumably keeps the trigger times of
            // sibling node instances distinct - confirm against the storage constraints.
            long nodeTriggerTime = triggerTime + workflow.getSequence();

            workflow.setInstanceId(nodeInstanceId);
            workflow.setRunState(RunState.RUNNING.value());

            SchedInstance nodeInstance = SchedInstance.create(nodeInstanceId, job.getJobId(), runType, nodeTriggerTime, 0, now);
            // The rnstance, pnstance and wnstance ids all point at the lead instance.
            nodeInstance.setRnstanceId(wnstanceId);
            nodeInstance.setPnstanceId(wnstanceId);
            nodeInstance.setWnstanceId(wnstanceId);
            nodeInstance.setAttach(Jsons.toJson(InstanceAttach.of(node)));

            JobHandlerParam param = JobHandlerParam.from(job, node.getName());
            List<SchedTask> tasks = jobManager.splitTasks(param, nodeInstance.getInstanceId(), now);
            nodeInstances.add(Tuple2.of(nodeInstance, tasks));
        }

        return new WorkflowInstance(leadInstance, workflows, nodeInstances);
    }

    /**
     * Dispatches every node instance of the given workflow instance, together with its
     * tasks, through the job manager.
     *
     * @param job      the job the instances belong to
     * @param wnstance the workflow instance previously returned by {@link #create}
     */
    @Override
    public void dispatch(SchedJob job, WorkflowInstance wnstance) {
        for (Tuple2<SchedInstance, List<SchedTask>> nodeInstance : wnstance.getNodeInstances()) {
            jobManager.dispatch(job, nodeInstance.a, nodeInstance.b);
        }
    }

    /**
     * Holder for a lead instance plus the workflow edges and first node instances
     * created alongside it.
     */
    public static class WorkflowInstance
    extends TriggerInstance {
        private final List<SchedWorkflow> workflows;
        private final List<Tuple2<SchedInstance, List<SchedTask>>> nodeInstances;

        /**
         * @param instance      the lead instance, passed to the {@link TriggerInstance} constructor
         * @param workflows     the workflow edges of the DAG (stored by reference, not copied)
         * @param nodeInstances the node instances paired with their tasks (stored by reference, not copied)
         */
        public WorkflowInstance(SchedInstance instance, List<SchedWorkflow> workflows, List<Tuple2<SchedInstance, List<SchedTask>>> nodeInstances) {
            super(instance);
            this.workflows = workflows;
            this.nodeInstances = nodeInstances;
        }

        /**
         * @return the workflow edges, the same list instance that was passed to the constructor
         */
        public List<SchedWorkflow> getWorkflows() {
            return this.workflows;
        }

        /**
         * @return the node instances with their tasks, the same list instance that was passed to the constructor
         */
        public List<Tuple2<SchedInstance, List<SchedTask>>> getNodeInstances() {
            return this.nodeInstances;
        }
    }
}

