/*
 * Decompiled with CFR 0.152.
 */
package stream.storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import stream.Data;
import stream.io.Stream;
import stream.runtime.Variables;
import stream.runtime.setup.ObjectFactory;
import stream.runtime.setup.StreamFactory;
import stream.storm.config.StreamHandler;
import stream.util.XMLElementMatch;
import stream.util.XMLUtils;

/**
 * Spout that reads {@link Data} items from a {@link Stream} and emits each item
 * as a one-field tuple (field name {@code "stream.Data"}).
 *
 * <p>The stream is described by a {@code stream} element inside an XML document;
 * the element is located by id and instantiated through {@link StreamFactory}.
 * The {@code stream} field is {@code transient}, so {@link #open} re-creates it
 * from the stored XML whenever it is {@code null} (presumably after the spout
 * has been serialized and shipped to a worker - confirm against deployment).
 */
public class StreamSpout
extends BaseRichSpout {
    private static final long serialVersionUID = -786482575770711600L;
    static Logger log = LoggerFactory.getLogger(StreamSpout.class);

    /** Pause applied in {@link #nextTuple()} when the stream yields no item. */
    private static final long IDLE_SLEEP_MS = 500L;

    /** The underlying stream; not serialized, lazily re-created in {@link #open}. */
    transient Stream stream;
    /** Collector handed over in {@link #open}; used to emit tuples. */
    protected SpoutOutputCollector output;
    /** Class name of the stream; only used for logging within this class. */
    protected final String className;
    /** Variables passed to the stream factory when creating the stream. */
    protected final Variables parameters;
    /** XML document (as text) that contains the stream definition. */
    protected final String xml;
    /** Id of the {@code stream} element to look up inside {@link #xml}. */
    protected final String id;

    /**
     * Creates the spout and eagerly instantiates the stream, so that a broken
     * definition fails at construction time.
     *
     * @param xml       XML document text containing the stream definition
     * @param id        id of the stream element within {@code xml}
     * @param className class name of the stream (logged only)
     * @param params    parameters wrapped into {@link Variables} for the factory
     * @throws Exception if the stream cannot be located or created
     */
    public StreamSpout(String xml, String id, String className, Map<String, String> params) throws Exception {
        log.debug("Creating spout for stream (class: {}, params: {})", className, params);
        this.xml = xml;
        this.id = id;
        this.className = className;
        this.parameters = new Variables(params);
        this.stream = this.createStream();
    }

    /**
     * Parses {@link #xml}, finds the single stream element matching {@link #id}
     * and builds a stream from it.
     *
     * @return the newly created stream
     * @throws RuntimeException if zero or more than one matching element exists
     * @throws Exception        if parsing the XML or creating the stream fails
     */
    protected Stream createStream() throws Exception {
        Document doc = XMLUtils.parseDocument(this.xml);
        List<?> matches = XMLUtils.findElements(doc, (XMLElementMatch)new StreamHandler.StreamFinder(this.id));
        // Exactly one element must match; anything else is a configuration error.
        if (matches.size() != 1) {
            throw new RuntimeException("Failed to locate 'stream' element for id '" + this.id + "'!");
        }
        Element el = (Element)matches.get(0);
        ObjectFactory objectFactory = ObjectFactory.newInstance();
        return StreamFactory.createStream(objectFactory, el, this.parameters);
    }

    /**
     * Stores the collector, (re-)creates the stream if necessary and initializes it.
     *
     * @param conf      configuration map (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used by {@link #nextTuple()} to emit tuples
     * @throws RuntimeException wrapping any failure to create or initialize the stream
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.output = collector;
        try {
            if (this.stream == null) {
                this.stream = this.createStream();
            }
            this.stream.init();
        }
        catch (Exception e) {
            // Keep the original exception as cause so the full stack trace survives.
            throw new RuntimeException("Failed to open stream: " + e.getMessage(), e);
        }
    }

    /**
     * Reads one item from the stream and emits it as a tuple. If the stream
     * returns {@code null}, pauses for {@value #IDLE_SLEEP_MS} ms instead.
     * Read/emit failures are logged and not propagated.
     */
    public void nextTuple() {
        log.debug("nextTuple() called");
        try {
            Data item = this.stream.read();
            log.debug("read item: {}", item);
            if (item == null) {
                this.sleep(IDLE_SLEEP_MS);
                return;
            }
            log.debug("Emitting item as tuple...");
            this.output.emit(new Values(item));
        }
        catch (Exception e) {
            log.error("Failed to read next item: {}", e.getMessage());
            // The stack trace is only of interest when debugging; route it
            // through the logger rather than stderr.
            if (log.isDebugEnabled()) {
                log.debug("Failed to read next item", e);
            }
        }
    }

    /**
     * Sleeps for the given time without throwing. A negative duration is
     * ignored. If the thread is interrupted while sleeping, the interrupt
     * status is restored so callers further up can observe it.
     *
     * @param ms time to sleep in milliseconds
     */
    protected void sleep(long ms) {
        if (ms < 0L) {
            return;
        }
        try {
            Thread.sleep(ms);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Declares the single output field {@code "stream.Data"}.
     *
     * @param declarer receiver of the field declaration
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        log.debug("Declaring output-field 'stream.Data'");
        declarer.declare(new Fields("stream.Data"));
    }
}

