/*
 * Decompiled with CFR 0.152.
 */
package stream.runtime;

import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import stream.data.Data;
import stream.data.DataFactory;
import stream.io.BlockingQueue;
import stream.io.DataStream;
import stream.io.DataStreamQueue;
import stream.runtime.AbstractProcess;
import stream.runtime.ContainerContext;
import stream.runtime.ContainerController;
import stream.runtime.DefaultNamingService;
import stream.runtime.ElementHandler;
import stream.runtime.LifeCycle;
import stream.runtime.Monitor;
import stream.runtime.Process;
import stream.runtime.ProcessContextImpl;
import stream.runtime.rpc.RMINamingService;
import stream.runtime.setup.ContainerRefElementHandler;
import stream.runtime.setup.DocumentHandler;
import stream.runtime.setup.LibrariesElementHandler;
import stream.runtime.setup.MonitorElementHandler;
import stream.runtime.setup.ObjectCreator;
import stream.runtime.setup.ObjectFactory;
import stream.runtime.setup.ProcessElementHandler;
import stream.runtime.setup.ProcessorFactory;
import stream.runtime.setup.PropertiesHandler;
import stream.runtime.setup.ServiceElementHandler;
import stream.runtime.setup.ServiceInjection;
import stream.runtime.setup.ServiceReference;
import stream.runtime.setup.StreamElementHandler;
import stream.service.NamingService;

public class ProcessContainer {
    static Logger log = LoggerFactory.getLogger(ProcessContainer.class);
    static final List<ProcessContainer> container = new ArrayList<ProcessContainer>();
    protected final ObjectFactory objectFactory = ObjectFactory.newInstance();
    protected final ProcessorFactory processorFactory = new ProcessorFactory(this.objectFactory);
    protected String name = null;
    protected final ContainerContext context;
    protected final Map<String, DataStream> streams = new LinkedHashMap<String, DataStream>();
    protected final Map<String, DataStreamQueue> listeners = new LinkedHashMap<String, DataStreamQueue>();
    protected final List<AbstractProcess> processes = new ArrayList<AbstractProcess>();
    protected final List<ServiceReference> serviceRefs = new ArrayList<ServiceReference>();
    protected final Map<String, ElementHandler> elementHandler = new HashMap<String, ElementHandler>();
    protected final List<DocumentHandler> documentHandler = new ArrayList<DocumentHandler>();
    protected NamingService namingService = null;
    protected final List<LifeCycle> lifeCyleObjects = new ArrayList<LifeCycle>();
    boolean server = true;
    Long startTime = 0L;
    static final String[] extensions;

    public ProcessContainer(URL url) throws Exception {
        this(url, null);
    }

