/*
 * Decompiled with CFR 0.152.
 */
package org.opencredo.esper.integration.throughput;

import org.opencredo.esper.EsperStatement;
import org.opencredo.esper.EsperTemplate;
import org.opencredo.esper.integration.IntegrationOperation;
import org.opencredo.esper.integration.MessageContext;
import org.opencredo.esper.integration.interceptor.EsperWireTap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.ChannelInterceptor;
import org.springframework.integration.core.PollableChannel;

public class EsperChannelThroughputMonitor
implements InitializingBean,
DisposableBean {
    private static final String THROUGHPUT_SUFFIX = "throughput";
    private static final Logger LOG = LoggerFactory.getLogger(EsperChannelThroughputMonitor.class);
    private final AbstractMessageChannel channel;
    private final String sourceId;
    private String timeSample = "1 second";
    private long throughput;
    private EsperTemplate template;

    public EsperChannelThroughputMonitor(AbstractMessageChannel channel, String sourceId) {
        this.channel = channel;
        this.sourceId = sourceId;
    }

    public void setTimeSample(String timeSample) {
        this.timeSample = timeSample;
    }

    public long getThroughput() {
        return this.throughput;
    }

    public void afterPropertiesSet() {
        String streamName = this.sourceId + THROUGHPUT_SUFFIX;
        StringBuilder createSourceWindow = new StringBuilder();
        createSourceWindow.append("insert into ").append(streamName);
        createSourceWindow.append(" select sourceId, operation from ").append(MessageContext.class.getName());
        createSourceWindow.append(" where sourceId = '").append(this.sourceId).append("'");
        this.template = new EsperTemplate();
        this.template.addStatement(new EsperStatement(createSourceWindow.toString()));
        EsperWireTap wireTap = new EsperWireTap(this.template, this.sourceId);
        wireTap.setSendContext(true);
        this.setToAlwaysWantingToListenToEventPerMessageSent(wireTap);
        if (this.channel instanceof PollableChannel) {
            wireTap.setPostReceive(true);
            String countStreamName = streamName + "Count";
            StringBuilder sb = new StringBuilder();
            sb.append("insert into ").append(countStreamName).append("(ps_count, pr_count) ");
            sb.append("select count(PS) as ps_count, count(PR) as pr_count from pattern [every PS=");
            sb.append(streamName).append("(operation=").append(IntegrationOperation.class.getName()).append(".").append((Object)IntegrationOperation.POST_SEND).append(") OR every PR=");
            sb.append(streamName).append("(operation=").append(IntegrationOperation.class.getName()).append(".").append((Object)IntegrationOperation.POST_RECEIVE).append(")]");
            EsperStatement statement = new EsperStatement(sb.toString());
            this.template.addStatement(statement);
            sb = new StringBuilder();
            sb.append("select ps_count, pr_count, avg(pr_count) from ").append(countStreamName).append(".win:time_batch(").append(this.timeSample).append(")");
            statement = new EsperStatement(sb.toString());
            statement.setSubscriber((Object)this);
            this.template.addStatement(statement);
        } else {
            StringBuilder sb = new StringBuilder();
            sb.append("select count(*) as throughput from ").append(streamName);
            sb.append(".win:time_batch(").append(this.timeSample).append(")");
            sb.append(" where operation=").append(IntegrationOperation.class.getName()).append(".").append((Object)IntegrationOperation.POST_SEND);
            EsperStatement listenForSourceId = new EsperStatement(sb.toString());
            listenForSourceId.setSubscriber((Object)this);
            this.template.addStatement(listenForSourceId);
        }
        this.template.initialize();
        this.channel.addInterceptor((ChannelInterceptor)wireTap);
    }

    private void setToAlwaysWantingToListenToEventPerMessageSent(EsperWireTap wireTap) {
        wireTap.setPostSend(true);
        wireTap.setPreSend(false);
    }

    public void update(long throughput) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received throughput of " + throughput + " on channel - " + this.channel.getComponentName());
        }
        this.throughput = throughput;
    }

    public void update(Long ps_count, Long pr_count, Double pr_avg) {
        ps_count = ps_count == null ? 0L : ps_count;
        pr_count = pr_count == null ? 0L : pr_count;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sent throughput of " + ps_count + ", received throughput of " + pr_count + " average " + pr_avg + " on pollable channel - " + this.channel.getComponentName());
        }
        this.throughput = pr_count;
    }

    public void destroy() {
        this.template.cleanup();
    }
}

