/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.pubsub;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.concurrent.NamedThreadFactory;
import org.symphonyoss.s2.fugue.Fugue;
import org.symphonyoss.s2.fugue.config.IConfiguration;
import org.symphonyoss.s2.fugue.counter.IBusyCounter;
import org.symphonyoss.s2.fugue.pubsub.AbstractSubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.IPullSubscriberManagerBuilder;

public abstract class AbstractPullSubscriberManager<T extends AbstractPullSubscriberManager<T>>
extends AbstractSubscriberManager<T> {
    private static final Logger log_ = LoggerFactory.getLogger(AbstractPullSubscriberManager.class);
    private final IBusyCounter busyCounter_;
    private int subscriberThreadPoolSize_;
    private int handlerThreadPoolSize_;
    private final LinkedBlockingQueue<Runnable> executorQueue_ = new LinkedBlockingQueue();
    private final LinkedBlockingQueue<Runnable> handlerQueue_ = new LinkedBlockingQueue();
    private ThreadPoolExecutor subscriberExecutor_;
    private ThreadPoolExecutor handlerExecutor_;

    protected AbstractPullSubscriberManager(Class<T> type, Builder<?, T> builder) {
        super(type, builder);
        this.busyCounter_ = ((Builder)builder).busyCounter_;
        IConfiguration subscriberConfig = this.config_.getConfiguration(builder.getConfigPath());
        this.subscriberThreadPoolSize_ = subscriberConfig.getInt("subscriberThreadPoolSize", 4);
        this.handlerThreadPoolSize_ = subscriberConfig.getInt("handlerThreadPoolSize", 9 * this.subscriberThreadPoolSize_);
    }

    protected IBusyCounter getBusyCounter() {
        return this.busyCounter_;
    }

    @Override
    public void start() {
        if (Fugue.isDebugSingleThread()) {
            this.subscriberThreadPoolSize_ = this.totalSubscriptionCnt_ == 0 ? 1 : this.totalSubscriptionCnt_;
            this.handlerThreadPoolSize_ = 1;
        } else {
            int min = this.getTotalSubscriptionCnt() * 2;
            if (this.subscriberThreadPoolSize_ < min) {
                log_.warn("Configured for " + this.subscriberThreadPoolSize_ + " subscriber threads for a total of " + this.getTotalSubscriptionCnt() + " subscriptions, using " + min + " subscriber threads");
                this.subscriberThreadPoolSize_ = min;
            }
            if (this.handlerThreadPoolSize_ < (min = this.subscriberThreadPoolSize_ * 2)) {
                log_.warn("Configured for " + this.handlerThreadPoolSize_ + " handler threads for " + this.subscriberThreadPoolSize_ + " subscriber treads, using " + min + " handler threads");
                this.handlerThreadPoolSize_ = min;
            }
        }
        log_.info("Starting AbstractPullSubscriberManager with " + this.subscriberThreadPoolSize_ + " subscriber threads and " + this.handlerThreadPoolSize_ + " handler threads for a total of " + this.getTotalSubscriptionCnt() + " subscriptions...");
        this.subscriberExecutor_ = new ThreadPoolExecutor(this.subscriberThreadPoolSize_, this.subscriberThreadPoolSize_, 10000L, TimeUnit.MILLISECONDS, this.executorQueue_, (ThreadFactory)new NamedThreadFactory("PubSub-subscriber"));
        this.handlerExecutor_ = new ThreadPoolExecutor(this.handlerThreadPoolSize_, this.handlerThreadPoolSize_, 10000L, TimeUnit.MILLISECONDS, this.handlerQueue_, (ThreadFactory)new NamedThreadFactory("PubSub-handler", true));
        super.start();
    }

    @Override
    protected void stopSubscriptions() {
        this.subscriberExecutor_.shutdown();
        try {
            if (!this.subscriberExecutor_.awaitTermination(60L, TimeUnit.SECONDS)) {
                this.subscriberExecutor_.shutdownNow();
                if (!this.subscriberExecutor_.awaitTermination(60L, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        }
        catch (InterruptedException ie) {
            this.subscriberExecutor_.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    protected void submit(Runnable subscriber, boolean force) {
        if (force || this.executorQueue_.size() < this.subscriberThreadPoolSize_) {
            this.subscriberExecutor_.submit(subscriber);
        }
    }

    protected void printQueueSize() {
        log_.debug("Queue size " + this.executorQueue_.size());
    }

    ThreadPoolExecutor getHandlerExecutor() {
        return this.handlerExecutor_;
    }

    public static abstract class Builder<T extends Builder<T, B>, B extends AbstractPullSubscriberManager<B>>
    extends AbstractSubscriberManager.Builder<T, B>
    implements IPullSubscriberManagerBuilder<T, B> {
        private IBusyCounter busyCounter_;

        protected Builder(Class<T> type) {
            super(type);
        }

        @Override
        public T withBusyCounter(IBusyCounter busyCounter) {
            this.busyCounter_ = busyCounter;
            return (T)((Builder)this.self());
        }

        protected abstract String getConfigPath();
    }
}

