/*
 * Decompiled with CFR 0.152.
 */
package stream.io;

import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Processor;
import stream.data.Data;
import stream.data.DataFactory;
import stream.io.AbstractDataStream;
import stream.io.QueueService;

public abstract class DataStreamQueue
extends AbstractDataStream
implements Processor,
QueueService {
    static Logger log = LoggerFactory.getLogger(DataStreamQueue.class);
    int size = 1000;
    boolean closed = false;
    LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue();

    public DataStreamQueue() {
        this.setSize(1000);
    }

    public DataStreamQueue(int size) {
        this.setSize(size);
    }

    @Override
    public void close() throws Exception {
        this.queue.clear();
        this.closed = true;
    }

    @Override
    public void readHeader() throws Exception {
    }

    @Override
    public Data readItem(Data instance) throws Exception {
        Data item;
        block5: {
            if (instance == null) {
                return this.readItem(DataFactory.create());
            }
            item = null;
            try {
                item = this.queue.take();
                log.debug("took item from queue: {}", item);
            }
            catch (InterruptedException e) {
                if (this.closed) {
                    return null;
                }
                log.error("Interruped while waiting for data: {}", (Object)e.getMessage());
                if (!log.isDebugEnabled()) break block5;
                e.printStackTrace();
            }
        }
        if (item == null) {
            return null;
        }
        instance.putAll(item);
        return instance;
    }

    @Override
    public Data readNext() throws Exception {
        return this.readNext(DataFactory.create());
    }

    @Override
    public Data process(Data input) {
        this.enqueue(input);
        return input;
    }

    @Override
    public Data poll() {
        return this.queue.poll();
    }

    @Override
    public boolean enqueue(Data item) {
        try {
            this.queue.put(item);
            return true;
        }
        catch (Exception e) {
            log.error("Error enqueuing item: {}", (Object)e.getMessage());
            if (log.isDebugEnabled()) {
                e.printStackTrace();
            }
            return false;
        }
    }

    @Override
    public void reset() throws Exception {
        log.debug("Cleared Queue.");
        this.queue.clear();
    }

    public int getSize() {
        return this.size;
    }

    public void setSize(int size) {
        if (size < 1) {
            log.error("Invalid queue-size '{}'!", size);
            return;
        }
        this.size = size;
        this.queue = new LinkedBlockingQueue(size);
    }
}

