/*
 * Decompiled with CFR 0.152.
 */
package stream.storm;

import backtype.storm.task.OutputCollector;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import stream.Processor;
import stream.ProcessorList;
import stream.Subscription;
import stream.io.Queue;
import stream.runtime.DependencyInjection;
import stream.runtime.setup.ObjectFactory;
import stream.runtime.setup.ProcessorFactory;
import stream.storm.QueueWrapper;

/**
 * Processor-creation handler that looks for queue setters on newly created
 * processors, injects {@link QueueWrapper} instances into them and records a
 * {@link Subscription} for every queue name it injects.
 *
 * <p>Queue names are read from the attributes of the XML element the
 * processor was created from. The attribute name is derived from the setter
 * name: {@code setOutput(..)} is looked up as attribute {@code output}.
 */
public class QueueInjection
implements ProcessorFactory.ProcessorCreationHandler {
    static Logger log = LoggerFactory.getLogger(QueueInjection.class);
    final OutputCollector collector;
    final String boltId;
    /** Subscriptions recorded so far, kept in the order in which they were added. */
    final Set<Subscription> subscriptions = new LinkedHashSet<Subscription>();

    /**
     * Creates a handler for a single bolt.
     *
     * @param boltId identifier passed to every {@link Subscription} this handler creates
     * @param c      collector passed to every {@link QueueWrapper} this handler creates
     */
    public QueueInjection(String boltId, OutputCollector c) {
        this.boltId = boltId;
        this.collector = c;
    }

    /**
     * Intentionally empty in this class: calling it has no effect.
     *
     * @param proc      unused
     * @param collector unused
     */
    public static void injectQueues(Processor proc, OutputCollector collector) {
    }

    /**
     * Calls {@link #injectQueues(Processor, OutputCollector)} for every
     * processor returned by {@code procs.getProcessors()}.
     *
     * @param procs     list whose processors are visited
     * @param collector collector handed to the per-processor overload
     */
    public static void injectQueues(ProcessorList procs, OutputCollector collector) {
        for (Processor p : procs.getProcessors()) {
            QueueInjection.injectQueues(p, collector);
        }
    }

    /**
     * Returns the method name with its first three characters removed, e.g.
     * {@code "Output"} for {@code setOutput}.
     *
     * @param m method to inspect
     * @return the remainder of the name, or an empty string if the name has
     *         three characters or fewer
     */
    public static String getQueueSetterName(Method m) {
        String name = m.getName();
        // Guard against names shorter than the three-character prefix, which
        // would otherwise raise a StringIndexOutOfBoundsException.
        return name.length() <= 3 ? "" : name.substring(3);
    }

    /**
     * Tells whether the first parameter of the given method is an array type.
     *
     * @param m method to inspect
     * @return {@code true} if the method has at least one parameter and the
     *         first one is an array, {@code false} otherwise
     */
    public static boolean isQueueArraySetter(Method m) {
        Class<?>[] types = m.getParameterTypes();
        // A method without parameters cannot be an array setter.
        return types.length > 0 && types[0].isArray();
    }

    /**
     * Inspects all public methods of the processor and, for each queue setter
     * whose matching attribute is present on {@code from}, invokes the setter
     * with one {@link QueueWrapper} (or an array of them for array setters,
     * where the attribute value is split on commas). Every injected queue name
     * is also recorded as a {@link Subscription}.
     *
     * @param p    the processor that has just been created
     * @param from the element whose attributes provide the queue names
     * @throws Exception if reading the attributes or reflectively invoking a
     *                   setter fails
     */
    public void processorCreated(Processor p, Element from) throws Exception {
        Map<?, ?> params = ObjectFactory.newInstance().getAttributes((Node) from);
        for (Method m : p.getClass().getMethods()) {
            log.debug("Checking method {}", m);
            if (!DependencyInjection.isSetter(m, Queue.class)) {
                log.debug("Skipping method {} => not a queue-setter", m);
                continue;
            }

            String qsn = QueueInjection.getQueueSetterName(m);
            if (qsn.isEmpty()) {
                // A setter literally named "set" yields no property name to look up.
                log.debug("Skipping method {} => no property name after the setter prefix", m);
                continue;
            }

            // Character.toLowerCase is locale-independent, unlike String.toLowerCase(),
            // so the derived attribute name does not depend on the default locale.
            String prop = Character.toLowerCase(qsn.charAt(0)) + qsn.substring(1);
            Object value = params.get(prop);
            if (value == null) {
                log.info("Found null-value for property '{}', skipping injection for this property.", prop);
                continue;
            }
            log.debug("Found queue-setter for property {} (property value: '{}')", prop, value);

            String spec = (String) value;
            if (DependencyInjection.isArraySetter(m, Queue.class)) {
                ArrayList<QueueWrapper> wrappers = new ArrayList<QueueWrapper>();
                for (String name : spec.split(",")) {
                    if (name.trim().isEmpty()) {
                        continue;
                    }
                    wrappers.add(this.subscribe(name));
                }
                log.debug("Injecting array of queues...");
                QueueWrapper[] array = wrappers.toArray(new QueueWrapper[wrappers.size()]);
                // Wrap the array so it is passed as ONE argument rather than
                // being spread over the varargs of Method.invoke.
                m.invoke(p, new Object[]{array});
                continue;
            }

            QueueWrapper queue = this.subscribe(spec);
            log.debug("Injecting a single queue... using method {}", m);
            m.invoke(p, queue);
        }
    }

    /**
     * Registers a subscription for the given queue name and creates the
     * wrapper for it. The name is trimmed once so that the subscription and
     * the wrapper always carry the same name.
     */
    private QueueWrapper subscribe(String rawName) {
        String name = rawName.trim();
        this.subscriptions.add(new Subscription(name, this.boltId));
        return new QueueWrapper(this.collector, name);
    }

    /**
     * Returns the subscriptions recorded by this handler.
     *
     * @return the internal set itself (not a copy); later calls to
     *         {@link #processorCreated(Processor, Element)} add to it
     */
    public Set<Subscription> getSubscriptions() {
        return this.subscriptions;
    }
}

