/*
 * Decompiled with CFR 0.152.
 */
package org.marketcetera.modules.cep.esper;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPException;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.time.CurrentTimeEvent;
import com.espertech.esper.client.time.TimerControlEvent;
import java.io.File;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.marketcetera.core.Pair;
import org.marketcetera.event.TimestampCarrier;
import org.marketcetera.metrics.ThreadedMetric;
import org.marketcetera.module.DataEmitter;
import org.marketcetera.module.DataEmitterSupport;
import org.marketcetera.module.DataFlowID;
import org.marketcetera.module.DataReceiver;
import org.marketcetera.module.DataRequest;
import org.marketcetera.module.IllegalRequestParameterValue;
import org.marketcetera.module.Module;
import org.marketcetera.module.ModuleException;
import org.marketcetera.module.ModuleURN;
import org.marketcetera.module.RequestDataException;
import org.marketcetera.module.RequestID;
import org.marketcetera.module.StopDataFlowException;
import org.marketcetera.module.UnsupportedDataTypeException;
import org.marketcetera.module.UnsupportedRequestParameterType;
import org.marketcetera.modules.cep.esper.CEPEsperProcessorMXBean;
import org.marketcetera.modules.cep.esper.Messages;
import org.marketcetera.modules.cep.system.CEPDataTypes;
import org.marketcetera.util.log.I18NBoundMessage;
import org.marketcetera.util.log.I18NBoundMessage1P;
import org.marketcetera.util.misc.ClassVersion;
import org.w3c.dom.Node;

