/*
 * Decompiled with CFR 0.152.
 */
package stream.io.active;

import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.data.DataFactory;
import stream.io.Stream;
import stream.io.active.ActiveDataStream;

/**
 * Wraps a {@link Stream} so that it is read by a background thread.
 *
 * <p>After {@link #activate()} has been called, a worker thread repeatedly
 * reads items from the wrapped stream and places them into a bounded queue
 * (capacity {@value #QUEUE_CAPACITY}). {@link #read()} hands those items out
 * without blocking: it returns {@code null} when nothing is currently queued.
 *
 * <p>An optional limit caps the number of items returned by {@link #read()};
 * a limit that is {@code null}, zero or negative means "no limit".
 */
public class ActiveDataStreamImpl
implements ActiveDataStream {
    static final Logger log = LoggerFactory.getLogger(ActiveDataStreamImpl.class);

    /** Maximum number of items buffered between the worker thread and {@link #read()}. */
    private static final int QUEUE_CAPACITY = 100;

    protected final LinkedBlockingQueue<Data> queue;
    protected Stream stream;
    protected StreamActivator activator;
    protected String id;
    protected Long limit = -1L;
    protected Long count = 0L;

    /**
     * Creates an active wrapper around the given stream. No thread is started
     * until {@link #activate()} is called.
     *
     * @param stream the stream the worker thread will read from
     */
    public ActiveDataStreamImpl(Stream stream) {
        this.stream = stream;
        this.queue = new LinkedBlockingQueue<Data>(QUEUE_CAPACITY);
    }

    /**
     * @return the maximum number of items {@link #read()} will return, or a
     *         non-positive value / {@code null} if unlimited
     */
    @Override
    public Long getLimit() {
        return this.limit;
    }

    /**
     * @param limit the maximum number of items {@link #read()} may return;
     *              {@code null}, zero or a negative value disables the limit
     */
    @Override
    public void setLimit(Long limit) {
        this.limit = limit;
    }

    /**
     * @return the identifier assigned to this stream, possibly {@code null}
     */
    @Override
    public String getId() {
        return this.id;
    }

    /**
     * @param id the identifier to assign to this stream
     */
    @Override
    public void setId(String id) {
        this.id = id;
    }

    /**
     * Returns a copy of the next buffered item without blocking.
     *
     * @return a fresh {@link Data} item holding the contents of the next queued
     *         item, or {@code null} if the limit has been reached or nothing is
     *         currently queued
     * @throws Exception declared by the interface; not thrown by this implementation itself
     */
    @Override
    public Data read() throws Exception {
        Long max = this.limit;
        // ">=" so that exactly `limit` items are handed out; a null limit is
        // treated as "unlimited" instead of failing on unboxing.
        if (max != null && max > 0L && this.count >= max) {
            return null;
        }
        // Poll once instead of isEmpty()+poll(): a single atomic call decides
        // whether an item is available.
        Data queued = this.queue.poll();
        if (queued == null) {
            return null;
        }
        Data datum = DataFactory.create();
        datum.putAll(queued);
        this.count = this.count + 1L;
        return datum;
    }

    /**
     * Closes the wrapped stream and stops the worker thread, if one was started.
     * The worker is stopped even when closing the wrapped stream fails.
     *
     * @throws Exception if closing the wrapped stream fails
     */
    @Override
    public void close() throws Exception {
        try {
            this.stream.close();
        }
        finally {
            // activate() may never have been called, in which case there is
            // no worker to stop.
            StreamActivator worker = this.activator;
            if (worker != null) {
                worker.setRun(false);
            }
        }
    }

    /**
     * Starts a new worker thread that fills the queue from the wrapped stream.
     *
     * @throws Exception declared by the interface
     */
    @Override
    public void activate() throws Exception {
        this.activator = new StreamActivator();
        this.activator.start();
    }

    /**
     * Initializes the wrapped stream.
     *
     * @throws Exception if the wrapped stream fails to initialize
     */
    @Override
    public void init() throws Exception {
        this.stream.init();
    }

    /**
     * Worker thread that moves items from the wrapped stream into the queue
     * until it is told to stop, is interrupted, or the stream yields {@code null}.
     */
    private class StreamActivator
    extends Thread {
        // volatile: written by the thread calling setRun(), read by this worker.
        private volatile boolean run = true;

        @Override
        public void run() {
            while (this.run) {
                try {
                    Data item = ActiveDataStreamImpl.this.stream.read();
                    if (item == null) {
                        // LinkedBlockingQueue rejects null elements, so a null
                        // item can never be queued. NOTE(review): null is
                        // treated here as end-of-stream - confirm against the
                        // Stream contract.
                        log.debug("Stream returned null, stopping activator.");
                        this.run = false;
                        break;
                    }
                    // Blocks while the queue is full.
                    ActiveDataStreamImpl.this.queue.put(item);
                }
                catch (InterruptedException e) {
                    // setRun(false) interrupts this thread on purpose; only an
                    // interrupt while still running is worth reporting.
                    if (this.run) {
                        log.error("Interrupted while reading stream: {}", (Object)e.getMessage());
                        log.debug("Interrupted while reading stream", e);
                    }
                    // Restore the interrupt status and stop working.
                    Thread.currentThread().interrupt();
                    break;
                }
                catch (Exception e) {
                    // Best effort: report the failure and try the next read.
                    log.error("Error while reading stream: {}", (Object)e.getMessage());
                    log.debug("Error while reading stream", e);
                }
            }
        }

        /**
         * Sets the run flag and interrupts the thread so that a blocking
         * queue insertion is abandoned.
         *
         * @param run2 {@code false} to make the worker loop terminate
         */
        public void setRun(boolean run2) {
            this.run = run2;
            this.interrupt();
        }
    }
}

