/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.datax.core.taskgroup.runner;

import com.alibaba.datax.common.plugin.AbstractTaskPlugin;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.statistics.PerfRecord;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.taskgroup.runner.AbstractRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderRunner
extends AbstractRunner
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ReaderRunner.class);
    private RecordSender recordSender;

    public void setRecordSender(RecordSender recordSender) {
        this.recordSender = recordSender;
    }

    public ReaderRunner(AbstractTaskPlugin abstractTaskPlugin) {
        super(abstractTaskPlugin);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        assert (null != this.recordSender);
        Reader.Task taskReader = (Reader.Task)this.getPlugin();
        PerfRecord channelWaitWrite = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.WAIT_WRITE_TIME);
        try {
            channelWaitWrite.start();
            LOG.debug("task reader starts to do init ...");
            PerfRecord initPerfRecord = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.READ_TASK_INIT);
            initPerfRecord.start();
            taskReader.init();
            initPerfRecord.end();
            LOG.debug("task reader starts to do prepare ...");
            PerfRecord preparePerfRecord = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.READ_TASK_PREPARE);
            preparePerfRecord.start();
            taskReader.prepare();
            preparePerfRecord.end();
            LOG.debug("task reader starts to read ...");
            PerfRecord dataPerfRecord = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.READ_TASK_DATA);
            dataPerfRecord.start();
            taskReader.startRead(this.recordSender);
            this.recordSender.terminate();
            dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
            dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
            dataPerfRecord.end();
            LOG.debug("task reader starts to do post ...");
            PerfRecord postPerfRecord = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.READ_TASK_POST);
            postPerfRecord.start();
            taskReader.post();
            postPerfRecord.end();
        }
        catch (Throwable e) {
            LOG.error("Reader runner Received Exceptions:", e);
            super.markFail(e);
        }
        finally {
            LOG.debug("task reader starts to do destroy ...");
            PerfRecord desPerfRecord = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.READ_TASK_DESTROY);
            desPerfRecord.start();
            super.destroy();
            desPerfRecord.end();
            channelWaitWrite.end(super.getRunnerCommunication().getLongCounter("waitWriterTime"));
            long transformerUsedTime = super.getRunnerCommunication().getLongCounter("totalTransformerUsedTime");
            if (transformerUsedTime > 0L) {
                PerfRecord transformerRecord = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.TRANSFORMER_TIME);
                transformerRecord.start();
                transformerRecord.end(transformerUsedTime);
            }
        }
    }

    @Override
    public void shutdown() {
        this.recordSender.shutdown();
    }
}