@ClassVersion(value="$Id: CEPEsperProcessor.java 16841 2014-02-20 19:59:04Z colin $")
public class CEPEsperProcessor
extends Module
implements DataReceiver,
DataEmitter,
CEPEsperProcessorMXBean {
    /**
     * Per-thread nesting depth of {@code receiveData} calls; every thread starts at 0.
     * {@code receiveData} increments it while an event is being handed to Esper and
     * restores it afterwards. A value above zero on entry means the calling thread is
     * already inside {@code receiveData}, and the event is then routed instead of sent.
     */
    private final ThreadLocal<Integer> mSelfPostingEvents = new ThreadLocal<Integer>(){

        @Override
        protected Integer initialValue() {
            return 0;
        }
    };
    /** Esper service provider; assigned in {@code preStart}, destroyed and set to null in {@code preStop}. */
    private EPServiceProvider mService;
    /** Statements created for each request, keyed by request ID, so that cancelling a request can destroy them. */
    private final Map<RequestID, List<EPStatement>> mRequests = new Hashtable<RequestID, List<EPStatement>>();
    /** Configuration source string set through {@code setConfiguration}; {@code preStart} skips loading when it is null. */
    private String mConfiguration;
    /** When true, {@code preStart} switches Esper to external clocking and installs the external-time delegate. */
    private volatile boolean mUseExternalTime;
    /** Prefix that marks a query as an Esper pattern; queries without it are created as EPL. */
    private static final String PATTERN_QUERY_PREFIX = "p:";
    /** Request/data handling strategy; has no initial value and is assigned in {@code preStart}. */
    private volatile ProcessingDelegate mDelegate;

    /**
     * Creates an instance, passing both arguments straight through to the
     * {@link Module} constructor.
     *
     * @param inURN the URN of this module instance
     * @param inAutoStart forwarded unchanged to the superclass constructor
     */
    protected CEPEsperProcessor(ModuleURN inURN, boolean inAutoStart) {
        super(inURN, inAutoStart);
    }

    /**
     * Validates the request payload and hands the contained statements to the
     * active processing delegate.
     * <p>
     * The payload must be either a single {@code String} or a non-empty
     * {@code String[]}; a single string is wrapped into a one-element array.
     *
     * @param inRequest the data request; must not be null and must carry non-null data
     * @param inSupport the emitter support handed on to the delegate
     * @throws UnsupportedRequestParameterType if the payload is neither a
     *         {@code String} nor a {@code String[]}
     * @throws IllegalRequestParameterValue if the request or its payload is null,
     *         the array is empty, or the delegate reports a {@link RequestDataException}
     */
    public void requestData(DataRequest inRequest, DataEmitterSupport inSupport) throws UnsupportedRequestParameterType, IllegalRequestParameterValue {
        if (inRequest == null) {
            throw new IllegalRequestParameterValue(this.getURN(), null);
        }
        final Object payload = inRequest.getData();
        if (payload == null) {
            throw new IllegalRequestParameterValue(this.getURN(), null);
        }
        final String[] queries;
        if (payload instanceof String[]) {
            queries = (String[])payload;
            if (queries.length == 0) {
                throw new IllegalRequestParameterValue(this.getURN(), (Object)queries);
            }
        } else if (payload instanceof String) {
            queries = new String[]{(String)payload};
        } else {
            throw new UnsupportedRequestParameterType(this.getURN(), payload);
        }
        try {
            this.getDelegate().processRequest(queries, inSupport);
        }
        catch (RequestDataException e) {
            // Surface statement-creation failures as an illegal parameter value, listing the offending statements.
            final String rendered = Arrays.toString(queries);
            throw new IllegalRequestParameterValue((Throwable)e, (I18NBoundMessage)new I18NBoundMessage1P(Messages.ERROR_CREATING_STATEMENTS, (Serializable)rendered));
        }
    }

    /**
     * Cancels a data request by forwarding both identifiers to the active
     * processing delegate.
     *
     * @param inFlowID the data flow the request belongs to
     * @param inRequestID the request to cancel
     */
    public void cancel(DataFlowID inFlowID, RequestID inRequestID) {
        this.getDelegate().cancelRequest(inFlowID, inRequestID);
    }

    /**
     * Feeds one incoming data item into the Esper runtime.
     * <p>
     * Null data is ignored. Otherwise the delegate's pre-processing hook runs first,
     * then the item is passed to Esper: {@link Map} instances under the event type
     * name {@code "map"}, {@link Node} instances and all other objects as-is. If the
     * calling thread is already inside this method (per-thread depth counter above
     * zero) the event is routed, otherwise it is sent. The depth counter is restored
     * to its entry value even when Esper throws.
     *
     * @param inFlowID the data flow delivering the data
     * @param inData the data item; may be null
     * @throws UnsupportedDataTypeException declared by the receiver contract
     * @throws StopDataFlowException if the delegate's pre-processing fails
     */
    public void receiveData(DataFlowID inFlowID, Object inData) throws UnsupportedDataTypeException, StopDataFlowException {
        ThreadedMetric.event((String)"cep-IN", (Object[])new Object[0]);
        if (inData == null) {
            return;
        }
        this.getDelegate().preProcessData(inFlowID, inData);
        final int depthOnEntry = this.mSelfPostingEvents.get();
        final boolean nestedCall = depthOnEntry > 0;
        this.mSelfPostingEvents.set(depthOnEntry + 1);
        try {
            if (nestedCall) {
                // Already inside receiveData on this thread: route instead of send.
                if (inData instanceof Map) {
                    this.mService.getEPRuntime().route((Map)inData, "map");
                } else if (inData instanceof Node) {
                    this.mService.getEPRuntime().route((Node)inData);
                } else {
                    this.mService.getEPRuntime().route(inData);
                }
            } else {
                if (inData instanceof Map) {
                    this.mService.getEPRuntime().sendEvent((Map)inData, "map");
                } else if (inData instanceof Node) {
                    this.mService.getEPRuntime().sendEvent((Node)inData);
                } else {
                    this.mService.getEPRuntime().sendEvent(inData);
                }
            }
        }
        finally {
            this.mSelfPostingEvents.set(depthOnEntry);
        }
    }

    /**
     * Returns the configuration source string last stored by
     * {@code setConfiguration}, or null if none has been set.
     *
     * @return the configuration source; may be null
     */
    @Override
    public String getConfiguration() {
        return this.mConfiguration;
    }

    /**
     * Stores the configuration source to be read by the next {@code preStart}.
     *
     * @param inConfiguration the configuration source; may be null
     * @throws IllegalStateException if the module is currently started
     */
    @Override
    public void setConfiguration(String inConfiguration) {
        final boolean alreadyStarted = this.getState().isStarted();
        if (alreadyStarted) {
            throw new IllegalStateException(Messages.ERROR_MODULE_ALREADY_STARTED.getText());
        }
        this.mConfiguration = inConfiguration;
    }

    /**
     * Returns the statement names reported by the Esper administrator.
     *
     * @return the statement names
     * @throws IllegalStateException if the module is not started
     */
    @Override
    public String[] getStatementNames() {
        if (!this.getState().isStarted()) {
            throw new IllegalStateException(Messages.ERROR_MODULE_NOT_STARTED.getText());
        }
        return this.mService.getEPAdministrator().getStatementNames();
    }

    /**
     * Returns the number of events the Esper runtime reports as evaluated.
     *
     * @return the runtime's evaluated-event count
     * @throws IllegalStateException if the module is not started
     */
    @Override
    public long getNumEventsReceived() {
        if (!this.getState().isStarted()) {
            throw new IllegalStateException(Messages.ERROR_MODULE_NOT_STARTED.getText());
        }
        return this.mService.getEPRuntime().getNumEventsEvaluated();
    }

    /**
     * Returns the external-time flag last stored by {@code setUseExternalTime}.
     *
     * @return true if external time has been requested
     */
    @Override
    public boolean isUseExternalTime() {
        return this.mUseExternalTime;
    }

    /**
     * Stores the external-time flag to be read by the next {@code preStart}.
     *
     * @param inUseExternalTime true to request external time
     * @throws IllegalStateException if the module is currently started
     */
    @Override
    public void setUseExternalTime(boolean inUseExternalTime) {
        final boolean alreadyStarted = this.getState().isStarted();
        if (alreadyStarted) {
            throw new IllegalStateException(Messages.ERROR_MODULE_ALREADY_STARTED.getText());
        }
        this.mUseExternalTime = inUseExternalTime;
    }

    /**
     * Creates an instance, passing {@code true} as the second argument of the
     * {@link Module} constructor.
     *
     * @param inURN the URN of this module instance
     */
    protected CEPEsperProcessor(ModuleURN inURN) {
        super(inURN, true);
    }

    /**
     * Builds the Esper configuration, obtains the service provider and selects the
     * processing delegate.
     * <p>
     * If a configuration source is set it is tried, in order, as a URL, as a path to
     * an existing regular file, and finally as a name passed to
     * {@code Configuration.configure(String)}. Exactly one of these is used. Event
     * types are then registered for every entry of
     * {@code CEPDataTypes.REQUEST_PRECANNED_TYPES} (the {@code "map"} entry with an
     * empty property set) plus {@code "timeCarrier"} for {@link TimestampCarrier}.
     * The provider is looked up under the URN's instance name. When external time is
     * requested, Esper is switched to the external clock and the external-time
     * delegate is installed; otherwise the regular delegate is used.
     *
     * @throws ModuleException if Esper reports an {@link EPException} during any of
     *         the above steps
     */
    protected void preStart() throws ModuleException {
        final String configSource = this.getConfiguration();
        final Configuration configuration = new Configuration();
        try {
            if (configSource != null) {
                try {
                    configuration.configure(new URL(configSource));
                }
                catch (MalformedURLException ignored) {
                    // Not a URL. Use the file if one exists at that path, and only
                    // otherwise fall back to the name-based lookup. Previously the
                    // name-based lookup also ran after a successful file load.
                    final File configFile = new File(configSource);
                    if (configFile.isFile()) {
                        configuration.configure(configFile);
                    } else {
                        configuration.configure(configSource);
                    }
                }
            }
            for (Pair stringClassPair : CEPDataTypes.REQUEST_PRECANNED_TYPES) {
                final String typeName = (String)stringClassPair.getFirstMember();
                if (typeName.equals("map")) {
                    // The map type is registered with an empty property set instead of a class.
                    configuration.addEventType("map", new Properties());
                    continue;
                }
                configuration.addEventType(typeName, (Class)stringClassPair.getSecondMember());
            }
            configuration.addEventType("timeCarrier", TimestampCarrier.class);
            this.mService = EPServiceProviderManager.getProvider((String)this.getURN().instanceName(), (Configuration)configuration);
            if (this.isUseExternalTime()) {
                this.mService.getEPRuntime().sendEvent((Object)new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));
                this.mDelegate = new ExternalTimeDelegate();
            } else {
                this.mDelegate = new RegularDelegate();
            }
        }
        catch (EPException e) {
            throw new ModuleException((Throwable)e, (I18NBoundMessage)Messages.ERROR_CONFIGURING_ESPER.getMessage());
        }
    }

    /**
     * Destroys the Esper service provider and then clears the reference to it.
     */
    protected void preStop() {
        final EPServiceProvider service = this.mService;
        service.destroy();
        // Cleared only after a successful destroy, so a failure leaves the reference in place.
        this.mService = null;
    }

    /**
     * Creates one Esper statement per query, in order.
     * <p>
     * A query starting with {@code "p:"} is created as a pattern from the text after
     * the prefix; any other query is created as EPL. If Esper rejects a query, every
     * statement created so far by this call is destroyed before the exception is
     * rethrown.
     *
     * @param inQuery the queries to create statements for
     * @return the created statements, in the same order as the queries
     * @throws EPException if Esper fails to create one of the statements
     */
    protected ArrayList<EPStatement> createStatements(String ... inQuery) throws EPException {
        final ArrayList<EPStatement> created = new ArrayList<EPStatement>(inQuery.length);
        try {
            for (int i = 0; i < inQuery.length; i++) {
                final String text = inQuery[i];
                final EPStatement statement;
                if (text.startsWith(PATTERN_QUERY_PREFIX)) {
                    statement = this.mService.getEPAdministrator().createPattern(text.substring(PATTERN_QUERY_PREFIX.length()));
                } else {
                    statement = this.mService.getEPAdministrator().createEPL(text);
                }
                created.add(statement);
            }
        }
        catch (EPException failure) {
            // Roll back the partial set so a failed request leaves nothing behind.
            for (int i = 0; i < created.size(); i++) {
                created.get(i).destroy();
            }
            throw failure;
        }
        return created;
    }

    /**
     * Returns the current processing delegate.
     *
     * @return the delegate assigned by {@code preStart}; null if {@code preStart}
     *         has not assigned one yet
     */
    private ProcessingDelegate getDelegate() {
        return this.mDelegate;
    }

    /**
     * Object installed as the subscriber of a statement; forwards each update to
     * the {@link DataEmitterSupport} it was created with.
     */
    public static class Subscriber {
        private DataEmitterSupport mSupport;

        private Subscriber(DataEmitterSupport inSupport) {
            this.mSupport = inSupport;
        }

        /**
         * Emits one update. A map with exactly one entry is unwrapped and its single
         * value is emitted; in every other case (including null) the map itself is
         * emitted.
         *
         * @param inMap the update delivered to this subscriber; may be null
         */
        public void update(Map<?, ?> inMap) {
            ThreadedMetric.event((String)"cep-OUT", (Object[])new Object[0]);
            final boolean singleEntry = inMap != null && inMap.size() == 1;
            if (!singleEntry) {
                this.mSupport.send(inMap);
                return;
            }
            this.mSupport.send(inMap.values().iterator().next());
        }
    }

    /**
     * Delegate used when external time is enabled. Requests are not turned into
     * statements immediately; they are parked per data flow and created the next time
     * a {@link TimestampCarrier} arrives on that flow, after the Esper clock has been
     * advanced to the carrier's timestamp.
     */
    private class ExternalTimeDelegate
    extends RegularDelegate {
        /** Requests waiting for the first timestamped event of their data flow. */
        private final Map<DataFlowID, List<Pair<DataEmitterSupport, String[]>>> mUnprocessedRequests = new Hashtable<DataFlowID, List<Pair<DataEmitterSupport, String[]>>>();

        private ExternalTimeDelegate() {
        }

        /**
         * Parks the request under its data flow ID instead of creating statements.
         *
         * @param inStmts the statements of the request
         * @param inSupport the emitter support of the request
         */
        @Override
        public void processRequest(String[] inStmts, DataEmitterSupport inSupport) {
            final DataFlowID flowID = inSupport.getFlowID();
            List<Pair<DataEmitterSupport, String[]>> pending = this.mUnprocessedRequests.get(flowID);
            if (pending == null) {
                pending = new LinkedList<Pair<DataEmitterSupport, String[]>>();
                this.mUnprocessedRequests.put(flowID, pending);
            }
            pending.add(new Pair<DataEmitterSupport, String[]>(inSupport, inStmts));
        }

        /**
         * Destroys any statements already created for the request and removes the
         * request from the parked list of its flow; the flow's entry is dropped once
         * its list is empty.
         *
         * @param inFlowID the data flow the request belongs to
         * @param inRequestID the request to cancel
         */
        @Override
        public void cancelRequest(DataFlowID inFlowID, RequestID inRequestID) {
            super.cancelRequest(inFlowID, inRequestID);
            final List<Pair<DataEmitterSupport, String[]>> pending = this.mUnprocessedRequests.get(inFlowID);
            if (pending == null) {
                return;
            }
            for (Iterator<Pair<DataEmitterSupport, String[]>> it = pending.iterator(); it.hasNext(); ) {
                if (it.next().getFirstMember().getRequestID().equals((Object)inRequestID)) {
                    it.remove();
                }
            }
            if (pending.isEmpty()) {
                this.mUnprocessedRequests.remove(inFlowID);
            }
        }

        /**
         * For a {@link TimestampCarrier}, advances the Esper clock to the carrier's
         * time and then creates the statements of every request parked for the flow.
         * Other data is ignored.
         *
         * @param inFlowID the data flow delivering the data
         * @param inData the incoming data item
         * @throws StopDataFlowException if creating the statements of a parked request fails
         */
        @Override
        public void preProcessData(DataFlowID inFlowID, Object inData) throws StopDataFlowException {
            if (!(inData instanceof TimestampCarrier)) {
                return;
            }
            CEPEsperProcessor.this.mService.getEPRuntime().sendEvent((Object)new CurrentTimeEvent(((TimestampCarrier)inData).getTimeMillis()));
            // Take the whole list out of the map so each parked request is processed at most once.
            final List<Pair<DataEmitterSupport, String[]>> deferred = this.mUnprocessedRequests.remove(inFlowID);
            if (deferred == null) {
                return;
            }
            for (Pair<DataEmitterSupport, String[]> request : deferred) {
                final String[] statements = request.getSecondMember();
                try {
                    super.processRequest(statements, request.getFirstMember());
                }
                catch (RequestDataException e) {
                    final String rendered = Arrays.toString((Object[])statements);
                    throw new StopDataFlowException((Throwable)e, (I18NBoundMessage)new I18NBoundMessage1P(Messages.ERROR_CREATING_STATEMENTS, (Serializable)rendered));
                }
            }
        }
    }

    /**
     * Default delegate: creates the statements of a request immediately and
     * destroys them when the request is cancelled.
     */
    private class RegularDelegate
    implements ProcessingDelegate {
        private RegularDelegate() {
        }

        /**
         * Creates the statements, attaches a {@link Subscriber} for the request to the
         * last one, and records them under the request ID.
         * <p>
         * If attaching the subscriber fails, the statements that were just created are
         * destroyed before the failure is reported, so a rejected request leaves no
         * statements behind.
         *
         * @param inStmts the statements of the request; expected to be non-empty
         * @param inSupport the emitter support that receives the last statement's output
         * @throws RequestDataException if Esper fails to create a statement or to
         *         attach the subscriber
         */
        @Override
        public void processRequest(String[] inStmts, DataEmitterSupport inSupport) throws RequestDataException {
            final List<EPStatement> statements;
            try {
                statements = CEPEsperProcessor.this.createStatements(inStmts);
            }
            catch (EPException ex) {
                // createStatements has already destroyed whatever it managed to create.
                throw new RequestDataException((Throwable)ex);
            }
            try {
                statements.get(statements.size() - 1).setSubscriber((Object)new Subscriber(inSupport));
            }
            catch (EPException ex) {
                // The statements are not yet tracked in mRequests, so cancel could never reach them.
                for (EPStatement statement : statements) {
                    statement.destroy();
                }
                throw new RequestDataException((Throwable)ex);
            }
            CEPEsperProcessor.this.mRequests.put(inSupport.getRequestID(), statements);
        }

        /**
         * Destroys and forgets the statements recorded for the request, if any.
         *
         * @param inFlowID the data flow the request belongs to (not used here)
         * @param inRequestID the request to cancel
         */
        @Override
        public void cancelRequest(DataFlowID inFlowID, RequestID inRequestID) {
            final List<EPStatement> statements = CEPEsperProcessor.this.mRequests.remove(inRequestID);
            if (statements == null) {
                return;
            }
            for (EPStatement statement : statements) {
                statement.destroy();
            }
        }

        /**
         * Does nothing; no pre-processing is performed by this delegate.
         *
         * @param inFlowID the data flow delivering the data
         * @param inData the incoming data item
         */
        @Override
        public void preProcessData(DataFlowID inFlowID, Object inData) throws StopDataFlowException {
        }
    }

    private static interface ProcessingDelegate {
        public void processRequest(String[] var1, DataEmitterSupport var2) throws RequestDataException;

        public void cancelRequest(DataFlowID var1, RequestID var2);

        public void preProcessData(DataFlowID var1, Object var2) throws StopDataFlowException;
    }
}

