/*
 * Decompiled with CFR 0.152.
 */
package stream.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.io.ByteArrayInputStream;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;
import stream.Processor;
import stream.ProcessorList;
import stream.StormRunner;
import stream.data.Data;
import stream.runtime.setup.ObjectFactory;
import stream.runtime.setup.ProcessorFactory;
import stream.storm.AbstractBolt;

public class ProcessBolt
extends AbstractBolt {
    private static final long serialVersionUID = -924312414467186051L;
    static Logger log = LoggerFactory.getLogger(ProcessBolt.class);
    transient ProcessorList process;

    public ProcessBolt(String xmlConfig, String uuid) {
        super(xmlConfig, uuid);
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.output = collector;
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document config = builder.parse(new ByteArrayInputStream(this.xmlConfig.getBytes()));
            Element element = StormRunner.findElementByUUID(config.getDocumentElement(), this.uuid);
            if (element == null) {
                throw new Exception("Fuck! You screwed the XML :-)");
            }
            ObjectFactory obf = ObjectFactory.newInstance();
            ProcessorFactory pf = new ProcessorFactory(obf);
            log.debug("Creating processor-list from element {}", (Object)element);
            List list = pf.createNestedProcessors(element);
            this.process = new ProcessorList();
            for (Processor p : list) {
                this.process.getProcessors().add(p);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void execute(Tuple input) {
        log.debug("Tuple received: {}", (Object)input);
        Object data = input.getValueByField("stream.Data");
        log.debug("Data is: {}", data);
        if (data != null) {
            Data item = (Data)data;
            item = this.process.process(item);
            log.debug("Emitting result item: {}", (Object)item);
            this.output.emit((List)new Values(new Object[]{item}));
            log.debug("ack'ing item {}", (Object)input);
            this.output.ack(input);
        }
    }
}

