package com.github.joekerouac.async.task.flow.service;

import com.github.joekerouac.async.task.AsyncTaskService;
import com.github.joekerouac.async.task.Const;
import com.github.joekerouac.async.task.db.TransUtil;
import com.github.joekerouac.async.task.exception.ProcessorAlreadyExistException;
import com.github.joekerouac.async.task.flow.AbstractFlowProcessor;
import com.github.joekerouac.async.task.flow.FlowService;
import com.github.joekerouac.async.task.flow.enums.FlowTaskStatus;
import com.github.joekerouac.async.task.flow.enums.FlowTaskType;
import com.github.joekerouac.async.task.flow.enums.StrategyResult;
import com.github.joekerouac.async.task.flow.enums.TaskNodeStatus;
import com.github.joekerouac.async.task.flow.impl.repository.FlowTaskRepositoryImpl;
import com.github.joekerouac.async.task.flow.impl.repository.TaskNodeMapRepositoryImpl;
import com.github.joekerouac.async.task.flow.impl.repository.TaskNodeRepositoryImpl;
import com.github.joekerouac.async.task.flow.model.FlowServiceConfig;
import com.github.joekerouac.async.task.flow.model.FlowTask;
import com.github.joekerouac.async.task.flow.model.FlowTaskModel;
import com.github.joekerouac.async.task.flow.model.SetTaskModel;
import com.github.joekerouac.async.task.flow.model.StreamTaskModel;
import com.github.joekerouac.async.task.flow.model.TaskNode;
import com.github.joekerouac.async.task.flow.model.TaskNodeMap;
import com.github.joekerouac.async.task.flow.model.TaskNodeModel;
import com.github.joekerouac.async.task.flow.service.AbstractFlowTaskEngine;
import com.github.joekerouac.async.task.flow.spi.ExecuteStrategy;
import com.github.joekerouac.async.task.flow.spi.FlowTaskRepository;
import com.github.joekerouac.async.task.flow.spi.TaskNodeMapRepository;
import com.github.joekerouac.async.task.flow.spi.TaskNodeRepository;
import com.github.joekerouac.async.task.model.TransStrategy;
import com.github.joekerouac.async.task.spi.ConnectionSelector;
import com.github.joekerouac.async.task.spi.IDGenerator;
import com.github.joekerouac.async.task.spi.TransactionCallback;
import com.github.joekerouac.async.task.spi.TransactionHook;
import com.github.joekerouac.common.tools.collection.CollectionUtil;
import com.github.joekerouac.common.tools.collection.Pair;
import com.github.joekerouac.common.tools.constant.ExceptionProviderConst;
import com.github.joekerouac.common.tools.db.SqlUtil;
import com.github.joekerouac.common.tools.exception.ExceptionUtil;
import com.github.joekerouac.common.tools.log.Logger;
import com.github.joekerouac.common.tools.log.LoggerFactory;
import com.github.joekerouac.common.tools.scheduler.SchedulerSystem;
import com.github.joekerouac.common.tools.scheduler.SchedulerSystemImpl;
import com.github.joekerouac.common.tools.scheduler.SchedulerTask;
import com.github.joekerouac.common.tools.scheduler.SimpleSchedulerTask;
import com.github.joekerouac.common.tools.scheduler.TaskDescriptor;
import com.github.joekerouac.common.tools.string.StringUtils;
import com.github.joekerouac.common.tools.thread.NamedThreadFactory;
import com.github.joekerouac.common.tools.thread.ThreadPoolConfig;
import com.github.joekerouac.common.tools.thread.ThreadUtil;
import com.github.joekerouac.common.tools.util.Assert;
import com.github.joekerouac.common.tools.util.Starter;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/* loaded from: input_file:com/github/joekerouac/async/task/flow/service/FlowServiceImpl.class */
public class FlowServiceImpl implements FlowService {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlowServiceImpl.class.getName());
    private final int streamNodeMapBatchSize;
    private final IDGenerator idGenerator;
    private final TransactionHook transactionHook;
    private final AsyncTaskService asyncTaskService;
    private final FlowTaskRepository flowTaskRepository;
    private final TaskNodeRepository taskNodeRepository;
    private final TaskNodeMapRepository taskNodeMapRepository;
    private final Map<String, AbstractFlowProcessor<?>> processors;
    private final Map<String, ExecuteStrategy> executeStrategies;
    private final SchedulerSystem schedulerSystem;
    private final SchedulerTask flowTaskLoader;
    private final StreamTaskEngine streamTaskEngine;
    private final Starter starter;

    /* renamed from: com.github.joekerouac.async.task.flow.service.FlowServiceImpl$2, reason: invalid class name */
    /* loaded from: input_file:com/github/joekerouac/async/task/flow/service/FlowServiceImpl$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$github$joekerouac$async$task$flow$enums$StrategyResult = new int[StrategyResult.values().length];

        static {
            try {
                $SwitchMap$com$github$joekerouac$async$task$flow$enums$StrategyResult[StrategyResult.PENDING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$flow$enums$StrategyResult[StrategyResult.RUNNING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public FlowServiceImpl(FlowServiceConfig flowServiceConfig) {
        Const.VALIDATION_SERVICE.validate(flowServiceConfig);
        Assert.assertTrue(((flowServiceConfig.getFlowTaskRepository() == null || flowServiceConfig.getTaskNodeRepository() == null || flowServiceConfig.getTaskNodeMapRepository() == null) && flowServiceConfig.getConnectionSelector() == null) ? false : true, "repository和connectionSlector不能同时为空", ExceptionProviderConst.IllegalArgumentExceptionProvider);
        this.streamNodeMapBatchSize = flowServiceConfig.getStreamNodeMapBatchSize();
        this.idGenerator = flowServiceConfig.getIdGenerator();
        this.transactionHook = flowServiceConfig.getTransactionHook();
        this.asyncTaskService = flowServiceConfig.getAsyncTaskService();
        ConnectionSelector connectionSelector = flowServiceConfig.getConnectionSelector();
        this.flowTaskRepository = flowServiceConfig.getFlowTaskRepository() == null ? new FlowTaskRepositoryImpl(connectionSelector) : flowServiceConfig.getFlowTaskRepository();
        this.taskNodeRepository = flowServiceConfig.getTaskNodeRepository() == null ? new TaskNodeRepositoryImpl(connectionSelector) : flowServiceConfig.getTaskNodeRepository();
        this.taskNodeMapRepository = flowServiceConfig.getTaskNodeMapRepository() == null ? new TaskNodeMapRepositoryImpl(connectionSelector) : flowServiceConfig.getTaskNodeMapRepository();
        this.processors = new ConcurrentHashMap();
        Iterator<AbstractFlowProcessor> it = flowServiceConfig.getProcessors().iterator();
        while (it.hasNext()) {
            addProcessor(it.next());
        }
        this.executeStrategies = flowServiceConfig.getExecuteStrategies();
        this.starter = new Starter();
        ThreadPoolConfig threadPoolConfig = new ThreadPoolConfig();
        threadPoolConfig.setCorePoolSize(flowServiceConfig.getFlowTaskBatchSize());
        threadPoolConfig.setMaximumPoolSize(flowServiceConfig.getFlowTaskBatchSize());
        threadPoolConfig.setWorkQueue(new LinkedBlockingQueue());
        threadPoolConfig.setThreadFactory(new NamedThreadFactory("无限流任务图构建线程"));
        threadPoolConfig.setRejectedExecutionHandler((runnable, threadPoolExecutor) -> {
            LOGGER.warn("无限流任务图构建任务 [{}] 被丢弃", new Object[]{runnable});
        });
        this.schedulerSystem = new SchedulerSystemImpl("流式任务调度系统", ThreadUtil.newThreadPool(threadPoolConfig), true);
        AbstractFlowTaskEngine.EngineConfig build = AbstractFlowTaskEngine.EngineConfig.builder().processors(this.processors).asyncTaskService(this.asyncTaskService).flowMonitorService(flowServiceConfig.getFlowMonitorService()).flowTaskRepository(this.flowTaskRepository).taskNodeRepository(this.taskNodeRepository).taskNodeMapRepository(this.taskNodeMapRepository).executeStrategies(this.executeStrategies).connectionSelector(connectionSelector).build();
        this.streamTaskEngine = new StreamTaskEngine(build, this.schedulerSystem);
        this.asyncTaskService.addProcessor(new SetTaskEngine(build));
        this.asyncTaskService.addProcessor(this.streamTaskEngine);
        this.flowTaskLoader = new SimpleSchedulerTask(() -> {
            boolean z = true;
            int i = 0;
            ArrayList<FlowTask> arrayList = new ArrayList();
            while (z) {
                List<FlowTask> selectByType = this.flowTaskRepository.selectByType(FlowTaskType.STREAM, i, 20);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("本次捞取到的流式任务为： [{}]", new Object[]{selectByType});
                }
                z = !CollectionUtil.isEmpty(selectByType);
                i += 20;
                arrayList.addAll(selectByType);
            }
            Set set = (Set) this.schedulerSystem.getAll().stream().map((v0) -> {
                return v0.getId();
            }).collect(Collectors.toSet());
            for (FlowTask flowTask : arrayList) {
                set.remove(flowTask.getRequestId());
                registerStreamTaskBuildTask(flowTask.getRequestId());
            }
            Iterator it2 = set.iterator();
            while (it2.hasNext()) {
                this.schedulerSystem.removeTask((String) it2.next());
            }
        }, "stream-task-batch-add", true);
        this.flowTaskLoader.setFixedDelay(15000L);
    }

    @Override // com.github.joekerouac.async.task.flow.FlowService
    public void start() {
        this.starter.start(() -> {
            this.schedulerSystem.start();
            this.flowTaskLoader.start();
        });
    }

    @Override // com.github.joekerouac.async.task.flow.FlowService
    public void stop() {
        this.starter.stop(() -> {
            this.flowTaskLoader.stop();
            this.schedulerSystem.stop();
        });
    }

    @Override // com.github.joekerouac.async.task.flow.FlowService
    public void addProcessor(AbstractFlowProcessor<?> abstractFlowProcessor) {
        Assert.notNull(abstractFlowProcessor, "要添加的processor不能为空", ExceptionProviderConst.IllegalArgumentExceptionProvider);
        Assert.assertTrue(abstractFlowProcessor.processors() != null && abstractFlowProcessor.processors().length > 0, StringUtils.format("处理器 [{}] 的处理器名(processors)返回了空", new Object[]{abstractFlowProcessor}), ExceptionProviderConst.CodeErrorExceptionProvider);
        for (String str : abstractFlowProcessor.processors()) {
            Assert.assertTrue(StringUtils.isNotBlank(str), StringUtils.format("处理器 [{}] 的处理器名(processors)中存在空的", new Object[]{abstractFlowProcessor}), ExceptionProviderConst.CodeErrorExceptionProvider);
            AbstractFlowProcessor<?> put = this.processors.put(str, abstractFlowProcessor);
            if (put != null) {
                throw new ProcessorAlreadyExistException(StringUtils.format("当前有两个叫 [{}] 的处理器，分别是：[{}, {}]", new Object[]{str, abstractFlowProcessor, put}), put);
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("注册处理器：[{}:{}]", new Object[]{str, abstractFlowProcessor});
            }
        }
    }

    @Override // com.github.joekerouac.async.task.flow.FlowService
    public AbstractFlowProcessor<?> removeProcessor(String str) {
        Assert.notBlank(str, "要移除的processorName不能为空", ExceptionProviderConst.IllegalArgumentExceptionProvider);
        return this.processors.remove(str);
    }

    @Override // com.github.joekerouac.async.task.flow.FlowService
    public void addTask(FlowTaskModel flowTaskModel) {
        Assert.notNull(flowTaskModel, "要添加的任务不能为null", ExceptionProviderConst.IllegalArgumentExceptionProvider);
        this.starter.runWithStarted(() -> {
            Const.VALIDATION_SERVICE.validate(flowTaskModel);
            if (flowTaskModel instanceof SetTaskModel) {
                addSetTask((SetTaskModel) flowTaskModel);
            } else {
                if (!(flowTaskModel instanceof StreamTaskModel)) {
                    throw new UnsupportedOperationException(StringUtils.format("不支持的task类型： [{}]", new Object[]{flowTaskModel.getClass()}));
                }
                addStreamTask((StreamTaskModel) flowTaskModel);
            }
        });
    }

    @Override // com.github.joekerouac.async.task.flow.FlowService
    public void finishStream(String str) {
        this.flowTaskRepository.updateStatus(str, FlowTaskStatus.FINISH);
    }

    @Override // com.github.joekerouac.async.task.flow.FlowService
    public FlowTaskStatus queryTaskStatus(String str) {
        Assert.notBlank(str, StringUtils.format("任务requestId不能为空", new Object[0]), ExceptionProviderConst.IllegalArgumentExceptionProvider);
        return (FlowTaskStatus) this.starter.runWithStarted(() -> {
            FlowTask selectForLock = this.flowTaskRepository.selectForLock(str);
            if (selectForLock == null) {
                return null;
            }
            return selectForLock.getStatus();
        });
    }

    @Override // com.github.joekerouac.async.task.flow.FlowService
    public TaskNodeStatus queryNodeStatus(String str) {
        Assert.notBlank(str, StringUtils.format("任务节点requestId不能为空", new Object[0]), ExceptionProviderConst.IllegalArgumentExceptionProvider);
        return (TaskNodeStatus) this.starter.runWithStarted(() -> {
            TaskNode selectByRequestId = this.taskNodeRepository.selectByRequestId(str);
            if (selectByRequestId == null) {
                return null;
            }
            return selectByRequestId.getStatus();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerStreamTaskBuildTask(String str) {
        this.schedulerSystem.registerTask(new TaskDescriptor(str, 10000L, () -> {
            try {
                buildNodeMap(str);
            } catch (Throwable th) {
                Throwable rootCause = ExceptionUtil.getRootCause(th);
                if (!(th instanceof SQLException) || SqlUtil.causeForUpdateNowaitError((SQLException) rootCause)) {
                    LOGGER.warn(th, "流式任务批量添加处理失败，等待下次处理", new Object[0]);
                }
            }
        }));
    }

    private boolean buildNodeMap(String str) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        TransUtil.run(TransStrategy.REQUIRED, () -> {
            FlowTask selectForLock = this.flowTaskRepository.selectForLock(str);
            Assert.notNull(selectForLock, StringUtils.format("系统错误，当前主任务 [{}] 不存在", new Object[]{str}), ExceptionProviderConst.DBExceptionProvider);
            int i = 0;
            int i2 = this.streamNodeMapBatchSize;
            ArrayList arrayList = new ArrayList();
            while (true) {
                if (i2 <= 0) {
                    break;
                }
                int min = Math.min(100, i2);
                List<TaskNode> selectByStatus = this.taskNodeRepository.selectByStatus(str, TaskNodeStatus.INIT, i, min);
                arrayList.addAll(selectByStatus);
                i2 -= min;
                i += min;
                if (selectByStatus.size() < min) {
                    atomicBoolean.set(false);
                    break;
                }
                atomicBoolean.set(true);
            }
            if (arrayList.isEmpty()) {
                return;
            }
            ArrayList arrayList2 = new ArrayList(arrayList.size());
            TaskNode taskNode = (TaskNode) arrayList.get(0);
            for (int i3 = 1; i3 < arrayList.size(); i3++) {
                TaskNode taskNode2 = (TaskNode) arrayList.get(i3);
                TaskNodeMap taskNodeMap = new TaskNodeMap();
                taskNodeMap.setId(this.idGenerator.generateId());
                taskNodeMap.setTaskRequestId(str);
                taskNodeMap.setParentNode(taskNode.getRequestId());
                taskNodeMap.setChildNode(taskNode2.getRequestId());
                arrayList2.add(taskNodeMap);
                taskNode = taskNode2;
            }
            String lastTaskId = selectForLock.getLastTaskId();
            TaskNode taskNode3 = (TaskNode) arrayList.get(0);
            TaskNode taskNode4 = (TaskNode) arrayList.get(arrayList.size() - 1);
            TaskNodeMap taskNodeMap2 = new TaskNodeMap();
            taskNodeMap2.setId(this.idGenerator.generateId());
            taskNodeMap2.setTaskRequestId(str);
            taskNodeMap2.setParentNode(lastTaskId);
            taskNodeMap2.setChildNode(taskNode3.getRequestId());
            arrayList2.add(taskNodeMap2);
            TaskNodeStatus status = this.taskNodeRepository.selectByRequestId(lastTaskId).getStatus();
            TaskNodeStatus taskNodeStatus = TaskNodeStatus.WAIT;
            if (status == TaskNodeStatus.SUCCESS) {
                taskNodeStatus = TaskNodeStatus.READY;
            } else if (status == TaskNodeStatus.ERROR) {
                switch (AnonymousClass2.$SwitchMap$com$github$joekerouac$async$task$flow$enums$StrategyResult[this.streamTaskEngine.decideNodeStatus(null, taskNode3).ordinal()]) {
                    case TransactionCallback.STATUS_ROLLED_BACK /* 1 */:
                        taskNodeStatus = TaskNodeStatus.PENDING;
                        break;
                    case TransactionCallback.STATUS_UNKNOWN /* 2 */:
                        taskNodeStatus = TaskNodeStatus.READY;
                        break;
                    default:
                        throw new IllegalStateException(StringUtils.format("流式任务节点 [{}] 决策失败，结果不应该式UNKNOWN", new Object[]{taskNode3.getRequestId()}));
                }
            }
            List<String> list = (List) arrayList.stream().map((v0) -> {
                return v0.getRequestId();
            }).filter(str2 -> {
                return !str2.equals(taskNode3.getRequestId());
            }).collect(Collectors.toList());
            this.taskNodeMapRepository.save(arrayList2);
            this.taskNodeRepository.batchUpdateStatus(list, TaskNodeStatus.WAIT);
            this.taskNodeRepository.updateStatus(taskNode3.getRequestId(), taskNodeStatus);
            this.flowTaskRepository.updateLastTaskId(str, taskNode4.getRequestId());
            if (taskNodeStatus != TaskNodeStatus.WAIT) {
                this.asyncTaskService.notifyTask(taskNode3.getRequestId());
            }
        });
        return atomicBoolean.get();
    }

    private void addStreamTask(StreamTaskModel streamTaskModel) {
        TaskNodeModel firstTask = streamTaskModel.getFirstTask();
        final String streamId = streamTaskModel.getStreamId();
        FlowTask flowTask = new FlowTask();
        flowTask.setId(streamId);
        flowTask.setRequestId(streamId);
        flowTask.setType(FlowTaskType.STREAM);
        flowTask.setFirstTaskId(firstTask.getRequestId());
        flowTask.setStatus(FlowTaskStatus.RUNNING);
        Pair<List<TaskNode>, List<TaskNodeMap>> buildGraph = buildGraph(firstTask, 1, streamId, null, new Pair<>(new ArrayList(), new ArrayList()), new HashSet(), new HashSet());
        List list = (List) buildGraph.getKey();
        flowTask.setLastTaskId(((TaskNode) list.get(list.size() - 1)).getRequestId());
        TransUtil.run(TransStrategy.REQUIRED, () -> {
            boolean z = false;
            if (this.flowTaskRepository.selectForLock(streamId) == null) {
                try {
                    this.flowTaskRepository.save(flowTask);
                    z = true;
                } catch (RuntimeException e) {
                    Throwable rootCause = ExceptionUtil.getRootCause(e);
                    if (!(rootCause instanceof SQLException) || !SqlUtil.causeDuplicateKey((SQLException) rootCause)) {
                        throw e;
                    }
                    Assert.notNull(this.flowTaskRepository.selectForLock(streamId), StringUtils.format("程序未知BUG，当前任务[{}]不存在", new Object[]{streamId}), ExceptionProviderConst.IllegalStateExceptionProvider);
                }
            }
            if (!z) {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ((TaskNode) it.next()).setStatus(TaskNodeStatus.INIT);
                }
            } else if (((List) buildGraph.getValue()).size() > 0) {
                this.taskNodeMapRepository.save((List) buildGraph.getValue());
            }
            this.taskNodeRepository.save(list);
            Iterator it2 = list.iterator();
            while (it2.hasNext()) {
                TaskNode taskNode = (TaskNode) it2.next();
                this.asyncTaskService.addTaskWithWait(taskNode.getRequestId(), taskNode.getRequestId(), taskNode.getMaxRetry(), LocalDateTime.now(), StreamTaskEngine.PROCESSOR_NAME, TransStrategy.SUPPORTS);
            }
            if (z) {
                this.taskNodeRepository.updateStatus(firstTask.getRequestId(), TaskNodeStatus.READY);
                this.asyncTaskService.notifyTask(firstTask.getRequestId());
            }
        });
        if (this.transactionHook != null && this.transactionHook.isActualTransactionActive()) {
            this.transactionHook.registerCallback(new TransactionCallback() { // from class: com.github.joekerouac.async.task.flow.service.FlowServiceImpl.1
                @Override // com.github.joekerouac.async.task.spi.TransactionCallback
                public void afterCommit() throws RuntimeException {
                    FlowServiceImpl.this.registerStreamTaskBuildTask(streamId);
                    FlowServiceImpl.this.schedulerSystem.scheduler(streamId, false);
                }
            });
        } else {
            registerStreamTaskBuildTask(streamId);
            this.schedulerSystem.scheduler(streamId, false);
        }
    }

    private void addSetTask(SetTaskModel setTaskModel) {
        String requestId = setTaskModel.getRequestId();
        FlowTask flowTask = new FlowTask();
        flowTask.setId(this.idGenerator.generateId());
        flowTask.setRequestId(requestId);
        flowTask.setType(FlowTaskType.SET);
        flowTask.setFirstTaskId(setTaskModel.getFirstTask().getRequestId());
        flowTask.setLastTaskId(setTaskModel.getLastTask().getRequestId());
        flowTask.setStatus(FlowTaskStatus.RUNNING);
        Pair<List<TaskNode>, List<TaskNodeMap>> buildGraph = buildGraph(setTaskModel.getFirstTask(), 0, requestId, setTaskModel.getLastTask(), new Pair<>(new ArrayList(), new ArrayList()), new HashSet(), new HashSet());
        TransUtil.run(TransStrategy.REQUIRED, () -> {
            this.flowTaskRepository.save(flowTask);
            for (TaskNode taskNode : (List) buildGraph.getKey()) {
                if (taskNode.getRequestId().equals(setTaskModel.getFirstTask().getRequestId())) {
                    taskNode.setStatus(TaskNodeStatus.READY);
                }
            }
            this.taskNodeRepository.save((List) buildGraph.getKey());
            this.taskNodeMapRepository.save((List) buildGraph.getValue());
            for (TaskNode taskNode2 : (List) buildGraph.getKey()) {
                this.asyncTaskService.addTaskWithWait(taskNode2.getRequestId(), taskNode2.getRequestId(), taskNode2.getMaxRetry(), LocalDateTime.now(), SetTaskEngine.PROCESSOR_NAME, TransStrategy.SUPPORTS);
            }
            this.asyncTaskService.notifyTask(flowTask.getFirstTaskId());
        });
    }

    private Pair<List<TaskNode>, List<TaskNodeMap>> buildGraph(TaskNodeModel taskNodeModel, int i, String str, TaskNodeModel taskNodeModel2, Pair<List<TaskNode>, List<TaskNodeMap>> pair, Set<String> set, Set<String> set2) {
        Assert.assertTrue(set.add(taskNodeModel.getRequestId()), StringUtils.format("当前在依赖 [{}] 处存在环形依赖，请检测， [{}]", new Object[]{taskNodeModel.getRequestId(), set}), ExceptionProviderConst.IllegalArgumentExceptionProvider);
        if (set2.add(taskNodeModel.getRequestId())) {
            ((List) pair.getKey()).add(build(taskNodeModel, this.idGenerator, str));
        }
        List<TaskNodeModel> allChild = taskNodeModel.getAllChild();
        if (!CollectionUtil.isEmpty(allChild)) {
            if (i > 0) {
                Assert.assertTrue(allChild.size() <= i, StringUtils.format("当前允许的最大子节点数为： [{}]， 当前节点 [{}] 有 [{}] 个子节点", new Object[]{Integer.valueOf(i), taskNodeModel.getRequestId(), Integer.valueOf(allChild.size())}), ExceptionProviderConst.IllegalArgumentExceptionProvider);
            }
            for (TaskNodeModel taskNodeModel3 : allChild) {
                TaskNodeMap taskNodeMap = new TaskNodeMap();
                taskNodeMap.setId(this.idGenerator.generateId());
                taskNodeMap.setTaskRequestId(str);
                taskNodeMap.setParentNode(taskNodeModel.getRequestId());
                taskNodeMap.setChildNode(taskNodeModel3.getRequestId());
                ((List) pair.getValue()).add(taskNodeMap);
                buildGraph(taskNodeModel3, i, str, taskNodeModel2, pair, set, set2);
            }
        } else if (taskNodeModel2 != null) {
            Assert.assertTrue(taskNodeModel2 == taskNodeModel, StringUtils.format("当前任务[{}]的结束节点[{}]存在多个对象", new Object[]{str, taskNodeModel.getRequestId(), taskNodeModel2.getRequestId()}), ExceptionProviderConst.IllegalArgumentExceptionProvider);
        }
        set.remove(taskNodeModel.getRequestId());
        return pair;
    }

    private TaskNode build(TaskNodeModel taskNodeModel, IDGenerator iDGenerator, String str) {
        String orDefault = StringUtils.getOrDefault(taskNodeModel.getProcessor(), taskNodeModel.getData().getClass().getSimpleName());
        AbstractFlowProcessor<?> abstractFlowProcessor = this.processors.get(orDefault);
        Assert.notNull(abstractFlowProcessor, StringUtils.format("任务 [{}:{}] 对应的处理器 [{}] 不存在", new Object[]{str, taskNodeModel.getRequestId(), orDefault}), ExceptionProviderConst.IllegalArgumentExceptionProvider);
        Assert.notNull(this.executeStrategies.get(taskNodeModel.getExecuteStrategy()), StringUtils.format("任务 [{}:{}] 对应的执行策略 [{}] 不存在", new Object[]{str, taskNodeModel.getRequestId(), taskNodeModel.getExecuteStrategy()}), ExceptionProviderConst.IllegalArgumentExceptionProvider);
        TaskNode taskNode = new TaskNode();
        taskNode.setId(iDGenerator.generateId());
        taskNode.setRequestId(taskNodeModel.getRequestId());
        taskNode.setTaskRequestId(str);
        taskNode.setNodeData(abstractFlowProcessor.serialize(taskNodeModel.getData()));
        taskNode.setProcessor(orDefault);
        taskNode.setStatus(TaskNodeStatus.WAIT);
        taskNode.setFailStrategy(taskNodeModel.getFailStrategy());
        taskNode.setExecuteStrategy(taskNodeModel.getExecuteStrategy());
        taskNode.setStrategyContext(StringUtils.getOrDefault(taskNodeModel.getStrategyContext(), Const.NULL));
        taskNode.setMaxRetry(taskNodeModel.getMaxRetry());
        return taskNode;
    }
}
