/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.datax.core.job.scheduler;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.statistics.container.communicator.AbstractContainerCommunicator;
import com.alibaba.datax.core.util.ErrorRecordChecker;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.dataxservice.face.domain.enums.State;
import java.util.List;
import org.apache.commons.lang.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractScheduler.class);
    private ErrorRecordChecker errorLimit;
    private AbstractContainerCommunicator containerCommunicator;
    private Long jobId;

    public Long getJobId() {
        return this.jobId;
    }

    public AbstractScheduler(AbstractContainerCommunicator containerCommunicator) {
        this.containerCommunicator = containerCommunicator;
    }

    public void schedule(List<Configuration> configurations) {
        Validate.notNull(configurations, (String)"scheduler\u914d\u7f6e\u4e0d\u80fd\u4e3a\u7a7a");
        int jobReportIntervalInMillSec = configurations.get(0).getInt("core.container.job.reportInterval", 30000);
        int jobSleepIntervalInMillSec = configurations.get(0).getInt("core.container.job.sleepInterval", 10000);
        this.jobId = configurations.get(0).getLong("core.container.job.id");
        this.errorLimit = new ErrorRecordChecker(configurations.get(0));
        this.containerCommunicator.registerCommunication(configurations);
        int totalTasks = this.calculateTaskCount(configurations);
        this.startAllTaskGroup(configurations);
        Communication lastJobContainerCommunication = new Communication();
        long lastReportTimeStamp = System.currentTimeMillis();
        try {
            while (true) {
                Communication nowJobContainerCommunication = this.containerCommunicator.collect();
                nowJobContainerCommunication.setTimestamp(System.currentTimeMillis());
                LOG.debug(nowJobContainerCommunication.toString());
                long now = System.currentTimeMillis();
                if (now - lastReportTimeStamp > (long)jobReportIntervalInMillSec) {
                    Communication reportCommunication = CommunicationTool.getReportCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);
                    this.containerCommunicator.report(reportCommunication);
                    lastReportTimeStamp = now;
                    lastJobContainerCommunication = nowJobContainerCommunication;
                }
                this.errorLimit.checkRecordLimit(nowJobContainerCommunication);
                if (nowJobContainerCommunication.getState() == State.SUCCEEDED) {
                    LOG.info("Scheduler accomplished all tasks.");
                    break;
                }
                if (this.isJobKilling(this.getJobId())) {
                    this.dealKillingStat(this.containerCommunicator, totalTasks);
                } else if (nowJobContainerCommunication.getState() == State.FAILED) {
                    this.dealFailedStat(this.containerCommunicator, nowJobContainerCommunication.getThrowable());
                }
                Thread.sleep(jobSleepIntervalInMillSec);
            }
        }
        catch (InterruptedException e) {
            LOG.error("\u6355\u83b7\u5230InterruptedException\u5f02\u5e38!", (Throwable)e);
            throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    protected abstract void startAllTaskGroup(List<Configuration> var1);

    protected abstract void dealFailedStat(AbstractContainerCommunicator var1, Throwable var2);

    protected abstract void dealKillingStat(AbstractContainerCommunicator var1, int var2);

    private int calculateTaskCount(List<Configuration> configurations) {
        int totalTasks = 0;
        for (Configuration taskGroupConfiguration : configurations) {
            totalTasks += taskGroupConfiguration.getListConfiguration("job.content").size();
        }
        return totalTasks;
    }

    protected abstract boolean isJobKilling(Long var1);
}

