/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.esper;

import com.espertech.esper.client.ConfigurationException;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.commons.Utils;
import org.streampipes.model.graph.DataProcessorInvocation;
import org.streampipes.model.runtime.Event;
import org.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.streampipes.wrapper.esper.AbstractQueueRunnable;
import org.streampipes.wrapper.esper.EsperEngineSettings;
import org.streampipes.wrapper.esper.StatementAwareQueue;
import org.streampipes.wrapper.esper.config.EsperEngineConfig;
import org.streampipes.wrapper.esper.writer.Writer;
import org.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.streampipes.wrapper.routing.SpOutputCollector;
import org.streampipes.wrapper.runtime.EventProcessor;

/**
 * Base class for event processors that are backed by an Esper engine.
 *
 * <p>On invocation the engine registers the input and output event types with the
 * shared {@link EsperEngineSettings#epService}, compiles the EPL statements supplied by
 * the subclass via {@link #statements(EventProcessorBindingParams)} and attaches a
 * listener to every statement whose text starts with {@code "select"}. Listener output
 * is handed to a {@link StatementAwareQueue} built around the {@link Writer} returned by
 * {@link #getWriter(SpOutputCollector, EventProcessorRuntimeContext)}.
 *
 * @param <T> binding parameter type of the concrete processor
 */
