/*
 * Decompiled with CFR 0.152.
 */
package stream.storm.config;

import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.TopologyBuilder;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import stream.StreamTopology;
import stream.Subscription;
import stream.runtime.setup.factory.ObjectFactory;
import stream.storm.ProcessBolt;
import stream.storm.config.ATopologyElementHandler;

public class ProcessHandler
extends ATopologyElementHandler {
    static Logger log = LoggerFactory.getLogger(ProcessHandler.class);
    final String xml;

    public ProcessHandler(ObjectFactory of, String xml) {
        super(of);
        this.xml = xml;
    }

    @Override
    public boolean handles(Element el) {
        String name = el.getNodeName();
        return name.equalsIgnoreCase("process");
    }

    @Override
    public void handle(Element el, StreamTopology st, TopologyBuilder builder) throws Exception {
        if (el.getNodeName().equalsIgnoreCase("process")) {
            BoltDeclarer boltDeclarer;
            String id = el.getAttribute("id");
            if (id == null || id.trim().isEmpty()) {
                log.error("No 'id' attribute defined in process element (class: '{}')", (Object)el.getAttribute("class"));
                throw new Exception("Missing 'id' attribute for process element!");
            }
            log.info("  > Creating process-bolt with id '{}'", (Object)id);
            String copies = el.getAttribute("copies");
            Integer workers = 1;
            String input = el.getAttribute("input");
            List<String> inputs = this.getInputNames(el);
            if (copies != null && !copies.isEmpty()) {
                try {
                    workers = Integer.parseInt(copies);
                }
                catch (Exception e) {
                    workers = 1;
                    throw new RuntimeException("Invalid number of copies '" + copies + "' specified!");
                }
            }
            log.info("  >   Adding bolt '{}', subscribing to input(s): '{}'", (Object)id, (Object)input);
            ProcessBolt bolt = new ProcessBolt(this.xml, id, (Map<String, String>)st.getVariables());
            log.info("  >   Registering bolt (process) '{}' with instance {}", (Object)id, (Object)bolt);
            BoltDeclarer cur = boltDeclarer = builder.setBolt(id, (IRichBolt)bolt, (Number)workers);
            if (!inputs.isEmpty()) {
                for (String in : inputs) {
                    if (in.isEmpty()) continue;
                    log.info("  >   Connecting bolt '{}' to non-group '{}'", (Object)id, (Object)in);
                    cur = (BoltDeclarer)cur.noneGrouping(in);
                }
            } else {
                log.warn("No input defined for process '{}'!", (Object)id);
            }
            st.addBolt(id, cur);
            for (Subscription subscription : bolt.getSubscriptions()) {
                log.info("Adding subscription:  {}", (Object)subscription);
                st.addSubscription(subscription);
            }
        }
    }
}

