/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.datax.core.transport.exchanger;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.channel.Channel;
import com.alibaba.datax.core.transport.record.TerminateRecord;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.Validate;

public class BufferedRecordExchanger
implements RecordSender,
RecordReceiver {
    private final Channel channel;
    private final Configuration configuration;
    private final List<Record> buffer;
    private int bufferSize;
    protected final int byteCapacity;
    private final AtomicInteger memoryBytes = new AtomicInteger(0);
    private int bufferIndex = 0;
    private static Class<? extends Record> RECORD_CLASS;
    private volatile boolean shutdown = false;
    private final TaskPluginCollector pluginCollector;

    public BufferedRecordExchanger(Channel channel, TaskPluginCollector pluginCollector) {
        assert (null != channel);
        assert (null != channel.getConfiguration());
        this.channel = channel;
        this.pluginCollector = pluginCollector;
        this.configuration = channel.getConfiguration();
        this.bufferSize = this.configuration.getInt("core.transport.exchanger.bufferSize");
        this.buffer = new ArrayList<Record>(this.bufferSize);
        this.byteCapacity = this.configuration.getInt("core.transport.channel.byteCapacity", 0x800000);
        try {
            RECORD_CLASS = Class.forName(this.configuration.getString("core.transport.record.class", "com.alibaba.datax.core.transport.record.DefaultRecord"));
        }
        catch (Exception e) {
            throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.CONFIG_ERROR, e);
        }
    }

    @Override
    public Record createRecord() {
        try {
            return RECORD_CLASS.newInstance();
        }
        catch (Exception e) {
            throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.CONFIG_ERROR, e);
        }
    }

    @Override
    public void sendToWriter(Record record) {
        boolean isFull;
        if (this.shutdown) {
            throw DataXException.asDataXException((ErrorCode)CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        Validate.notNull((Object)record, (String)"record\u4e0d\u80fd\u4e3a\u7a7a.");
        if (record.getMemorySize() > this.byteCapacity) {
            this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("\u5355\u6761\u8bb0\u5f55\u8d85\u8fc7\u5927\u5c0f\u9650\u5236\uff0c\u5f53\u524d\u9650\u5236\u4e3a:%s", this.byteCapacity)));
            return;
        }
        boolean bl = isFull = this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity;
        if (isFull) {
            this.flush();
        }
        this.buffer.add(record);
        ++this.bufferIndex;
        this.memoryBytes.addAndGet(record.getMemorySize());
    }

    @Override
    public void flush() {
        if (this.shutdown) {
            throw DataXException.asDataXException((ErrorCode)CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        this.channel.pushAll(this.buffer);
        this.buffer.clear();
        this.bufferIndex = 0;
        this.memoryBytes.set(0);
    }

    @Override
    public void terminate() {
        if (this.shutdown) {
            throw DataXException.asDataXException((ErrorCode)CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        this.flush();
        this.channel.pushTerminate(TerminateRecord.get());
    }

    @Override
    public Record getFromReader() {
        Record record;
        boolean isEmpty;
        if (this.shutdown) {
            throw DataXException.asDataXException((ErrorCode)CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        boolean bl = isEmpty = this.bufferIndex >= this.buffer.size();
        if (isEmpty) {
            this.receive();
        }
        if ((record = this.buffer.get(this.bufferIndex++)) instanceof TerminateRecord) {
            record = null;
        }
        return record;
    }

    @Override
    public void shutdown() {
        this.shutdown = true;
        try {
            this.buffer.clear();
            this.channel.clear();
        }
        catch (Throwable t) {
            t.printStackTrace();
        }
    }

    private void receive() {
        this.channel.pullAll(this.buffer);
        this.bufferIndex = 0;
        this.bufferSize = this.buffer.size();
    }
}

