/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.siddhi.engine;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.model.graph.DataProcessorInvocation;
import org.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.streampipes.wrapper.routing.SpOutputCollector;
import org.streampipes.wrapper.runtime.EventProcessor;
import org.streampipes.wrapper.siddhi.manager.SpSiddhiManager;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public abstract class SiddhiEventEngine<T extends EventProcessorBindingParams>
extends EventProcessor<T> {
    private StringBuilder siddhiAppString = new StringBuilder();
    private SiddhiAppRuntime siddhiAppRuntime;
    private Map<String, InputHandler> siddhiInputHandlers = new HashMap<String, InputHandler>();
    private List<String> inputStreamNames = new ArrayList<String>();
    private static final Logger LOG = LoggerFactory.getLogger(SiddhiEventEngine.class);

    public SiddhiEventEngine(T params) {
        super(params);
    }

    public void bind(T parameters, final SpOutputCollector collector) {
        if (parameters.getInEventTypes().size() != ((DataProcessorInvocation)parameters.getGraph()).getInputStreams().size()) {
            throw new IllegalArgumentException("Input parameters do not match!");
        }
        SiddhiManager siddhiManager = SpSiddhiManager.INSTANCE.getSiddhiManager();
        LOG.info("Configuring event types for graph " + ((DataProcessorInvocation)parameters.getGraph()).getName());
        parameters.getInEventTypes().forEach((key, value) -> {
            Map inTypeMap = value;
            this.registerEventTypeIfNotExists((String)key, inTypeMap);
            this.inputStreamNames.add(this.fixEventName((String)key));
        });
        String fromStatement = this.fromStatement(this.inputStreamNames, parameters);
        String selectStatement = this.selectStatement(parameters);
        this.registerStatements(fromStatement, selectStatement, this.getOutputTopicName(parameters));
        this.siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(this.siddhiAppString.toString());
        parameters.getInEventTypes().forEach((key, value) -> this.siddhiInputHandlers.put((String)key, this.siddhiAppRuntime.getInputHandler(this.fixEventName((String)key))));
        this.siddhiAppRuntime.addCallback(this.fixEventName(this.getOutputTopicName(parameters)), new StreamCallback((EventProcessorBindingParams)parameters){
            final /* synthetic */ EventProcessorBindingParams val$parameters;
            {
                this.val$parameters = eventProcessorBindingParams;
            }

            public void receive(Event[] events) {
                for (Event event : events) {
                    collector.onEvent(SiddhiEventEngine.this.toMap(event, this.val$parameters));
                }
            }
        });
    }

    private String getOutputTopicName(T parameters) {
        return ((DataProcessorInvocation)parameters.getGraph()).getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
    }

    private Map<String, Object> toMap(Event event, T parameters) {
        HashMap<String, Object> outMap = new HashMap<String, Object>();
        int i = 0;
        for (String key : parameters.getOutEventType().keySet()) {
            outMap.put(key, event.getData(i));
            ++i;
        }
        return outMap;
    }

    private void registerEventTypeIfNotExists(String eventTypeName, Map<String, Object> typeMap) {
        String defineStreamPrefix = "define stream " + this.fixEventName(eventTypeName);
        StringJoiner joiner = new StringJoiner(",");
        for (String key : typeMap.keySet()) {
            joiner.add(key + " " + this.toType((Class)typeMap.get(key)));
        }
        this.siddhiAppString.append(defineStreamPrefix);
        this.siddhiAppString.append("(");
        this.siddhiAppString.append(joiner.toString());
        this.siddhiAppString.append(");\n");
    }

    private String toType(Class<?> o) {
        System.out.println(o.getCanonicalName());
        if (o.equals(Long.class)) {
            return "LONG";
        }
        if (o.equals(Integer.class)) {
            return "INT";
        }
        if (o.equals(Double.class)) {
            return "DOUBLE";
        }
        if (o.equals(Float.class)) {
            return "FLOAT";
        }
        if (o.equals(Boolean.class)) {
            return "BOOL";
        }
        return "STRING";
    }

    private void registerStatements(String fromStatement, String selectStatement, String outputStream) {
        this.siddhiAppString.append(fromStatement).append("\n").append(selectStatement).append("\n").append("insert into ").append(this.fixEventName(outputStream)).append(";");
        LOG.info("Registering statement: \n" + this.siddhiAppString.toString());
    }

    public void onEvent(Map<String, Object> event, String sourceInfo) {
        try {
            this.siddhiInputHandlers.get(sourceInfo).send(this.toObjArr(event));
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private Object[] toObjArr(Map<String, Object> event) {
        return event.values().toArray();
    }

    public void discard() {
        this.siddhiAppRuntime.shutdown();
    }

    protected abstract String fromStatement(List<String> var1, T var2);

    protected abstract String selectStatement(T var1);

    private String fixEventName(String eventName) {
        return eventName.replaceAll("\\.", "");
    }
}