public abstract class EsperEventEngine<T extends EventProcessorBindingParams>
implements EventProcessor<T> {
    private static final Logger LOG = LoggerFactory.getLogger(EsperEventEngine.class);

    protected EPServiceProvider epService;
    protected List<EPStatement> epStatements;
    private AbstractQueueRunnable<EventBean[]> queue;
    private final List<String> eventTypeNames = new ArrayList<String>();

    /**
     * Registers all event types of the given graph and starts the statements produced by
     * {@link #statements(EventProcessorBindingParams)}.
     *
     * @param parameters binding parameters describing input/output event types and the graph
     * @param collector output collector passed on to the writer
     * @param runtimeContext runtime context passed on to the writer
     * @throws IllegalArgumentException if the number of input event types differs from the
     *         number of input streams of the graph
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void onInvocation(T parameters, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
        DataProcessorInvocation graph = (DataProcessorInvocation) parameters.getGraph();
        if (parameters.getInEventTypes().size() != graph.getInputStreams().size()) {
            throw new IllegalArgumentException("Input parameters do not match!");
        }
        this.epService = EsperEngineSettings.epService;
        LOG.info("Configuring event types for graph {}", graph.getName());
        parameters.getInEventTypes().entrySet().forEach(e -> {
            Map inTypeMap = (Map) e.getValue();
            this.checkAndRegisterEventType((String) e.getKey(), inTypeMap);
        });
        // The output event type is registered under the actual topic name of the output stream.
        String outputTypeName = graph.getOutputStream()
                .getEventGrounding()
                .getTransportProtocol()
                .getTopicDefinition()
                .getActualTopicName();
        this.checkAndRegisterEventType(outputTypeName, parameters.getOutEventType());
        List<String> statements = this.statements(parameters);
        this.registerStatements(statements, collector, runtimeContext);
    }

    /**
     * Registers an event type, first registering a nested type for every list-valued property.
     *
     * <p>For each property whose value is a {@link List}, the first list element is registered
     * as its own event type named after the capitalized property key, and the property is
     * replaced by a reference of the form {@code "<Name>[]"}. All other properties are copied
     * unchanged.
     *
     * <p>NOTE(review): an empty list, or a list whose first element is not a map, makes this
     * method throw at {@code get(0)} / the cast; callers appear to be expected to supply a
     * sample element.
     *
     * @param key name under which the (rewritten) type map is registered
     * @param typeMap property name to type description
     */
    @SuppressWarnings("unchecked")
    private void checkAndRegisterEventType(String key, Map<String, Object> typeMap) {
        Map<String, Object> newTypeMap = new HashMap<String, Object>();
        for (Map.Entry<String, Object> property : typeMap.entrySet()) {
            String propertyName = property.getKey();
            Object propertyType = property.getValue();
            if (propertyType instanceof List) {
                String nestedTypeName = StringUtils.capitalize(propertyName);
                Map<String, Object> nestedTypeMap = (Map<String, Object>) ((List<?>) propertyType).get(0);
                this.registerEventTypeIfNotExists(nestedTypeName, nestedTypeMap);
                newTypeMap.put(propertyName, nestedTypeName + "[]");
            } else {
                newTypeMap.put(propertyName, propertyType);
            }
        }
        this.registerEventTypeIfNotExists(key, newTypeMap);
    }

    /**
     * Adds the event type to the Esper configuration and remembers its name for removal in
     * {@link #onDetach()}. A {@link ConfigurationException} is logged and otherwise ignored,
     * in which case the name is not remembered.
     *
     * @param eventTypeName name of the event type
     * @param typeMap property name to type description
     */
    private void registerEventTypeIfNotExists(String eventTypeName, Map<String, Object> typeMap) {
        try {
            LOG.info("Registering event type, {}", eventTypeName);
            this.epService.getEPAdministrator().getConfiguration().addEventType(eventTypeName, typeMap);
            this.eventTypeNames.add(eventTypeName);
        }
        catch (ConfigurationException e) {
            // Log through SLF4J (with the cause) rather than printing the stack trace to stderr.
            LOG.error("Event type does already exist, " + eventTypeName, e);
        }
    }

    /**
     * Compiles the given EPL statements, starts the output queue and starts every statement.
     * Only statements whose text starts with {@code "select"} (case-sensitive) get a listener
     * that forwards their results to the queue.
     *
     * @param statements EPL statements to register
     * @param collector output collector passed on to the writer
     * @param runtimeContext runtime context passed on to the writer
     */
    private void registerStatements(List<String> statements, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
        this.toEpStatement(statements);
        // NOTE(review): 500000 is presumably the queue size limit — confirm against StatementAwareQueue.
        this.queue = new StatementAwareQueue(this.getWriter(collector, runtimeContext), 500000);
        this.queue.start();
        for (EPStatement epStatement : this.epStatements) {
            LOG.info("Registering statement {}", epStatement.getText());
            if (epStatement.getText().startsWith("select")) {
                epStatement.addListener(EsperEventEngine.listenerSendingTo(this.queue));
            }
            epStatement.start();
        }
    }

    /**
     * Creates an {@link EPStatement} for every EPL string and appends it to
     * {@link #epStatements}, creating the list on first use.
     *
     * @param statements EPL statements to compile
     */
    private void toEpStatement(List<String> statements) {
        if (this.epStatements == null) {
            this.epStatements = new ArrayList<EPStatement>();
        }
        for (String statement : statements) {
            this.epStatements.add(this.epService.getEPAdministrator().createEPL(statement));
        }
    }

    /**
     * Sends the raw event to the Esper runtime, using the event's source id as the event
     * type name.
     *
     * @param event incoming event
     * @param collector unused here; results are emitted through the statement listeners
     */
    public void onEvent(Event event, SpOutputCollector collector) {
        this.epService.getEPRuntime().sendEvent(event.getRaw(), event.getSourceInfo().getSourceId());
    }

    /**
     * Tears down everything created in {@link #onInvocation}: statements are stopped and
     * destroyed, registered event types are removed (unless still in use) and the output
     * queue is interrupted. Safe to call when invocation never happened or failed part-way.
     */
    public void onDetach() {
        LOG.info("Removing existing statements");
        if (this.epStatements != null) {
            for (EPStatement epStatement : this.epStatements) {
                // Look the statement up once; the administrator returns null for statements
                // that are unknown or were already destroyed.
                EPStatement registered = this.epService.getEPAdministrator().getStatement(epStatement.getName());
                if (registered == null) {
                    continue;
                }
                registered.removeAllListeners();
                registered.stop();
                registered.destroy();
            }
            this.epStatements.clear();
        }
        // Forget names whose removal went through so a later invocation starts clean;
        // names that are still in use elsewhere stay tracked.
        this.eventTypeNames.removeIf(this::removeEventType);
        if (this.queue != null) {
            this.queue.interrupt();
        }
    }

    /**
     * Tries to remove the event type from the Esper configuration without forcing.
     *
     * @param eventName name of the event type to remove
     * @return {@code true} if the removal call completed, {@code false} if it was rejected
     *         with a {@link ConfigurationException}
     */
    private boolean removeEventType(String eventName) {
        try {
            this.epService.getEPAdministrator().getConfiguration().removeEventType(eventName, false);
            return true;
        }
        catch (ConfigurationException ce) {
            LOG.error("Event type used in another statement which is still running, skipping...");
            return false;
        }
    }

    /**
     * Creates a listener that forwards statement results to the given queue. New events are
     * forwarded when present, otherwise the old events are forwarded.
     *
     * @param queue target queue
     * @return listener to attach to an {@link EPStatement}
     */
    private static UpdateListener listenerSendingTo(final AbstractQueueRunnable<EventBean[]> queue) {
        return new UpdateListener(){

            public void update(EventBean[] newEvents, EventBean[] oldEvents) {
                try {
                    queue.add(newEvents != null ? newEvents : oldEvents);
                }
                catch (InterruptedException e) {
                    // Preserve the interrupt for the calling thread instead of swallowing it.
                    Thread.currentThread().interrupt();
                    LOG.warn("Interrupted while forwarding statement results to the output queue", e);
                }
            }
        };
    }

    /**
     * Supplies the EPL statements that implement this processor.
     *
     * @param var1 binding parameters of the current invocation
     * @return EPL statements to register
     */
    protected abstract List<String> statements(T var1);

    /**
     * Wraps an event name in backticks.
     *
     * @param eventName raw event name
     * @return the name enclosed in backticks
     */
    protected String fixEventName(String eventName) {
        return "`" + eventName + "`";
    }

    /**
     * Convenience helper for processors that consist of a single statement.
     *
     * @param statement EPL statement
     * @return list containing only {@code statement}
     */
    protected List<String> makeStatementList(String statement) {
        return Utils.createList((Object[])new String[]{statement});
    }

    /**
     * Returns the writer that receives statement results; subclasses may override to supply
     * a custom writer.
     *
     * @param collector output collector
     * @param runtimeContext runtime context
     * @return the default writer from {@link EsperEngineConfig}
     */
    protected Writer getWriter(SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
        return EsperEngineConfig.getDefaultWriter(collector, runtimeContext);
    }
}

