/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.datax.core.transport.channel;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.transport.record.TerminateRecord;
import java.util.Collection;
import org.apache.commons.lang.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for a record transport channel.
 *
 * <p>Subclasses supply the actual storage through {@link #doPush(Record)},
 * {@link #doPushAll(Collection)}, {@link #doPull()} and {@link #doPullAll(Collection)}.
 * This class wraps those operations with argument validation, statistics counters on the
 * attached {@link Communication}, and push-side flow control (a sleep on the pushing thread
 * when the measured byte or record rate exceeds the configured limit).
 *
 * <p>{@link #setCommunication(Communication)} must be called before any push or pull;
 * the statistics methods dereference the attached communication unconditionally.
 */
public abstract class Channel {
    private static final Logger LOG = LoggerFactory.getLogger(Channel.class);

    /**
     * Monitor guarding {@link #isFirstPrint}. A dedicated lock object is used instead of the
     * flag itself: locking on a boxed {@code Boolean} means locking on the JVM-wide
     * {@code Boolean.TRUE}/{@code Boolean.FALSE} singletons, and the monitor would change as
     * soon as the flag is reassigned.
     */
    private static final Object FIRST_PRINT_LOCK = new Object();

    /** {@code true} until the speed limits have been logged once for this class. */
    private static boolean isFirstPrint = true;

    protected int taskGroupId;
    protected int capacity;
    protected int byteCapacity;
    /** Byte rate limit (per second); values {@code <= 0} disable byte throttling. */
    protected long byteSpeed;
    /** Record rate limit (per second); values {@code <= 0} disable record throttling. */
    protected long recordSpeed;
    /** Minimum number of milliseconds between two flow-control evaluations. */
    protected long flowControlInterval;
    protected volatile boolean isClosed = false;
    protected Configuration configuration = null;
    // The two wait times are never written in this class; presumably subclasses maintain
    // them. They are published as counters on every push.
    protected volatile long waitReaderTime = 0L;
    protected volatile long waitWriterTime = 0L;

    /** Live counters; attached via {@link #setCommunication(Communication)}. */
    private Communication currentCommunication;
    /** Snapshot of the read counters taken at the previous flow-control evaluation. */
    private Communication lastCommunication = new Communication();

    /**
     * Reads the channel settings from {@code configuration}.
     *
     * <p>Keys and defaults: {@code core.transport.channel.capacity} (2048),
     * {@code core.transport.channel.speed.byte} (1 MiB),
     * {@code core.transport.channel.speed.record} (10000),
     * {@code core.transport.channel.flowControlInterval} (1000),
     * {@code core.transport.channel.byteCapacity} (8 MiB) and
     * {@code core.container.taskGroup.id} (no default supplied here).
     *
     * @param configuration source of the channel settings
     * @throws IllegalArgumentException if the configured capacity is not positive
     */
    public Channel(Configuration configuration) {
        int configuredCapacity = configuration.getInt("core.transport.channel.capacity", 2048);
        long configuredByteSpeed = configuration.getLong("core.transport.channel.speed.byte", 0x100000L);
        long configuredRecordSpeed = configuration.getLong("core.transport.channel.speed.record", 10000L);
        if (configuredCapacity <= 0) {
            throw new IllegalArgumentException(String.format("\u901a\u9053\u5bb9\u91cf[%d]\u5fc5\u987b\u5927\u4e8e0.", configuredCapacity));
        }

        // Log the limits only for the first channel created, not once per channel.
        synchronized (FIRST_PRINT_LOCK) {
            if (isFirstPrint) {
                LOG.info("Channel set byte_speed_limit to {}{}", configuredByteSpeed,
                        configuredByteSpeed <= 0L ? ", No bps activated." : ".");
                LOG.info("Channel set record_speed_limit to {}{}", configuredRecordSpeed,
                        configuredRecordSpeed <= 0L ? ", No tps activated." : ".");
                isFirstPrint = false;
            }
        }

        this.taskGroupId = configuration.getInt("core.container.taskGroup.id");
        this.capacity = configuredCapacity;
        this.byteSpeed = configuredByteSpeed;
        this.recordSpeed = configuredRecordSpeed;
        this.flowControlInterval = configuration.getLong("core.transport.channel.flowControlInterval", 1000L);
        this.byteCapacity = configuration.getInt("core.transport.channel.byteCapacity", 0x800000);
        this.configuration = configuration;
    }

    /** Marks the channel as closed. */
    public void close() {
        this.isClosed = true;
    }

    /** Marks the channel as open. */
    public void open() {
        this.isClosed = false;
    }

    /** @return {@code true} if {@link #close()} was called more recently than {@link #open()} */
    public boolean isClosed() {
        return this.isClosed;
    }

    /** @return the task group id read from the configuration */
    public int getTaskGroupId() {
        return this.taskGroupId;
    }

    /** @return the configured channel capacity */
    public int getCapacity() {
        return this.capacity;
    }

    /** @return the configured byte rate limit; {@code <= 0} means unlimited */
    public long getByteSpeed() {
        return this.byteSpeed;
    }

    /** @return the configuration this channel was built from */
    public Configuration getConfiguration() {
        return this.configuration;
    }

    /**
     * Attaches the communication that receives this channel's counters and resets the
     * flow-control snapshot.
     *
     * @param communication counters to update on every push and pull
     */
    public void setCommunication(Communication communication) {
        this.currentCommunication = communication;
        this.lastCommunication.reset();
    }

    /**
     * Pushes one record, then updates the read counters and applies flow control.
     *
     * @param r record to push; must not be {@code null}
     */
    public void push(Record r) {
        Validate.notNull(r, "record\u4e0d\u80fd\u4e3a\u7a7a.");
        this.doPush(r);
        this.statPush(1L, r.getByteSize());
    }

    /**
     * Pushes a terminate record. Unlike {@link #push(Record)} this does not touch the
     * counters and is never throttled.
     *
     * @param r terminate record to push; must not be {@code null}
     */
    public void pushTerminate(TerminateRecord r) {
        Validate.notNull(r, "record\u4e0d\u80fd\u4e3a\u7a7a.");
        this.doPush(r);
    }

    /**
     * Pushes a batch of records, then updates the read counters and applies flow control.
     *
     * @param rs records to push; neither the collection nor any element may be {@code null}
     */
    public void pushAll(Collection<Record> rs) {
        Validate.notNull(rs);
        Validate.noNullElements(rs);
        this.doPushAll(rs);
        this.statPush(rs.size(), this.getByteSize(rs));
    }

    /**
     * Pulls one record and adds it to the write-received counters.
     *
     * @return the record returned by {@link #doPull()}
     */
    public Record pull() {
        Record record = this.doPull();
        this.statPull(1L, record.getByteSize());
        return record;
    }

    /**
     * Pulls records into {@code rs} and adds the resulting contents of {@code rs} to the
     * write-received counters.
     *
     * @param rs destination collection; must not be {@code null}
     */
    public void pullAll(Collection<Record> rs) {
        Validate.notNull(rs);
        this.doPullAll(rs);
        this.statPull(rs.size(), this.getByteSize(rs));
    }

    /** Stores a single record in the underlying transport. */
    protected abstract void doPush(Record var1);

    /** Stores a batch of records in the underlying transport. */
    protected abstract void doPushAll(Collection<Record> var1);

    /** Retrieves a single record from the underlying transport. */
    protected abstract Record doPull();

    /** Retrieves records from the underlying transport into the given collection. */
    protected abstract void doPullAll(Collection<Record> var1);

    public abstract int size();

    public abstract boolean isEmpty();

    public abstract void clear();

    /** Sums {@link Record#getByteSize()} over all records in {@code rs}. */
    private long getByteSize(Collection<Record> rs) {
        long size = 0L;
        for (Record each : rs) {
            size += each.getByteSize();
        }
        return size;
    }

    /**
     * Records a successful push in the counters and, if a byte or record speed limit is
     * configured, sleeps the calling thread long enough to bring the rate measured since the
     * previous evaluation back under the limit.
     *
     * @param recordSize number of records just pushed
     * @param byteSize   total byte size of the records just pushed
     */
    private void statPush(long recordSize, long byteSize) {
        this.currentCommunication.increaseCounter("readSucceedRecords", recordSize);
        this.currentCommunication.increaseCounter("readSucceedBytes", byteSize);
        this.currentCommunication.setLongCounter("waitReaderTime", this.waitReaderTime);
        this.currentCommunication.setLongCounter("waitWriterTime", this.waitWriterTime);

        boolean byteLimited = this.byteSpeed > 0L;
        boolean recordLimited = this.recordSpeed > 0L;
        if (!byteLimited && !recordLimited) {
            return;
        }

        long nowTimestamp = System.currentTimeMillis();
        long interval = nowTimestamp - this.lastCommunication.getTimestamp();
        if (interval - this.flowControlInterval < 0L) {
            // Not enough time has passed since the last evaluation.
            return;
        }
        if (interval <= 0L) {
            // Only reachable when flowControlInterval <= 0. A zero interval would divide by
            // zero below and a negative one (clock stepped back) would yield a meaningless
            // rate, so skip this sample and evaluate again on a later push.
            return;
        }

        long byteLimitSleepTime = 0L;
        if (byteLimited) {
            long bytesRead = CommunicationTool.getTotalReadBytes(this.currentCommunication)
                    - CommunicationTool.getTotalReadBytes(this.lastCommunication);
            byteLimitSleepTime = throttleDelay(bytesRead, interval, this.byteSpeed);
        }
        long recordLimitSleepTime = 0L;
        if (recordLimited) {
            long recordsRead = CommunicationTool.getTotalReadRecords(this.currentCommunication)
                    - CommunicationTool.getTotalReadRecords(this.lastCommunication);
            recordLimitSleepTime = throttleDelay(recordsRead, interval, this.recordSpeed);
        }

        // Honour whichever limit demands the longer pause.
        long sleepTime = Math.max(byteLimitSleepTime, recordLimitSleepTime);
        if (sleepTime > 0L) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }

        // Snapshot the read counters so the next evaluation measures only new traffic.
        this.lastCommunication.setLongCounter("readSucceedBytes", this.currentCommunication.getLongCounter("readSucceedBytes"));
        this.lastCommunication.setLongCounter("readFailedBytes", this.currentCommunication.getLongCounter("readFailedBytes"));
        this.lastCommunication.setLongCounter("readSucceedRecords", this.currentCommunication.getLongCounter("readSucceedRecords"));
        this.lastCommunication.setLongCounter("readFailedRecords", this.currentCommunication.getLongCounter("readFailedRecords"));
        this.lastCommunication.setTimestamp(nowTimestamp);
    }

    /**
     * Computes how long to pause so that {@code amount} units observed over
     * {@code intervalMillis} average out to at most {@code speedLimit} units per second.
     *
     * @param amount         units (bytes or records) observed during the interval
     * @param intervalMillis length of the interval in milliseconds; must be positive
     * @param speedLimit     allowed units per second; must be positive
     * @return milliseconds to sleep, or {@code 0} if the observed rate is within the limit
     */
    private static long throttleDelay(long amount, long intervalMillis, long speedLimit) {
        long currentSpeed = amount * 1000L / intervalMillis;
        if (currentSpeed <= speedLimit) {
            return 0L;
        }
        return currentSpeed * intervalMillis / speedLimit - intervalMillis;
    }

    /**
     * Records a pull in the write-received counters.
     *
     * @param recordSize number of records pulled
     * @param byteSize   total byte size of the records pulled
     */
    private void statPull(long recordSize, long byteSize) {
        this.currentCommunication.increaseCounter("writeReceivedRecords", recordSize);
        this.currentCommunication.increaseCounter("writeReceivedBytes", byteSize);
    }
}

