/*
 * Decompiled with CFR 0.152.
 */
package stream.io.active;

import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Processor;
import stream.data.Data;
import stream.data.DataFactory;
import stream.io.DataStream;
import stream.io.active.ActiveDataStream;

public class ActiveDataStreamImpl
implements ActiveDataStream {
    static Logger log = LoggerFactory.getLogger(ActiveDataStreamImpl.class);
    protected final LinkedBlockingQueue<Data> queue;
    protected DataStream stream;
    protected StreamActivator activator;

    public ActiveDataStreamImpl(DataStream stream) {
        this.stream = stream;
        this.queue = new LinkedBlockingQueue(100);
    }

    @Override
    public Map<String, Class<?>> getAttributes() {
        return this.stream.getAttributes();
    }

    @Override
    public Data readNext() throws Exception {
        return this.readNext(DataFactory.create());
    }

    @Override
    public Data readNext(Data datum) throws Exception {
        if (this.queue.isEmpty()) {
            return null;
        }
        Data d = this.queue.poll();
        if (d != null) {
            datum.putAll(d);
        }
        return datum;
    }

    @Override
    public void close() throws Exception {
        this.stream.close();
        this.activator.setRun(false);
    }

    @Override
    public List<Processor> getPreprocessors() {
        return this.stream.getPreprocessors();
    }

    @Override
    public void activate() throws Exception {
        this.activator = new StreamActivator();
        this.activator.start();
    }

    @Override
    public void init() throws Exception {
        this.stream.init();
    }

    private class StreamActivator
    extends Thread {
        private boolean run = true;

        @Override
        public void run() {
            while (this.run) {
                try {
                    ActiveDataStreamImpl.this.queue.put(ActiveDataStreamImpl.this.stream.readNext());
                }
                catch (InterruptedException e) {
                    log.error("Interrupted while reading stream: {}", (Object)e.getMessage());
                    if (!log.isDebugEnabled()) continue;
                    e.printStackTrace();
                }
                catch (Exception e) {
                    log.error("Error while reading stream: {}", (Object)e.getMessage());
                    if (!log.isDebugEnabled()) continue;
                    e.printStackTrace();
                }
            }
        }

        public void setRun(boolean run2) {
            this.run = run2;
            this.interrupt();
        }
    }
}

