/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.google.pubsub;

import com.google.api.core.ApiService;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.fault.TransactionFault;
import org.symphonyoss.s2.common.immutable.ImmutableByteArray;
import org.symphonyoss.s2.fugue.config.IConfiguration;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextTransactionFactory;
import org.symphonyoss.s2.fugue.counter.ICounter;
import org.symphonyoss.s2.fugue.google.pubsub.GoogleSubscriber;
import org.symphonyoss.s2.fugue.naming.INameFactory;
import org.symphonyoss.s2.fugue.naming.SubscriptionName;
import org.symphonyoss.s2.fugue.naming.TopicName;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeErrorConsumer;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;
import org.symphonyoss.s2.fugue.pubsub.AbstractSubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.SubscriptionImpl;

/**
 * Subscriber manager backed by Google Cloud Pub/Sub.
 *
 * <p>For each topic of a subscription it checks that the corresponding Pub/Sub
 * subscription exists, then builds and starts one {@link Subscriber} whose messages are
 * handed to a {@link GoogleSubscriber}. Tuning values are read from the configuration
 * node {@code org/symphonyoss/s2/fugue/google/pubsub}.
 */
public class GoogleSubscriberManager
extends AbstractSubscriberManager<ImmutableByteArray, GoogleSubscriberManager> {
    private static final Logger log_ = LoggerFactory.getLogger(GoogleSubscriberManager.class);

    /** Google Cloud project that owns the subscriptions. */
    final String projectId_;
    /** When false, subscriptions are only validated; the visible constructor always sets this to true. */
    final boolean startSubscriptions_;
    /** Subscribers created by this manager; also used as the lock guarding both lists. */
    List<Subscriber> subscriberList_ = new LinkedList<Subscriber>();
    /** Receivers paired with {@link #subscriberList_}; only appended to within this class. */
    List<GoogleSubscriber> receiverList_ = new LinkedList<GoogleSubscriber>();
    /** Number of subscriptions that validation found to be missing. */
    int subscriptionErrorCnt_;
    private final IConfiguration pubSubConfig_;
    private ICounter counter_;

    /**
     * Creates a manager for the given Google Cloud project.
     *
     * @param config root configuration; the {@code org/symphonyoss/s2/fugue/google/pubsub} node is read from it
     * @param nameFactory factory used to derive subscription names from topic names
     * @param projectId Google Cloud project ID
     * @param traceFactory trace context factory passed to the superclass
     * @param unprocessableMessageConsumer consumer passed to the superclass
     */
    public GoogleSubscriberManager(IConfiguration config, INameFactory nameFactory, String projectId, ITraceContextTransactionFactory traceFactory, IThreadSafeErrorConsumer<ImmutableByteArray> unprocessableMessageConsumer) {
        super(nameFactory, GoogleSubscriberManager.class, traceFactory, unprocessableMessageConsumer);
        this.projectId_ = projectId;
        this.startSubscriptions_ = true;
        this.pubSubConfig_ = config.getConfiguration("org/symphonyoss/s2/fugue/google/pubsub");
    }

    /**
     * Sets the counter handed to every {@link GoogleSubscriber} created afterwards.
     *
     * @param counter the counter to use, may be null
     * @return this manager, for call chaining
     */
    public GoogleSubscriberManager withCounter(ICounter counter) {
        this.counter_ = counter;
        return this;
    }

    /**
     * Validates every topic of the given subscription and, if subscriptions are to be
     * started, creates and starts one Pub/Sub subscriber per topic.
     *
     * @param subscription the subscription whose topics should be consumed
     * @throws IllegalStateException if any validated subscription was found to be missing
     */
    protected void startSubscription(SubscriptionImpl<ImmutableByteArray> subscription) {
        for (TopicName topicName : subscription.getTopicNames()) {
            log_.info("Validating topic {}...", topicName);
            this.validateSubcription(topicName, this.nameFactory_.getSubscriptionName(topicName, subscription.getSubscriptionId()));
        }
        if (!this.startSubscriptions_) {
            return;
        }
        if (this.subscriptionErrorCnt_ > 0) {
            throw new IllegalStateException("There are " + this.subscriptionErrorCnt_ + " subscription errors.");
        }

        // These values used to be read and then unconditionally overwritten with 1, so the
        // configuration keys had no effect. The defaults are now 1, which keeps unconfigured
        // deployments behaving exactly as before while explicit settings are honoured.
        long threadsPerSubscription = this.pubSubConfig_.getLong("subscriberThreadsPerSubscription", 1L);
        int subscriberThreadPoolSize = this.pubSubConfig_.getInt("subscriberThreadPoolSize", 1);
        long maxOutstandingElementCount = this.pubSubConfig_.getLong("maxOutstandingElementCount", threadsPerSubscription);
        log_.info("Starting subscriptions threadsPerSubscription={} subscriberThreadPoolSize={} maxOutstandingElementCount={} ...",
            threadsPerSubscription, subscriberThreadPoolSize, maxOutstandingElementCount);

        for (TopicName topicName : subscription.getTopicNames()) {
            log_.info("Subscribing to topic {} ...", topicName);
            // Every topic was validated by the loop at the top of this method, so no further
            // admin-client round trip is made here.
            SubscriptionName subscriptionName = this.nameFactory_.getSubscriptionName(topicName, subscription.getSubscriptionId());
            this.startSubscriber(subscription, subscriptionName, subscriberThreadPoolSize, maxOutstandingElementCount);
        }
    }

    /** Creates the receiver and subscriber for one subscription name, registers both and starts the subscriber. */
    private void startSubscriber(SubscriptionImpl<ImmutableByteArray> subscription, SubscriptionName subscriptionName, int subscriberThreadPoolSize, long maxOutstandingElementCount) {
        GoogleSubscriber receiver = new GoogleSubscriber(this, this.getTraceFactory(), (IThreadSafeRetryableConsumer<ImmutableByteArray>)subscription.getConsumer(), subscriptionName, this.counter_);
        ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(this.projectId_, subscriptionName.toString());
        Subscriber subscriber = this.buildSubscriber(projectSubscriptionName, receiver, subscriberThreadPoolSize, maxOutstandingElementCount);

        // Register before starting so that stopSubscriptions() can see the subscriber.
        synchronized (this.subscriberList_) {
            this.subscriberList_.add(subscriber);
            this.receiverList_.add(receiver);
        }
        subscriber.startAsync();
        log_.info("Subscribing to {}...", projectSubscriptionName);
    }

    /** Builds a subscriber with the given executor size and flow-control limit, and a listener that logs failures. */
    private Subscriber buildSubscriber(final ProjectSubscriptionName projectSubscriptionName, GoogleSubscriber receiver, int subscriberThreadPoolSize, long maxOutstandingElementCount) {
        Subscriber.Builder builder = Subscriber.newBuilder(projectSubscriptionName, (MessageReceiver)receiver);
        builder.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(subscriberThreadPoolSize).build());
        builder.setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingElementCount(maxOutstandingElementCount).build());

        Subscriber subscriber = builder.build();
        subscriber.addListener(new ApiService.Listener(){

            @Override
            public void failed(ApiService.State from, Throwable failure) {
                log_.error("Error for {} from {}", projectSubscriptionName, from, failure);
            }
        }, MoreExecutors.directExecutor());
        return subscriber;
    }

    /**
     * Checks that the Pub/Sub subscription for the given topic exists.
     *
     * <p>A missing subscription is logged and counted in {@link #subscriptionErrorCnt_}.
     * A {@link StatusRuntimeException} is logged and otherwise ignored, so the
     * subscription is treated as possibly valid.
     *
     * @param topicName topic the subscription belongs to (used for logging only)
     * @param subscriptionName name of the subscription to look up
     * @throws TransactionFault if the admin client cannot be created
     */
    private void validateSubcription(TopicName topicName, SubscriptionName subscriptionName) {
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
            ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(this.projectId_, subscriptionName.toString());
            try {
                Subscription existing = subscriptionAdminClient.getSubscription(projectSubscriptionName);
                log_.info("Subscription {} on topic {} exists with ack deadline {} seconds.", subscriptionName, topicName, existing.getAckDeadlineSeconds());
            }
            catch (NotFoundException e) {
                log_.error("Subscription {} on topic {} DOES NOT EXIST.", subscriptionName, topicName);
                ++this.subscriptionErrorCnt_;
            }
            catch (StatusRuntimeException e) {
                // Deliberately best-effort: validation failure is not treated as a missing subscription.
                log_.error("Subscription {} on topic {} cannot be validated - lets hope....", subscriptionName, topicName, e);
            }
        }
        catch (IOException e) {
            throw new TransactionFault(e);
        }
    }

    /**
     * Stops every subscriber created by this manager, waiting for each to terminate.
     * A failure to stop one subscriber is logged and does not prevent the others from
     * being stopped.
     */
    protected void stopSubscriptions() {
        synchronized (this.subscriberList_) {
            for (Subscriber subscriber : this.subscriberList_) {
                String name = subscriber.getSubscriptionNameString();
                try {
                    log_.info("Stopping subscriber {}...", name);
                    subscriber.stopAsync().awaitTerminated();
                    log_.info("Stopped subscriber {}", name);
                }
                catch (RuntimeException e) {
                    log_.error("Failed to stop subscriber {}", name, e);
                }
            }
        }
    }
}

