/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.datax.core.taskgroup;

import com.alibaba.datax.common.constant.PluginType;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.common.statistics.PerfRecord;
import com.alibaba.datax.common.statistics.PerfTrace;
import com.alibaba.datax.common.statistics.VMInfo;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.AbstractContainer;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.statistics.container.communicator.taskgroup.StandaloneTGContainerCommunicator;
import com.alibaba.datax.core.statistics.plugin.task.AbstractTaskPluginCollector;
import com.alibaba.datax.core.taskgroup.TaskMonitor;
import com.alibaba.datax.core.taskgroup.runner.AbstractRunner;
import com.alibaba.datax.core.taskgroup.runner.ReaderRunner;
import com.alibaba.datax.core.taskgroup.runner.WriterRunner;
import com.alibaba.datax.core.transport.channel.Channel;
import com.alibaba.datax.core.transport.exchanger.BufferedRecordExchanger;
import com.alibaba.datax.core.transport.exchanger.BufferedRecordTransformerExchanger;
import com.alibaba.datax.core.transport.transformer.TransformerExecution;
import com.alibaba.datax.core.util.ClassUtil;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.TransformerUtil;
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.State;
import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskGroupContainer
extends AbstractContainer {
    private static final Logger LOG = LoggerFactory.getLogger(TaskGroupContainer.class);
    private long jobId;
    private int taskGroupId;
    private String channelClazz;
    private String taskCollectorClass;
    private TaskMonitor taskMonitor = TaskMonitor.getInstance();

    public TaskGroupContainer(Configuration configuration) {
        super(configuration);
        this.initCommunicator(configuration);
        this.jobId = this.configuration.getLong("core.container.job.id");
        this.taskGroupId = this.configuration.getInt("core.container.taskGroup.id");
        this.channelClazz = this.configuration.getString("core.transport.channel.class");
        this.taskCollectorClass = this.configuration.getString("core.statistics.collector.plugin.taskClass");
    }

    private void initCommunicator(Configuration configuration) {
        super.setContainerCommunicator(new StandaloneTGContainerCommunicator(configuration));
    }

    public long getJobId() {
        return this.jobId;
    }

    public int getTaskGroupId() {
        return this.taskGroupId;
    }

    @Override
    public void start() {
        try {
            int sleepIntervalInMillSec = this.configuration.getInt("core.container.taskGroup.sleepInterval", 100);
            long reportIntervalInMillSec = this.configuration.getLong("core.container.taskGroup.reportInterval", 10000L);
            int channelNumber = this.configuration.getInt("core.container.taskGroup.channel");
            int taskMaxRetryTimes = this.configuration.getInt("core.container.task.failOver.maxRetryTimes", 1);
            long taskRetryIntervalInMsec = this.configuration.getLong("core.container.task.failOver.retryIntervalInMsec", 10000L);
            long taskMaxWaitInMsec = this.configuration.getLong("core.container.task.failOver.maxWaitInMsec", 60000L);
            List<Configuration> taskConfigs = this.configuration.getListConfiguration("job.content");
            if (LOG.isDebugEnabled()) {
                LOG.debug("taskGroup[{}]'s task configs[{}]", (Object)this.taskGroupId, (Object)JSON.toJSONString(taskConfigs));
            }
            int taskCountInThisTaskGroup = taskConfigs.size();
            LOG.info(String.format("taskGroupId=[%d] start [%d] channels for [%d] tasks.", this.taskGroupId, channelNumber, taskCountInThisTaskGroup));
            this.containerCommunicator.registerCommunication(taskConfigs);
            Map<Integer, Configuration> taskConfigMap = this.buildTaskConfigMap(taskConfigs);
            List<Configuration> taskQueue = this.buildRemainTasks(taskConfigs);
            HashMap<Integer, TaskExecutor> taskFailedExecutorMap = new HashMap<Integer, TaskExecutor>();
            ArrayList<TaskExecutor> runTasks = new ArrayList<TaskExecutor>(channelNumber);
            HashMap<Integer, Long> taskStartTimeMap = new HashMap<Integer, Long>();
            long lastReportTimeStamp = 0L;
            Communication lastTaskGroupContainerCommunication = new Communication();
            while (true) {
                Integer taskId;
                boolean failedOrKilled = false;
                Map<Integer, Communication> communicationMap = this.containerCommunicator.getCommunicationMap();
                for (Map.Entry<Integer, Communication> entry : communicationMap.entrySet()) {
                    Long taskStartTime;
                    taskId = entry.getKey();
                    Communication taskCommunication = entry.getValue();
                    if (!taskCommunication.isFinished()) continue;
                    TaskExecutor taskExecutor = this.removeTask(runTasks, taskId);
                    this.taskMonitor.removeTask(taskId);
                    if (taskCommunication.getState() == State.FAILED) {
                        taskFailedExecutorMap.put(taskId, taskExecutor);
                        if (taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes) {
                            taskExecutor.shutdown();
                            this.containerCommunicator.resetCommunication(taskId);
                            Configuration taskConfig = taskConfigMap.get(taskId);
                            taskQueue.add(taskConfig);
                            continue;
                        }
                        failedOrKilled = true;
                        break;
                    }
                    if (taskCommunication.getState() == State.KILLED) {
                        failedOrKilled = true;
                        break;
                    }
                    if (taskCommunication.getState() != State.SUCCEEDED || (taskStartTime = (Long)taskStartTimeMap.get(taskId)) == null) continue;
                    Long usedTime = System.currentTimeMillis() - taskStartTime;
                    LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms", new Object[]{this.taskGroupId, taskId, usedTime});
                    PerfRecord.addPerfRecord(this.taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL, taskStartTime, usedTime * 1000L * 1000L);
                    taskStartTimeMap.remove(taskId);
                    taskConfigMap.remove(taskId);
                }
                if (failedOrKilled) {
                    lastTaskGroupContainerCommunication = this.reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                    throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
                }
                Iterator<Configuration> iterator = taskQueue.iterator();
                while (iterator.hasNext() && runTasks.size() < channelNumber) {
                    Configuration taskConfig = iterator.next();
                    taskId = taskConfig.getInt("taskId");
                    int attemptCount = 1;
                    TaskExecutor lastExecutor = (TaskExecutor)taskFailedExecutorMap.get(taskId);
                    if (lastExecutor != null) {
                        long failedTime;
                        attemptCount = lastExecutor.getAttemptCount() + 1;
                        long now = System.currentTimeMillis();
                        if (now - (failedTime = lastExecutor.getTimeStamp()) < taskRetryIntervalInMsec) continue;
                        if (!lastExecutor.isShutdown()) {
                            if (now - failedTime > taskMaxWaitInMsec) {
                                this.markCommunicationFailed(taskId);
                                this.reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                                throw DataXException.asDataXException((ErrorCode)CommonErrorCode.WAIT_TIME_EXCEED, "task failover\u7b49\u5f85\u8d85\u65f6");
                            }
                            lastExecutor.shutdown();
                            continue;
                        }
                        LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown", new Object[]{this.taskGroupId, taskId, lastExecutor.getAttemptCount()});
                    }
                    Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
                    TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
                    taskStartTimeMap.put(taskId, System.currentTimeMillis());
                    taskExecutor.doStart();
                    iterator.remove();
                    runTasks.add(taskExecutor);
                    this.taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));
                    taskFailedExecutorMap.remove(taskId);
                    LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started", new Object[]{this.taskGroupId, taskId, attemptCount});
                }
                if (taskQueue.isEmpty() && this.isAllTaskDone(runTasks) && this.containerCommunicator.collectState() == State.SUCCEEDED) break;
                long now = System.currentTimeMillis();
                if (now - lastReportTimeStamp > reportIntervalInMillSec) {
                    lastTaskGroupContainerCommunication = this.reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                    lastReportTimeStamp = now;
                    for (TaskExecutor taskExecutor : runTasks) {
                        this.taskMonitor.report(taskExecutor.getTaskId(), this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
                    }
                }
                Thread.sleep(sleepIntervalInMillSec);
            }
            lastTaskGroupContainerCommunication = this.reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
            LOG.info("taskGroup[{}] completed it's tasks.", (Object)this.taskGroupId);
            this.reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
        }
        catch (Throwable e) {
            Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
            if (nowTaskGroupContainerCommunication.getThrowable() == null) {
                nowTaskGroupContainerCommunication.setThrowable(e);
            }
            nowTaskGroupContainerCommunication.setState(State.FAILED);
            this.containerCommunicator.report(nowTaskGroupContainerCommunication);
            throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.RUNTIME_ERROR, e);
        }
        finally {
            if (!PerfTrace.getInstance().isJob()) {
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }
                LOG.info(PerfTrace.getInstance().summarizeNoException());
            }
        }
    }

    private Map<Integer, Configuration> buildTaskConfigMap(List<Configuration> configurations) {
        HashMap<Integer, Configuration> map = new HashMap<Integer, Configuration>();
        for (Configuration taskConfig : configurations) {
            int taskId = taskConfig.getInt("taskId");
            map.put(taskId, taskConfig);
        }
        return map;
    }

    private List<Configuration> buildRemainTasks(List<Configuration> configurations) {
        LinkedList<Configuration> remainTasks = new LinkedList<Configuration>();
        for (Configuration taskConfig : configurations) {
            remainTasks.add(taskConfig);
        }
        return remainTasks;
    }

    private TaskExecutor removeTask(List<TaskExecutor> taskList, int taskId) {
        Iterator<TaskExecutor> iterator = taskList.iterator();
        while (iterator.hasNext()) {
            TaskExecutor taskExecutor = iterator.next();
            if (taskExecutor.getTaskId() != taskId) continue;
            iterator.remove();
            return taskExecutor;
        }
        return null;
    }

    private boolean isAllTaskDone(List<TaskExecutor> taskList) {
        for (TaskExecutor taskExecutor : taskList) {
            if (taskExecutor.isTaskFinished()) continue;
            return false;
        }
        return true;
    }

    private Communication reportTaskGroupCommunication(Communication lastTaskGroupContainerCommunication, int taskCount) {
        Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
        nowTaskGroupContainerCommunication.setTimestamp(System.currentTimeMillis());
        Communication reportCommunication = CommunicationTool.getReportCommunication(nowTaskGroupContainerCommunication, lastTaskGroupContainerCommunication, taskCount);
        this.containerCommunicator.report(reportCommunication);
        return reportCommunication;
    }

    private void markCommunicationFailed(Integer taskId) {
        Communication communication = this.containerCommunicator.getCommunication(taskId);
        communication.setState(State.FAILED);
    }

    class TaskExecutor {
        private Configuration taskConfig;
        private int taskId;
        private int attemptCount;
        private Channel channel;
        private Thread readerThread;
        private Thread writerThread;
        private ReaderRunner readerRunner;
        private WriterRunner writerRunner;
        private Communication taskCommunication;

        public TaskExecutor(Configuration taskConf, int attemptCount) {
            this.taskConfig = taskConf;
            Validate.isTrue((null != this.taskConfig.getConfiguration("reader") && null != this.taskConfig.getConfiguration("reader") ? 1 : 0) != 0, (String)"[reader|writer]\u7684\u63d2\u4ef6\u53c2\u6570\u4e0d\u80fd\u4e3a\u7a7a!", (Object[])new Object[0]);
            this.taskId = this.taskConfig.getInt("taskId");
            this.attemptCount = attemptCount;
            this.taskCommunication = TaskGroupContainer.this.containerCommunicator.getCommunication(this.taskId);
            Validate.notNull((Object)this.taskCommunication, (String)String.format("taskId[%d]\u7684Communication\u6ca1\u6709\u6ce8\u518c\u8fc7", this.taskId), (Object[])new Object[0]);
            this.channel = ClassUtil.instantiate(TaskGroupContainer.this.channelClazz, Channel.class, TaskGroupContainer.this.configuration);
            this.channel.setCommunication(this.taskCommunication);
            List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(this.taskConfig);
            this.writerRunner = (WriterRunner)this.generateRunner(PluginType.WRITER);
            this.writerThread = new Thread((Runnable)this.writerRunner, String.format("%d-%d-%d-writer", TaskGroupContainer.this.jobId, TaskGroupContainer.this.taskGroupId, this.taskId));
            this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.taskConfig.getString("writer.name")));
            this.readerRunner = (ReaderRunner)this.generateRunner(PluginType.READER, transformerInfoExecs);
            this.readerThread = new Thread((Runnable)this.readerRunner, String.format("%d-%d-%d-reader", TaskGroupContainer.this.jobId, TaskGroupContainer.this.taskGroupId, this.taskId));
            this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.taskConfig.getString("reader.name")));
        }

        public void doStart() {
            this.writerThread.start();
            if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
                throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.RUNTIME_ERROR, this.taskCommunication.getThrowable());
            }
            this.readerThread.start();
            if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
                throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.RUNTIME_ERROR, this.taskCommunication.getThrowable());
            }
        }

        private AbstractRunner generateRunner(PluginType pluginType) {
            return this.generateRunner(pluginType, null);
        }

        private AbstractRunner generateRunner(PluginType pluginType, List<TransformerExecution> transformerInfoExecs) {
            AbstractRunner newRunner = null;
            switch (pluginType) {
                case READER: {
                    newRunner = LoadUtil.loadPluginRunner(pluginType, this.taskConfig.getString("reader.name"));
                    newRunner.setJobConf(this.taskConfig.getConfiguration("reader.parameter"));
                    TaskPluginCollector pluginCollector = ClassUtil.instantiate(TaskGroupContainer.this.taskCollectorClass, AbstractTaskPluginCollector.class, new Object[]{TaskGroupContainer.this.configuration, this.taskCommunication, PluginType.READER});
                    RecordReceiver recordSender = transformerInfoExecs != null && transformerInfoExecs.size() > 0 ? new BufferedRecordTransformerExchanger(TaskGroupContainer.this.taskGroupId, this.taskId, this.channel, this.taskCommunication, pluginCollector, transformerInfoExecs) : new BufferedRecordExchanger(this.channel, pluginCollector);
                    ((ReaderRunner)newRunner).setRecordSender((RecordSender)((Object)recordSender));
                    newRunner.setTaskPluginCollector(pluginCollector);
                    break;
                }
                case WRITER: {
                    newRunner = LoadUtil.loadPluginRunner(pluginType, this.taskConfig.getString("writer.name"));
                    newRunner.setJobConf(this.taskConfig.getConfiguration("writer.parameter"));
                    TaskPluginCollector pluginCollector = ClassUtil.instantiate(TaskGroupContainer.this.taskCollectorClass, AbstractTaskPluginCollector.class, new Object[]{TaskGroupContainer.this.configuration, this.taskCommunication, PluginType.WRITER});
                    ((WriterRunner)newRunner).setRecordReceiver(new BufferedRecordExchanger(this.channel, pluginCollector));
                    newRunner.setTaskPluginCollector(pluginCollector);
                    break;
                }
                default: {
                    throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.ARGUMENT_ERROR, "Cant generateRunner for:" + (Object)((Object)pluginType));
                }
            }
            newRunner.setTaskGroupId(TaskGroupContainer.this.taskGroupId);
            newRunner.setTaskId(this.taskId);
            newRunner.setRunnerCommunication(this.taskCommunication);
            return newRunner;
        }

        private boolean isTaskFinished() {
            if (this.readerThread.isAlive() || this.writerThread.isAlive()) {
                return false;
            }
            return this.taskCommunication != null && this.taskCommunication.isFinished();
        }

        private int getTaskId() {
            return this.taskId;
        }

        private long getTimeStamp() {
            return this.taskCommunication.getTimestamp();
        }

        private int getAttemptCount() {
            return this.attemptCount;
        }

        private boolean supportFailOver() {
            return this.writerRunner.supportFailOver();
        }

        private void shutdown() {
            this.writerRunner.shutdown();
            this.readerRunner.shutdown();
            if (this.writerThread.isAlive()) {
                this.writerThread.interrupt();
            }
            if (this.readerThread.isAlive()) {
                this.readerThread.interrupt();
            }
        }

        private boolean isShutdown() {
            return !this.readerThread.isAlive() && !this.writerThread.isAlive();
        }
    }
}

