/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.datax.core.transport.exchanger;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.transport.channel.Channel;
import com.alibaba.datax.core.transport.exchanger.TransformerExchanger;
import com.alibaba.datax.core.transport.record.TerminateRecord;
import com.alibaba.datax.core.transport.transformer.TransformerExecution;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import java.util.List;

public class RecordExchanger
extends TransformerExchanger
implements RecordSender,
RecordReceiver {
    private Channel channel;
    private Configuration configuration;
    private static Class<? extends Record> RECORD_CLASS;
    private volatile boolean shutdown = false;

    public RecordExchanger(int taskGroupId, int taskId, Channel channel, Communication communication, List<TransformerExecution> transformerExecs, TaskPluginCollector pluginCollector) {
        super(taskGroupId, taskId, communication, transformerExecs, pluginCollector);
        assert (channel != null);
        this.channel = channel;
        this.configuration = channel.getConfiguration();
        try {
            RECORD_CLASS = Class.forName(this.configuration.getString("core.transport.record.class", "com.alibaba.datax.core.transport.record.DefaultRecord"));
        }
        catch (ClassNotFoundException e) {
            throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.CONFIG_ERROR, e);
        }
    }

    @Override
    public Record getFromReader() {
        if (this.shutdown) {
            throw DataXException.asDataXException((ErrorCode)CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        Record record = this.channel.pull();
        return record instanceof TerminateRecord ? null : record;
    }

    @Override
    public Record createRecord() {
        try {
            return RECORD_CLASS.newInstance();
        }
        catch (Exception e) {
            throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.CONFIG_ERROR, e);
        }
    }

    @Override
    public void sendToWriter(Record record) {
        if (this.shutdown) {
            throw DataXException.asDataXException((ErrorCode)CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        if ((record = this.doTransformer(record)) == null) {
            return;
        }
        this.channel.push(record);
        this.doStat();
    }

    @Override
    public void flush() {
    }

    @Override
    public void terminate() {
        if (this.shutdown) {
            throw DataXException.asDataXException((ErrorCode)CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        this.channel.pushTerminate(TerminateRecord.get());
        this.doStat();
    }

    @Override
    public void shutdown() {
        this.shutdown = true;
    }
}

