/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.datax.core.taskgroup.runner;

import com.alibaba.datax.common.plugin.AbstractTaskPlugin;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.statistics.PerfRecord;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.taskgroup.runner.AbstractRunner;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that drives a {@link Writer.Task} plugin through its lifecycle:
 * {@code init}, {@code prepare}, {@code startWrite}, {@code post} and finally
 * {@code destroy}. Each phase is timed with its own {@link PerfRecord}.
 *
 * <p>A {@link RecordReceiver} must be supplied through
 * {@link #setRecordReceiver(RecordReceiver)} before {@link #run()} is invoked.
 */
public class WriterRunner
extends AbstractRunner
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(WriterRunner.class);

    /** Source of records handed to the writer task; {@code null} until set. */
    private RecordReceiver recordReceiver;

    /**
     * Sets the receiver that is passed to the writer task in {@link #run()}.
     *
     * @param receiver the record receiver to use
     */
    public void setRecordReceiver(RecordReceiver receiver) {
        this.recordReceiver = receiver;
    }

    /**
     * Creates a runner for the given plugin.
     *
     * @param abstractTaskPlugin the plugin to run; {@link #run()} and
     *        {@link #supportFailOver()} cast it to {@link Writer.Task}
     */
    public WriterRunner(AbstractTaskPlugin abstractTaskPlugin) {
        super(abstractTaskPlugin);
    }

    /**
     * Executes the writer task lifecycle.
     *
     * <p>Any {@link Throwable} raised by the init/prepare/write/post phases is
     * logged and recorded through {@code markFail}; it is not rethrown. The
     * destroy phase always runs afterwards, and the perf records opened for
     * the destroy phase and for the overall wait-read time are always closed,
     * even when {@code destroy} itself throws.
     *
     * @throws IllegalArgumentException if no record receiver has been set
     */
    @Override
    public void run() {
        Validate.isTrue(this.recordReceiver != null,
                "recordReceiver must be set before the writer runner is started");
        Writer.Task taskWriter = (Writer.Task)this.getPlugin();
        // Started as the first step of the try block and closed in the finally block.
        PerfRecord channelWaitRead = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.WAIT_READ_TIME);
        try {
            channelWaitRead.start();

            LOG.debug("task writer starts to do init ...");
            PerfRecord initPerfRecord = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.WRITE_TASK_INIT);
            initPerfRecord.start();
            taskWriter.init();
            initPerfRecord.end();

            LOG.debug("task writer starts to do prepare ...");
            PerfRecord preparePerfRecord = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.WRITE_TASK_PREPARE);
            preparePerfRecord.start();
            taskWriter.prepare();
            preparePerfRecord.end();

            LOG.debug("task writer starts to write ...");
            PerfRecord dataPerfRecord = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.WRITE_TASK_DATA);
            dataPerfRecord.start();
            taskWriter.startWrite(this.recordReceiver);
            // Attach the totals reported by the runner communication to the data-phase record.
            dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
            dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
            dataPerfRecord.end();

            LOG.debug("task writer starts to do post ...");
            PerfRecord postPerfRecord = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.WRITE_TASK_POST);
            postPerfRecord.start();
            taskWriter.post();
            postPerfRecord.end();

            super.markSuccess();
        }
        catch (Throwable e) {
            LOG.error("Writer Runner Received Exceptions:", e);
            super.markFail(e);
        }
        finally {
            LOG.debug("task writer starts to do destroy ...");
            PerfRecord desPerfRecord = new PerfRecord(this.getTaskGroupId(), this.getTaskId(), PerfRecord.PHASE.WRITE_TASK_DESTROY);
            desPerfRecord.start();
            try {
                super.destroy();
            }
            finally {
                // Close both records even if destroy() throws, so they are never left open.
                desPerfRecord.end();
                channelWaitRead.end(super.getRunnerCommunication().getLongCounter("waitReaderTime"));
            }
        }
    }

    /**
     * Reports whether the underlying writer task supports fail-over.
     *
     * @return the value returned by the writer task's {@code supportFailOver()}
     */
    public boolean supportFailOver() {
        Writer.Task taskWriter = (Writer.Task)this.getPlugin();
        return taskWriter.supportFailOver();
    }

    /**
     * Shuts down the record receiver. Does nothing when no receiver has been
     * set yet, since there is nothing to shut down in that case.
     */
    @Override
    public void shutdown() {
        RecordReceiver receiver = this.recordReceiver;
        if (receiver != null) {
            receiver.shutdown();
        }
    }
}

