/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.google.pubsub;

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.immutable.ImmutableByteArray;
import org.symphonyoss.s2.fugue.IConfigurationProvider;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextFactory;
import org.symphonyoss.s2.fugue.google.pubsub.GoogleSubscriber;
import org.symphonyoss.s2.fugue.naming.INameFactory;
import org.symphonyoss.s2.fugue.naming.SubscriptionName;
import org.symphonyoss.s2.fugue.naming.TopicName;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeConsumer;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;
import org.symphonyoss.s2.fugue.pubsub.AbstractSubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.ISubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.Subscription;

class GoogleAbstractSubscriberManager<T extends ISubscriberManager<ImmutableByteArray, T>>
extends AbstractSubscriberManager<ImmutableByteArray, T> {
    private static final Logger log_ = LoggerFactory.getLogger(GoogleAbstractSubscriberManager.class);
    final INameFactory nameFactory_;
    final IConfigurationProvider config_;
    final boolean startSubscriptions_;
    List<Subscriber> subscriberList_ = new LinkedList<Subscriber>();
    String projectId_;

    GoogleAbstractSubscriberManager(Class<T> type, INameFactory nameFactory, IConfigurationProvider config, ITraceContextFactory traceFactory, IThreadSafeConsumer<ImmutableByteArray> unprocessableMessageConsumer) {
        super(type, config, traceFactory, unprocessableMessageConsumer);
        this.nameFactory_ = nameFactory;
        this.config_ = config;
        this.startSubscriptions_ = true;
    }

    GoogleAbstractSubscriberManager(Class<T> type, INameFactory nameFactory, IConfigurationProvider config, ITraceContextFactory traceFactory) {
        super(type, config, traceFactory, null);
        this.nameFactory_ = nameFactory;
        this.config_ = config;
        this.startSubscriptions_ = false;
    }

    public void start() {
        IConfigurationProvider googleConfig = this.config_.getConfiguration("google");
        this.projectId_ = googleConfig.getRequiredString("projectId");
        log_.info("Starting GoogleSubscriberManager in project " + this.projectId_ + "...");
        super.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startSubscription(Subscription<ImmutableByteArray> subscription) {
        if (this.startSubscriptions_) {
            for (String topic : subscription.getTopicNames()) {
                log_.info("Subscribing to topic " + topic + "...");
                TopicName topicName = this.nameFactory_.getTopicName(topic);
                SubscriptionName subscriptionName = this.nameFactory_.getSubscriptionName(topicName, subscription.getSubscriptionName());
                GoogleSubscriber receiver = new GoogleSubscriber(this, this.getTraceFactory(), (IThreadSafeRetryableConsumer<ImmutableByteArray>)subscription.getConsumer());
                final ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of((String)this.projectId_, (String)subscriptionName.toString());
                Subscriber subscriber = Subscriber.newBuilder((ProjectSubscriptionName)projectSubscriptionName, (MessageReceiver)receiver).build();
                subscriber.addListener(new ApiService.Listener(){

                    public void failed(ApiService.State from, Throwable failure) {
                        log_.error("Error for " + projectSubscriptionName + " from " + from, failure);
                    }
                }, MoreExecutors.directExecutor());
                List<Subscriber> list = this.subscriberList_;
                synchronized (list) {
                    this.subscriberList_.add(subscriber);
                }
                subscriber.startAsync();
                log_.info("Subscribing to " + projectSubscriptionName + "...");
            }
        }
    }

    protected void stopSubscriptions() {
        for (Subscriber subscriber : this.subscriberList_) {
            try {
                subscriber.stopAsync();
                log_.info("Stopped subscriber " + subscriber.getSubscriptionNameString());
            }
            catch (RuntimeException e) {
                log_.error("Failed to stop subscriber " + subscriber.getSubscriptionNameString(), (Throwable)e);
            }
        }
    }
}