    public ProcessContainer(URL url, Map<String, ElementHandler> customElementHandler) throws Exception {
        LibrariesElementHandler libHandler = new LibrariesElementHandler(this.objectFactory);
        this.documentHandler.add(libHandler);
        this.documentHandler.add(new PropertiesHandler());
        this.elementHandler.put("Container-Ref", new ContainerRefElementHandler(this.objectFactory));
        this.elementHandler.put("Monitor", new MonitorElementHandler(this.objectFactory, this.processorFactory));
        this.elementHandler.put("Process", new ProcessElementHandler(this.objectFactory, this.processorFactory));
        this.elementHandler.put("Stream", new StreamElementHandler(this.objectFactory));
        this.elementHandler.put("Service", new ServiceElementHandler(this.objectFactory));
        this.elementHandler.put("Libs", libHandler);
        if (customElementHandler != null) {
            this.elementHandler.putAll(customElementHandler);
        }
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        DocumentBuilder db = dbf.newDocumentBuilder();
        Document doc = db.parse(url.openStream());
        Element root = doc.getDocumentElement();
        Map<String, String> attr = this.objectFactory.getAttributes(root);
        if (System.getProperty("container.address") != null) {
            attr.put("address", System.getProperty("container.address"));
        }
        if (System.getProperty("container.port") != null) {
            attr.put("port", System.getProperty("container.port"));
        }
        try {
            this.server = new Boolean(attr.get("server"));
        }
        catch (Exception e) {
            this.server = true;
        }
        if (!root.getNodeName().equalsIgnoreCase("experiment") && !root.getNodeName().equalsIgnoreCase("container")) {
            throw new Exception("Expecting root element to be 'container'!");
        }
        String host = InetAddress.getLocalHost().getHostAddress();
        this.name = InetAddress.getLocalHost().getHostName();
        if (this.name.indexOf(".") > 0) {
            this.name = this.name.substring(0, this.name.indexOf("."));
        }
        log.info("Default hostname is: {}", (Object)host);
        if (attr.containsKey("address") && !attr.get("address").trim().isEmpty()) {
            host = InetAddress.getByName(attr.get("address")).getHostAddress();
            log.info("Container address will be {}", (Object)host);
        }
        Integer port = 0;
        if (attr.containsKey("port") && !attr.get("port").trim().isEmpty()) {
            port = new Integer(attr.get("port"));
            log.info("Container port will be {}", port);
        }
        if (root.hasAttribute("id")) {
            this.name = root.getAttribute("id");
        }
        try {
            String nsClass = root.getAttribute("namingService");
            if (nsClass != null && !nsClass.trim().isEmpty()) {
                this.namingService = (NamingService)this.objectFactory.create(nsClass, attr);
            }
        }
        catch (Exception e) {
            log.error("Faild to instantiate naming service '{}': {}", (Object)root.getAttribute("namingService"), (Object)e.getMessage());
            e.printStackTrace();
            throw new Exception("Faild to instantiate naming service '" + root.getAttribute("namingService") + "': " + e.getMessage());
        }
        if (this.namingService == null) {
            if (attr.containsKey("address")) {
                log.info("Creating RMI naming-service...");
                System.setProperty("java.rmi.server.hostname", host);
                this.namingService = new RMINamingService(this.name, host, port, true);
            } else {
                log.info("No address specified, using local naming-service. Container will not be able to reference other containers!");
                this.namingService = new DefaultNamingService();
            }
        }
        if (this.namingService instanceof LifeCycle) {
            this.lifeCyleObjects.add((LifeCycle)((Object)this.namingService));
        }
        log.info("Using naming-service {}", this.namingService);
        this.context = new ContainerContext(this.name, this.namingService);
        this.init(doc);
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public ContainerContext getContext() {
        return this.context;
    }

    public List<AbstractProcess> getProcesses() {
        return this.processes;
    }

    public List<ServiceReference> getServiceRefs() {
        return this.serviceRefs;
    }

    private void init(Document doc) throws Exception {
        String name;
        Element root = doc.getDocumentElement();
        if (root.getAttribute("import") != null) {
            String[] pkgs;
            for (String pkg : pkgs = root.getAttribute("import").split(",")) {
                if (pkg.trim().isEmpty()) continue;
                this.objectFactory.addPackage(pkg.trim());
            }
        }
        if ((name = root.getAttribute("name")) == null) {
            name = "local";
        }
        for (DocumentHandler handle : this.documentHandler) {
            handle.handle(this, doc);
        }
        this.objectFactory.addVariables(this.context.getProperties());
        NodeList children = root.getChildNodes();
        if (this.context.getProperties().get("container.datafactory") != null) {
            log.info("Using {} as default DataFactory for this container...", (Object)this.context.getProperties().get("container.datafactory"));
            Class<?> dataFactoryClass = Class.forName(this.context.getProperties().get("container.datafactory"));
            DataFactory.setDefaultDataFactory((DataFactory)dataFactoryClass.newInstance());
        }
        for (int i = 0; i < children.getLength(); ++i) {
            Node node = children.item(i);
            if (node.getNodeType() != 1) continue;
            Element element = (Element)node;
            for (ElementHandler handler : this.elementHandler.values()) {
                if (!handler.handlesElement(element)) continue;
                handler.handleElement(this, element);
            }
        }
        this.connectProcesses();
    }

    protected void connectProcesses() throws Exception {
        log.debug("Wiring process inputs to data-streams...");
        for (AbstractProcess aprocess : this.processes) {
            if (!(aprocess instanceof Process)) continue;
            Process process = (Process)aprocess;
            String input = process.getInput();
            if (input == null) {
                throw new RuntimeException("Process '" + process + "' is not connected to any input-stream!");
            }
            DataStream stream = this.streams.get(input);
            if (stream == null) {
                log.debug("No stream defined for name '{}' - creating a listener-queue for key '{}'", (Object)input, (Object)input);
                BlockingQueue q = new BlockingQueue();
                this.listeners.put(input, q);
                this.setStream(input, q);
                this.context.register(input, q);
                stream = q;
            }
            process.setDataStream(stream);
        }
    }

    protected void injectServices() throws Exception {
        ServiceInjection.injectServices(this.getServiceRefs(), this.getContext());
    }

    public void setStream(String id, DataStream stream) {
        this.streams.put(id, stream);
    }

    public void run() throws Exception {
        if (!container.contains(this)) {
            container.add(this);
        }
        this.startTime = System.currentTimeMillis();
        ContainerController controller = new ContainerController(this);
        log.info("Registering container-controller {}", controller);
        this.namingService.register(".ctrl", controller);
        this.injectServices();
        if (this.streams.isEmpty() && this.listeners.isEmpty()) {
            throw new Exception("No data-stream defined!");
        }
        log.debug("Need to handle {} sources: {}", this.streams.size(), this.streams.keySet());
        log.debug("Experiment contains {} stream processes", this.processes.size());
        log.debug("Initializing all DataStreams...");
        for (String name : this.streams.keySet()) {
            DataStream stream = this.streams.get(name);
            log.debug("Initializing stream '{}'", (Object)name);
            stream.init();
        }
        log.debug("Creating {} active processes...", this.processes.size());
        long start = System.currentTimeMillis();
        for (AbstractProcess spu : this.processes) {
            spu.setDaemon(true);
            ProcessContextImpl ctx = new ProcessContextImpl(this.context);
            log.debug("Initializing process with process-context...");
            spu.init(ctx);
            log.debug("Starting stream-process [{}]", spu);
            spu.start();
            log.debug("Stream-process started.");
        }
        Thread.sleep(1000L);
        log.debug("waiting for processes to finish...");
        while (!this.processes.isEmpty()) {
            log.trace("{} processes running", this.processes.size());
            Iterator<AbstractProcess> it = this.processes.iterator();
            while (it.hasNext()) {
                AbstractProcess p = it.next();
                if (!this.server && p instanceof Monitor) {
                    it.remove();
                    continue;
                }
                if (!p.isRunning()) {
                    log.debug("Process '{}' is finished.", p);
                    log.debug("Removing finished process {}", p);
                    it.remove();
                    continue;
                }
                log.trace("    {} is still running", p);
            }
            if ("true".equalsIgnoreCase(System.getProperty("debug.memory"))) {
                System.out.println("Until now, " + DataFactory.getDataItemsCreated() + " data items have been created.");
            }
            try {
                Thread.sleep(500L);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();
        log.info("Running processes: {}", this.processes);
        log.info("ProcessContainer finished all processes after about {} ms", end - start);
    }

    public Set<String> getStreamListenerNames() {
        return this.listeners.keySet();
    }

    public ObjectFactory getObjectFactory() {
        return this.objectFactory;
    }

    public void dataArrived(String key, Data item) {
        if (this.listeners.containsKey(key)) {
            log.debug("Adding item {} into queue {}", item, (Object)key);
            this.listeners.get(key).process(item);
        } else {
            log.warn("No listener defined for {}", (Object)key);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        List<AbstractProcess> list = this.processes;
        synchronized (list) {
            for (AbstractProcess process : this.processes) {
                log.debug("Sending SHUTDOWN signal to process {}", process);
                try {
                    process.finish();
                }
                catch (Exception e) {
                    log.error("Failed to properly shutdown process: {}", (Object)e.getMessage());
                }
            }
        }
        log.debug("Sending finish() signal to life-cycle objects...");
        for (LifeCycle object : this.lifeCyleObjects) {
            try {
                log.debug("   sending finish() to {}", object);
                object.finish();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        while (!this.processes.isEmpty()) {
            log.info("Waiting for {} processes to finish...", this.processes.size());
            try {
                Iterator<AbstractProcess> it = this.processes.iterator();
                while (it.hasNext()) {
                    AbstractProcess process = it.next();
                    if (process.isAlive()) continue;
                    log.debug("another process finished...");
                    it.remove();
                }
                log.debug("Waiting for {} processes to finish...", this.processes.size());
                log.debug("   processes: {}", this.processes);
                try {
                    Thread.sleep(500L);
                }
                catch (Exception exception) {
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        log.info("Container shut down.");
    }

    static {
        log.debug("Adding container shutdown-hook");
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                if ("disabled".equalsIgnoreCase(System.getProperty("container.shutdown-hook"))) {
                    log.warn("Shutdown-hook disabled...");
                    return;
                }
                log.debug("Running shutdown-hook...");
                for (ProcessContainer pc : container) {
                    log.debug("Sending shutdown signal to {}", pc);
                    pc.shutdown();
                }
            }
        });
        for (String ext : extensions = new String[]{"stream.moa.MoaObjectFactory", "stream.script.JavaScriptProcessorFactory"}) {
            try {
                Class<?> clazz = Class.forName(ext);
                ObjectCreator creator = (ObjectCreator)clazz.newInstance();
                ObjectFactory.registerObjectCreator(creator);
                log.debug("Registered extension {}", (Object)ext);
            }
            catch (Exception e) {
                log.debug("Failed to register extension '{}': {}", (Object)ext, (Object)e.getMessage());
                if (!log.isTraceEnabled()) continue;
                e.printStackTrace();
            }
        }
    }
}

