/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.datax.core.transport.channel.memory;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.channel.Channel;
import com.alibaba.datax.core.transport.record.TerminateRecord;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * {@link Channel} implementation that buffers records in a bounded in-memory
 * {@link ArrayBlockingQueue}.
 *
 * <p>Single-record operations ({@link #doPush(Record)}, {@link #doPull()}) rely on the
 * queue's own blocking behaviour. Batch operations ({@link #doPushAll(Collection)},
 * {@link #doPullAll(Collection)}) are serialized through {@link #lock} and additionally
 * enforce the inherited {@code byteCapacity} budget using {@link #memoryBytes}.
 */
public class MemoryChannel extends Channel {
    /** Upper bound on the number of records drained by one {@link #doPullAll(Collection)} call. */
    private final int bufferSize;

    /** Running sum of {@code getMemorySize()} over the records accounted as buffered. */
    private final AtomicInteger memoryBytes = new AtomicInteger(0);

    /** Bounded record buffer; its size is the capacity reported by {@code getCapacity()}. */
    private final ArrayBlockingQueue<Record> queue = new ArrayBlockingQueue<Record>(this.getCapacity());

    /** Guards the batch push/pull paths. */
    private final ReentrantLock lock;

    /** Signalled by {@link #doPullAll(Collection)} after it has removed records. */
    private final Condition notInsufficient;

    /** Signalled by {@link #doPushAll(Collection)} after it has added records. */
    private final Condition notEmpty;

    /**
     * Creates the channel and reads the batch size from the
     * {@code core.transport.exchanger.bufferSize} configuration key.
     *
     * @param configuration configuration passed to the superclass and queried for the batch size
     */
    public MemoryChannel(Configuration configuration) {
        super(configuration);
        this.bufferSize = configuration.getInt("core.transport.exchanger.bufferSize");
        this.lock = new ReentrantLock();
        this.notInsufficient = this.lock.newCondition();
        this.notEmpty = this.lock.newCondition();
    }

    /**
     * Closes the channel via the superclass and then enqueues the {@link TerminateRecord}
     * marker, blocking while the queue is full. If the thread is interrupted while waiting,
     * the marker is not enqueued and the interrupt status is restored.
     */
    @Override
    public void close() {
        super.close();
        try {
            this.queue.put(TerminateRecord.get());
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Discards every buffered record and releases the bytes they were accounted for.
     *
     * <p>Records are polled one at a time so that the byte counter is reduced by exactly the
     * size of what was removed; leaving the counter untouched would make
     * {@link #doPushAll(Collection)} keep throttling on memory that is no longer buffered.
     */
    @Override
    public void clear() {
        int freedBytes = 0;
        Record dropped;
        while ((dropped = this.queue.poll()) != null) {
            freedBytes += dropped.getMemorySize();
        }
        this.memoryBytes.addAndGet(-freedBytes);
    }

    /**
     * Enqueues one record, blocking while the queue is full, and adds the time spent to
     * {@code waitWriterTime}.
     *
     * <p>If the thread is interrupted while waiting, the record is not enqueued, no
     * exception is raised, and the interrupt status is restored.
     *
     * @param r record to enqueue
     */
    @Override
    protected void doPush(Record r) {
        try {
            long startTime = System.nanoTime();
            this.queue.put(r);
            this.waitWriterTime += System.nanoTime() - startTime;
            this.memoryBytes.addAndGet(r.getMemorySize());
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Enqueues a whole batch under {@link #lock}.
     *
     * <p>Waits (re-checking at least every 200 ms) until adding the batch would keep the
     * byte counter within {@code byteCapacity} and the queue has at least
     * {@code rs.size()} free slots.
     * NOTE(review): a batch that on its own exceeds either limit can never satisfy this
     * condition - confirm callers bound their batch size.
     *
     * @param rs records to enqueue
     * @throws DataXException with {@code RUNTIME_ERROR} if the thread is interrupted while
     *         acquiring the lock or waiting for room; the interrupt status is restored
     */
    @Override
    protected void doPushAll(Collection<Record> rs) {
        long startTime = System.nanoTime();
        // Acquire outside the guarded try: the finally below must only run once the lock is held.
        this.lockInterruptiblyOrFail();
        try {
            int bytes = this.getRecordBytes(rs);
            while (this.memoryBytes.get() + bytes > this.byteCapacity || rs.size() > this.queue.remainingCapacity()) {
                this.notInsufficient.await(200L, TimeUnit.MILLISECONDS);
            }
            this.queue.addAll(rs);
            this.waitWriterTime += System.nanoTime() - startTime;
            this.memoryBytes.addAndGet(bytes);
            this.notEmpty.signalAll();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.RUNTIME_ERROR, e);
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Removes and returns one record, blocking while the queue is empty, and adds the time
     * spent to {@code waitReaderTime}.
     *
     * @return the record taken from the head of the queue
     * @throws IllegalStateException if the thread is interrupted while waiting; the
     *         interrupt status is restored
     */
    @Override
    protected Record doPull() {
        try {
            long startTime = System.nanoTime();
            Record r = this.queue.take();
            this.waitReaderTime += System.nanoTime() - startTime;
            this.memoryBytes.addAndGet(-r.getMemorySize());
            return r;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /**
     * Drains up to {@link #bufferSize} records into {@code rs} under {@link #lock}.
     *
     * <p>{@code rs} is cleared first. The method then waits (re-checking at least every
     * 200 ms) until at least one record could be drained, releases the drained bytes from
     * the counter and signals writers waiting for room.
     *
     * @param rs destination collection; emptied before records are added
     * @throws DataXException with {@code RUNTIME_ERROR} if the thread is interrupted while
     *         acquiring the lock or waiting for records; the interrupt status is restored
     */
    @Override
    protected void doPullAll(Collection<Record> rs) {
        assert (rs != null);
        rs.clear();
        long startTime = System.nanoTime();
        // Acquire outside the guarded try: the finally below must only run once the lock is held.
        this.lockInterruptiblyOrFail();
        try {
            while (this.queue.drainTo(rs, this.bufferSize) <= 0) {
                this.notEmpty.await(200L, TimeUnit.MILLISECONDS);
            }
            this.waitReaderTime += System.nanoTime() - startTime;
            int bytes = this.getRecordBytes(rs);
            this.memoryBytes.addAndGet(-bytes);
            this.notInsufficient.signalAll();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.RUNTIME_ERROR, e);
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Acquires {@link #lock} interruptibly, translating an interruption into the same
     * {@code RUNTIME_ERROR} exception the batch operations raise, with the interrupt
     * status restored. On return the lock is held; on exception it is not.
     */
    private void lockInterruptiblyOrFail() {
        try {
            this.lock.lockInterruptibly();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw DataXException.asDataXException((ErrorCode)FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    /**
     * Sums {@code getMemorySize()} over the given records.
     *
     * @param rs records to measure
     * @return total reported size; {@code 0} for an empty collection
     */
    private int getRecordBytes(Collection<Record> rs) {
        int bytes = 0;
        for (Record r : rs) {
            bytes += r.getMemorySize();
        }
        return bytes;
    }

    /** @return the number of records currently in the queue */
    @Override
    public int size() {
        return this.queue.size();
    }

    /** @return {@code true} if the queue currently holds no records */
    @Override
    public boolean isEmpty() {
        return this.queue.isEmpty();
    }
}

