/*
 * Decompiled with CFR 0.152.
 */
package stream.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import stream.Data;
import stream.ProcessContext;
import stream.Processor;
import stream.ProcessorList;
import stream.StormRunner;
import stream.Subscription;
import stream.data.DataFactory;
import stream.io.Sink;
import stream.runtime.setup.ObjectFactory;
import stream.runtime.setup.ProcessorFactory;
import stream.storm.AbstractBolt;
import stream.storm.BoltContext;
import stream.storm.QueueInjection;
import stream.util.Variables;

public class ProcessBolt
extends AbstractBolt {
    private static final long serialVersionUID = -924312414467186051L;
    static Logger log = LoggerFactory.getLogger(ProcessBolt.class);
    transient ProcessorList process;
    transient List<OutputRef> outputRefs = new ArrayList<OutputRef>();
    protected final Variables variables;
    protected String[] outputs;
    final BoltContext ctx = new BoltContext();
    final Set<Subscription> subscriptions = new LinkedHashSet<Subscription>();

    public ProcessBolt(String xmlConfig, String uuid, Map<String, String> variables) throws Exception {
        super(xmlConfig, uuid);
        this.variables = new Variables(variables);
        this.createProcess();
    }

    public Set<Subscription> getSubscriptions() {
        return this.subscriptions;
    }

    protected ProcessorList createProcess() throws Exception {
        DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        Document config = builder.parse(new ByteArrayInputStream(this.xmlConfig.getBytes()));
        Element element = StormRunner.findElementByUUID(config.getDocumentElement(), this.uuid);
        if (element == null) {
            log.error("Failed to find process for uuid '{}' in the XML!", (Object)this.uuid);
            throw new Exception("Failed to find process for uuid '" + this.uuid + "' in the XML!");
        }
        ObjectFactory obf = ObjectFactory.newInstance();
        ProcessorFactory pf = new ProcessorFactory(obf);
        QueueInjection queueInjection = new QueueInjection(this.uuid, this.output);
        pf.addCreationHandler((ProcessorFactory.ProcessorCreationHandler)queueInjection);
        log.debug("Creating processor-list from element {}", (Object)element);
        List list = pf.createNestedProcessors(element);
        this.process = new ProcessorList();
        for (Processor p : list) {
            this.process.getProcessors().add(p);
        }
        if (element.hasAttribute("output")) {
            String out = element.getAttribute("output");
            this.outputs = out.indexOf(",") > 0 ? out.split(",") : new String[]{out};
        }
        this.subscriptions.addAll(queueInjection.getSubscriptions());
        log.debug("Found {} subscribers for bolt '{}': " + this.subscriptions, (Object)this.subscriptions.size(), (Object)this.uuid);
        return this.process;
    }

    public List<Processor> getAllProcessors() {
        return this.getAllProcessors(this.process);
    }

    protected List<Processor> getAllProcessors(ProcessorList list) {
        ArrayList<Processor> ps = new ArrayList<Processor>();
        for (Processor p : list.getProcessors()) {
            if (p instanceof ProcessorList) {
                ps.addAll(this.getAllProcessors((ProcessorList)p));
                continue;
            }
            ps.add(p);
        }
        return ps;
    }

    protected Processor createProcessor(Element el, ProcessorFactory pf) throws Exception {
        Processor p = pf.createProcessor(el);
        if (p instanceof ProcessorList) {
            ProcessorList list = (ProcessorList)p;
            NodeList nested = el.getChildNodes();
            for (int i = 0; i < nested.getLength(); ++i) {
                Node ch = nested.item(i);
                if (ch.getNodeType() != 1) continue;
                Element e = (Element)ch;
                Processor inner = this.createProcessor(e, pf);
                list.getProcessors().add(inner);
            }
        }
        return p;
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        log.debug("Preparing ProcessBolt {}", (Object)this.uuid);
        this.output = collector;
        log.debug("   output collector: {}", (Object)this.output);
        try {
            this.process = this.createProcess();
            this.process.init((ProcessContext)this.ctx);
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("Failed to create process!");
        }
    }

    public void execute(Tuple input) {
        Data item;
        block8: {
            log.debug("Tuple received: {}", (Object)input);
            item = null;
            try {
                Object data = input.getValueByField("stream.Data");
                log.debug("Data is: {}", data);
                if (data == null) break block8;
                item = (Data)data;
            }
            catch (Exception e) {
                log.debug("Error processing tuple: {}", (Object)e.getMessage());
                item = DataFactory.create();
                Fields fields = input.getFields();
                for (int i = 0; i < fields.size(); ++i) {
                    String key = fields.get(i);
                    Object value = input.getValue(i);
                    if (!(value instanceof Serializable)) continue;
                    item.put((Object)key, (Object)((Serializable)value));
                }
            }
        }
        if (item != null) {
            log.debug("Processing item...");
            item = this.process.process(item);
            if (this.outputs != null) {
                for (String out : this.outputs) {
                    log.debug("Emitting result item to {}: {}", (Object)out, (Object)item);
                    this.output.emit(out, (List)new Values(new Object[]{item}));
                }
            } else {
                log.debug("Emitting item {}", (Object)item);
                this.output.emit((List)new Values(new Object[]{item}));
            }
        } else {
            log.debug("No item to process!");
        }
    }

    public final class OutputRef {
        final Processor processor;
        final String property;
        final String[] refs;

        public OutputRef(Processor p, String property, String[] refs) {
            this.processor = p;
            this.property = property;
            this.refs = refs;
        }
    }

    public final class DataForwarder
    implements Sink {
        String id;
        final OutputCollector output;

        public DataForwarder(String id, OutputCollector output) {
            this.id = id;
            this.output = output;
        }

        public String getId() {
            return this.id;
        }

        public boolean write(Data item) throws Exception {
            if (item == null) {
                return false;
            }
            this.output.emit(this.id, (List)new Values(new Object[]{item}));
            return true;
        }

        public void close() throws Exception {
        }

        public boolean write(Collection<Data> data) throws Exception {
            for (Data item : data) {
                this.output.emit(this.id, (List)new Values(new Object[]{item}));
            }
            return true;
        }

        public void setId(String id) {
            this.id = id;
        }

        public void init() throws Exception {
        }
    }
}

