/*
 * Decompiled with CFR 0.152.
 */
package stream.io;

import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.io.AbstractQueue;

/**
 * A bounded, blocking queue of {@link Data} items backed by a
 * {@link LinkedBlockingQueue}.
 *
 * <p>The capacity is taken from {@code getLimit()} when {@link #init()} (or
 * {@link #reset()} on an uninitialised instance) creates the backing queue.
 * Closing the queue discards pending items and posts the
 * {@code Data.END_OF_STREAM} marker so that a blocked reader wakes up.
 *
 * <p>Writers serialise on the monitor of the backing queue; readers do not
 * take that monitor.
 */
public class BlockingQueue
extends AbstractQueue {
    static Logger log = LoggerFactory.getLogger(BlockingQueue.class);

    /**
     * Set once the queue has been closed or a reader has seen the
     * end-of-stream marker. Volatile because it is written by the closing
     * thread and read by readers that do not hold the queue's monitor.
     */
    protected volatile boolean closed = false;

    /** Backing queue; {@code null} until {@link #init()} or {@link #reset()} ran. */
    protected LinkedBlockingQueue<Data> queue;

    /**
     * Creates the backing queue with a capacity of {@code getLimit()}.
     *
     * @throws IllegalArgumentException if the configured limit is below 1
     */
    @Override
    public void init() throws Exception {
        if (this.getLimit() < 1) {
            throw new IllegalArgumentException("Invalid queue-size '" + this.getLimit() + "'!");
        }
        this.queue = new LinkedBlockingQueue<Data>(this.getLimit());
    }

    /**
     * Closes the queue: pending items are discarded and the end-of-stream
     * marker is enqueued so that a reader blocked in {@code take()} wakes up.
     * Calling this before the backing queue exists only marks the queue as
     * closed.
     */
    @Override
    public void close() throws Exception {
        LinkedBlockingQueue<Data> target = this.queue;
        // Flag first, so writers that acquire the monitor from now on refuse new items.
        this.closed = true;
        if (target == null) {
            return;
        }
        // A writer may be parked in put() on a full queue while holding the
        // monitor; draining here lets it finish and release the monitor.
        target.clear();
        // notifyAll() is only legal while owning the monitor of the object.
        synchronized (target) {
            // Drop whatever a released writer managed to add in the meantime,
            // which also guarantees there is room for the marker.
            target.clear();
            target.add(Data.END_OF_STREAM);
            target.notifyAll();
        }
    }

    /**
     * Blocks until the next item is available.
     *
     * @return the next item, or {@code null} if the queue is closed, the
     *         end-of-stream marker was read, or the calling thread was
     *         interrupted while waiting (the interrupt flag is restored)
     */
    @Override
    public Data read() throws Exception {
        if (this.closed) {
            return null;
        }
        Data item = null;
        try {
            item = this.queue.take();
            log.debug("took item from queue: {}", (Object)item);
        }
        catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            if (this.closed) {
                return null;
            }
            log.error("Interruped while waiting for data: {}", (Object)e.getMessage());
            log.debug("Stack trace of the interruption:", (Throwable)e);
        }
        if (this.closed || item == Data.END_OF_STREAM) {
            log.debug("Next data-item is end-of-stream event!");
            this.closed = true;
            return null;
        }
        return item;
    }

    /**
     * Removes and returns the head of the queue without blocking.
     *
     * @return the head of the backing queue or {@code null} if it is empty;
     *         the value is passed through as-is, so it may be the
     *         end-of-stream marker itself
     */
    @Override
    public Data poll() {
        return this.queue.poll();
    }

    /**
     * Blocks until an item is available and removes it from the queue.
     *
     * @return the item, or {@code null} if it was the end-of-stream marker or
     *         if waiting failed (an interruption restores the interrupt flag)
     */
    @Override
    public Data take() {
        try {
            Data item = this.queue.take();
            return item == Data.END_OF_STREAM ? null : item;
        }
        catch (Exception e) {
            BlockingQueue.logFailure("Interrupted while reading on queue: {}", e);
            return null;
        }
    }

    /**
     * Adds an item, blocking while the queue is full. Items offered to a
     * closed queue are silently dropped.
     *
     * @param item the item to add
     * @return {@code false} if adding failed with an exception, {@code true}
     *         otherwise (including when the item was dropped because the
     *         queue is closed)
     */
    @Override
    public boolean enqueue(Data item) {
        try {
            LinkedBlockingQueue<Data> target = this.queue;
            synchronized (target) {
                if (!this.closed) {
                    // NOTE: put() may block while this thread owns the monitor.
                    target.put(item);
                }
                target.notifyAll();
            }
            return true;
        }
        catch (Exception e) {
            BlockingQueue.logFailure("Error enqueuing item: {}", e);
            return false;
        }
    }

    /**
     * Adds an item, blocking while the queue is full. If the queue is already
     * closed, an error is logged and the item is dropped; nothing is added,
     * so the call cannot block on a queue that may never be drained.
     *
     * @param item the item to add
     * @throws InterruptedException if interrupted while waiting for space
     */
    @Override
    public void write(Data item) throws Exception {
        LinkedBlockingQueue<Data> target = this.queue;
        synchronized (target) {
            if (this.closed) {
                target.notifyAll();
                log.error("Queue already closed.");
                return;
            }
            target.put(item);
            target.notifyAll();
        }
    }

    /** @return the number of items currently held by the backing queue */
    @Override
    public int level() {
        return this.queue.size();
    }

    /** @return the configured limit of this queue */
    @Override
    public int capacity() {
        return this.getLimit();
    }

    /**
     * Empties the backing queue, creating it first if it does not exist yet.
     * The {@code closed} flag is left untouched.
     */
    @Override
    public void reset() throws Exception {
        if (this.queue == null) {
            this.queue = new LinkedBlockingQueue<Data>(this.getLimit());
        } else {
            this.queue.clear();
        }
    }

    /**
     * Logs a failed queue operation and restores the interrupt flag when the
     * failure was an interruption.
     */
    private static void logFailure(String format, Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        log.error(format, (Object)e.getMessage());
        log.debug("Stack trace of the failure above:", (Throwable)e);
    }
}

