/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.google.pubsub;

import com.google.api.core.ApiService;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.fault.FaultAccumulator;
import org.symphonyoss.s2.common.fault.TransactionFault;
import org.symphonyoss.s2.fugue.config.IConfiguration;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextTransactionFactory;
import org.symphonyoss.s2.fugue.google.pubsub.GoogleAsyncSubscriber;
import org.symphonyoss.s2.fugue.naming.INameFactory;
import org.symphonyoss.s2.fugue.naming.Name;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeErrorConsumer;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;
import org.symphonyoss.s2.fugue.pubsub.AbstractSubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.ISubscription;

/**
 * Subscriber manager which receives messages through Google Cloud Pub/Sub
 * {@link Subscriber} instances.
 *
 * <p>For every subscription name handed to {@link #initSubscription(ISubscription)} a
 * {@link GoogleAsyncSubscriber} receiver and a {@link Subscriber} are created and remembered;
 * {@link #startSubscriptions()} and {@link #stopSubscriptions()} then start and stop all
 * remembered subscribers.
 *
 * <p>Both internal lists are only modified or iterated by this class while holding the monitor
 * of {@code subscriberList_}.
 */
public class GoogleAsyncSubscriberManager
extends AbstractSubscriberManager<GoogleAsyncSubscriberManager> {
    private static final Logger log_ = LoggerFactory.getLogger(GoogleAsyncSubscriberManager.class);

    /** Project ID used to build fully qualified {@link ProjectSubscriptionName}s. */
    private final String projectId_;

    /** Subscribers created by initSubscription; its monitor guards both lists. Final because it is a lock. */
    private final List<Subscriber> subscriberList_ = new LinkedList<>();

    /** Receivers created alongside the entries of subscriberList_ (package-private as in the original). */
    List<GoogleAsyncSubscriber> receiverList_ = new LinkedList<>();

    /** Number of subscriptions found to be missing; never reset, so it accumulates across calls. */
    private int subscriptionErrorCnt_;

    /**
     * Creates a manager from the given builder.
     *
     * @param builder the builder carrying the project ID and the parent-class settings.
     */
    private GoogleAsyncSubscriberManager(Builder builder) {
        super(GoogleAsyncSubscriberManager.class, (AbstractSubscriberManager.Builder)builder);
        this.projectId_ = builder.projectId_;
    }

    /**
     * Validates every subscription name of the given subscription and, if none is missing,
     * creates (but does not start) one {@link Subscriber} per name.
     *
     * @param subscription the subscription whose names and consumer are used.
     * @throws IllegalStateException if the accumulated count of missing subscriptions is non-zero.
     */
    protected void initSubscription(ISubscription subscription) {
        for (Name subscriptionName : subscription.getSubscriptionNames()) {
            log_.info("Validating subscription {}...", subscriptionName);
            validateSubcription(subscriptionName.toString());
        }
        if (subscriptionErrorCnt_ > 0) {
            throw new IllegalStateException("There are " + subscriptionErrorCnt_ + " subscription errors.");
        }

        long threadsPerSubscription = config_.getLong("subscriberThreadsPerSubscription", 10L);
        int subscriberThreadPoolSize = config_.getInt("subscriberThreadPoolSize", 4);
        long maxOutstandingElementCount = config_.getLong("maxOutstandingElementCount", threadsPerSubscription);
        // NOTE(review): the configured values are immediately overridden with 1 and are only
        // logged below; none of them is applied to the Subscriber.Builder. This looks like
        // leftover debugging - confirm the intent before wiring them in or removing them.
        threadsPerSubscription = 1L;
        subscriberThreadPoolSize = 1;
        maxOutstandingElementCount = 1L;
        log_.info("Starting subscriptions threadsPerSubscription={} subscriberThreadPoolSize={} maxOutstandingElementCount={} ...",
                threadsPerSubscription, subscriberThreadPoolSize, maxOutstandingElementCount);

        for (Name subscriptionName : subscription.getSubscriptionNames()) {
            log_.info("Subscribing to {} ...", subscriptionName);
            GoogleAsyncSubscriber receiver = new GoogleAsyncSubscriber(this, this.getTraceFactory(), (IThreadSafeRetryableConsumer<String>)subscription.getConsumer(), subscriptionName.toString(), this.counter_, this.nameFactory_.getPodName());
            final ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(projectId_, subscriptionName.toString());
            Subscriber subscriber = Subscriber.newBuilder(projectSubscriptionName, (MessageReceiver)receiver).build();

            // Log subscriber failures; the listener runs on whichever thread reports the failure.
            subscriber.addListener(new ApiService.Listener() {
                @Override
                public void failed(ApiService.State from, Throwable failure) {
                    log_.error("Error for {} from {}", projectSubscriptionName, from, failure);
                }
            }, MoreExecutors.directExecutor());

            synchronized (subscriberList_) {
                subscriberList_.add(subscriber);
                receiverList_.add(receiver);
            }
        }
    }

    /**
     * Asynchronously starts every subscriber created so far.
     */
    protected void startSubscriptions() {
        synchronized (subscriberList_) {
            for (Subscriber subscriber : subscriberList_) {
                subscriber.startAsync();
                log_.info("Subscribing to {}...", subscriber.getSubscriptionNameString());
            }
        }
    }

    /**
     * Checks that the named subscription exists in the configured project.
     *
     * <p>A missing subscription is logged and counted in {@code subscriptionErrorCnt_}. A
     * {@link StatusRuntimeException} is logged but deliberately not counted, so validation is
     * best-effort in that case.
     *
     * @param subscriptionName the unqualified subscription name.
     * @throws TransactionFault if the try-with-resources block raises an {@link IOException}
     *                          (for example while creating the admin client).
     */
    private void validateSubcription(String subscriptionName) {
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
            ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(projectId_, subscriptionName);
            try {
                Subscription existing = subscriptionAdminClient.getSubscription(projectSubscriptionName);
                log_.info("Subscription {} exists with ack deadline {} seconds.", subscriptionName, existing.getAckDeadlineSeconds());
            }
            catch (NotFoundException e) {
                log_.error("Subscription {} DOES NOT EXIST.", subscriptionName);
                ++subscriptionErrorCnt_;
            }
            catch (StatusRuntimeException e) {
                log_.error("Subscription {} cannot be validated - lets hope....", subscriptionName, e);
            }
        }
        catch (IOException e) {
            throw new TransactionFault((Throwable)e);
        }
    }

    /**
     * Stops every subscriber created so far, waiting for each one to terminate.
     *
     * <p>A {@link RuntimeException} raised while stopping one subscriber is logged and does not
     * prevent the remaining subscribers from being stopped.
     */
    protected void stopSubscriptions() {
        synchronized (subscriberList_) {
            for (Subscriber subscriber : subscriberList_) {
                try {
                    log_.info("Stopping subscriber {}...", subscriber.getSubscriptionNameString());
                    subscriber.stopAsync().awaitTerminated();
                    log_.info("Stopped subscriber {}", subscriber.getSubscriptionNameString());
                }
                catch (RuntimeException e) {
                    log_.error("Failed to stop subscriber {}", subscriber.getSubscriptionNameString(), e);
                }
            }
        }
    }

    /**
     * Builder for {@link GoogleAsyncSubscriberManager}.
     */
    public static class Builder
    extends AbstractSubscriberManager.Builder<Builder, GoogleAsyncSubscriberManager> {
        /** Project ID; must be non-null by the time {@link #validate(FaultAccumulator)} runs. */
        private String projectId_;

        /**
         * Constructor.
         *
         * @param nameFactory                  not used by this constructor (see note below).
         * @param traceFactory                 not used by this constructor (see note below).
         * @param unprocessableMessageConsumer not used by this constructor (see note below).
         * @param config                       not used by this constructor (see note below).
         * @param projectId                    the project ID; may later be replaced via
         *                                     {@link #withProjectId(String)}.
         */
        public Builder(INameFactory nameFactory, ITraceContextTransactionFactory traceFactory, IThreadSafeErrorConsumer<String> unprocessableMessageConsumer, IConfiguration config, String projectId) {
            super(Builder.class);
            // Previously this argument was silently discarded, so a builder given its project ID
            // only through the constructor failed validation with a null projectId.
            this.projectId_ = projectId;
            // NOTE(review): nameFactory, traceFactory, unprocessableMessageConsumer and config are
            // still not forwarded anywhere; the parent builder's API is not visible here, so
            // callers presumably have to supply them through inherited setters - confirm.
        }

        /**
         * Sets the project ID.
         *
         * @param projectId the project ID.
         * @return this builder (fluent).
         */
        public Builder withProjectId(String projectId) {
            this.projectId_ = projectId;
            return (Builder)this.self();
        }

        /**
         * Runs the parent validation and additionally requires a non-null project ID.
         *
         * @param faultAccumulator collector to which validation faults are reported.
         */
        public void validate(FaultAccumulator faultAccumulator) {
            super.validate(faultAccumulator);
            faultAccumulator.checkNotNull((Object)this.projectId_, "projectId");
        }

        /**
         * Creates the manager from the current state of this builder.
         *
         * @return a new {@link GoogleAsyncSubscriberManager}.
         */
        protected GoogleAsyncSubscriberManager construct() {
            return new GoogleAsyncSubscriberManager(this);
        }
    }
}

