/*
 * Decompiled with CFR 0.152.
 */
package org.yestech.lib.camel;

import java.util.concurrent.ExecutorService;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.message.pipe.Pipe;
import org.yestech.lib.camel.TerracottaEndpoint;

public class TerracottaConsumer
extends ServiceSupport
implements Consumer,
Runnable {
    private static final Logger logger = LoggerFactory.getLogger(TerracottaConsumer.class);
    private TerracottaEndpoint endpoint;
    private Processor processor;
    private ExecutorService executor;

    public TerracottaConsumer(TerracottaEndpoint endpoint, Processor processor) {
        this.endpoint = endpoint;
        this.processor = processor;
    }

    public String toString() {
        return "TerracottaConsumer: " + this.endpoint.getEndpointUri();
    }

    @Override
    public void run() {
        Pipe<Object> queue = this.endpoint.getPipe();
        while (queue != null && this.isRunAllowed()) {
            DefaultExchange exchange = new DefaultExchange(this.endpoint.getCamelContext());
            try {
                Object pipeMessage = queue.take();
                DefaultMessage message = new DefaultMessage();
                message.setBody(pipeMessage);
                exchange.setIn((Message)message);
            }
            catch (InterruptedException e) {
                logger.debug("Wait interrupted, are we stopping? " + (this.isStopping() || this.isStopped()));
                continue;
            }
            if (exchange == null) continue;
            if (this.isRunAllowed()) {
                try {
                    this.processor.process((Exchange)exchange);
                }
                catch (Exception e) {
                    logger.error("TerracottaConsumer pipe caught: " + e, (Throwable)e);
                }
                continue;
            }
            logger.warn("This consumer is stopped during polling an exchange, so putting it back on the TerracottaConsumer pipe: " + exchange);
            try {
                queue.put((Object)exchange);
            }
            catch (InterruptedException e) {
                logger.debug("Sleep interrupted, are we stopping? " + (this.isStopping() || this.isStopped()));
            }
        }
    }

    protected void doStart() throws Exception {
        int poolSize = this.endpoint.getConcurrentConsumers();
        this.executor = ExecutorServiceHelper.newFixedThreadPool((int)poolSize, (String)this.endpoint.getEndpointUri(), (boolean)true);
        for (int i = 0; i < poolSize; ++i) {
            this.executor.execute(this);
        }
        this.endpoint.onStarted(this);
    }

    protected void doStop() throws Exception {
        this.endpoint.onStopped(this);
        this.executor.shutdownNow();
        this.executor = null;
    }
}

