/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.pubsub;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.concurrent.NamedThreadFactory;
import org.symphonyoss.s2.fugue.config.IConfiguration;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextTransactionFactory;
import org.symphonyoss.s2.fugue.counter.Counter;
import org.symphonyoss.s2.fugue.counter.ICounter;
import org.symphonyoss.s2.fugue.deploy.ExecutorBatch;
import org.symphonyoss.s2.fugue.deploy.IBatch;
import org.symphonyoss.s2.fugue.naming.INameFactory;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeErrorConsumer;
import org.symphonyoss.s2.fugue.pubsub.AbstractSubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.ISubscriberManager;

public abstract class AbstractPullSubscriberManager<P, T extends ISubscriberManager<P, T>>
extends AbstractSubscriberManager<P, T> {
    private static final Logger log_ = LoggerFactory.getLogger(AbstractPullSubscriberManager.class);
    private final IConfiguration sqsConfig_;
    private ICounter counter_;
    private int subscriberThreadPoolSize_;
    private int handlerThreadPoolSize_;
    private final LinkedBlockingQueue<Runnable> executorQueue_ = new LinkedBlockingQueue();
    private final LinkedBlockingQueue<Runnable> handlerQueue_ = new LinkedBlockingQueue();
    private ThreadPoolExecutor subscriberExecutor_;
    private ThreadPoolExecutor handlerExecutor_;

    protected AbstractPullSubscriberManager(IConfiguration config, INameFactory nameFactory, Class<T> type, ITraceContextTransactionFactory traceFactory, IThreadSafeErrorConsumer<P> unprocessableMessageConsumer, String configPath) {
        super(nameFactory, type, traceFactory, unprocessableMessageConsumer);
        this.sqsConfig_ = config.getConfiguration(configPath);
    }

    public T withCounter(Counter counter) {
        this.counter_ = counter;
        return (T)((ISubscriberManager)this.self());
    }

    protected ICounter getCounter() {
        return this.counter_;
    }

    @Override
    public void start() {
        this.initialize();
        int min = this.getTotalSubscriptionCnt() * 2;
        if (this.subscriberThreadPoolSize_ < min) {
            log_.warn("Configured for " + this.subscriberThreadPoolSize_ + " subscriber threads for a total of " + this.getTotalSubscriptionCnt() + " subscriptions, using " + min + " subscriber threads");
            this.subscriberThreadPoolSize_ = min;
        }
        if (this.handlerThreadPoolSize_ < (min = this.subscriberThreadPoolSize_ * 2)) {
            log_.warn("Configured for " + this.handlerThreadPoolSize_ + " handler threads for " + this.subscriberThreadPoolSize_ + " subscriber treads, using " + min + " handler threads");
            this.handlerThreadPoolSize_ = min;
        }
        log_.info("Starting AbstractPullSubscriberManager with " + this.subscriberThreadPoolSize_ + " subscriber threads and " + this.handlerThreadPoolSize_ + " handler threads for a total of " + this.getTotalSubscriptionCnt() + " subscriptions...");
        this.subscriberExecutor_ = new ThreadPoolExecutor(this.subscriberThreadPoolSize_, this.subscriberThreadPoolSize_, 0L, TimeUnit.MILLISECONDS, this.executorQueue_, (ThreadFactory)new NamedThreadFactory("PubSub-subscriber"));
        this.handlerExecutor_ = new ThreadPoolExecutor(this.subscriberThreadPoolSize_, this.handlerThreadPoolSize_, 0L, TimeUnit.MILLISECONDS, this.handlerQueue_, (ThreadFactory)new NamedThreadFactory("PubSub-handler", true));
        super.start();
    }

    protected void initialize() {
        this.subscriberThreadPoolSize_ = this.sqsConfig_.getInt("subscriberThreadPoolSize", 4);
        this.handlerThreadPoolSize_ = this.sqsConfig_.getInt("handlerThreadPoolSize", 9 * this.subscriberThreadPoolSize_);
    }

    @Override
    protected void stopSubscriptions() {
        this.subscriberExecutor_.shutdown();
        try {
            if (!this.subscriberExecutor_.awaitTermination(60L, TimeUnit.SECONDS)) {
                this.subscriberExecutor_.shutdownNow();
                if (!this.subscriberExecutor_.awaitTermination(60L, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        }
        catch (InterruptedException ie) {
            this.subscriberExecutor_.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public void submit(Runnable subscriber, boolean force) {
        if (force || this.executorQueue_.size() < this.subscriberThreadPoolSize_) {
            this.subscriberExecutor_.submit(subscriber);
        }
    }

    protected void printQueueSize() {
        log_.debug("Queue size " + this.executorQueue_.size());
    }

    public IBatch newBatch() {
        return new ExecutorBatch((ExecutorService)this.handlerExecutor_);
    }
}

