/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.google.pubsub;

import com.google.api.core.ApiService;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.fault.TransactionFault;
import org.symphonyoss.s2.common.immutable.ImmutableByteArray;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextFactory;
import org.symphonyoss.s2.fugue.google.pubsub.GoogleSubscriber;
import org.symphonyoss.s2.fugue.naming.INameFactory;
import org.symphonyoss.s2.fugue.naming.SubscriptionName;
import org.symphonyoss.s2.fugue.naming.TopicName;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeConsumer;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;
import org.symphonyoss.s2.fugue.pubsub.AbstractSubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.ISubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.Subscription;

class GoogleAbstractSubscriberManager<T extends ISubscriberManager<ImmutableByteArray, T>>
extends AbstractSubscriberManager<ImmutableByteArray, T> {
    private static final Logger log_ = LoggerFactory.getLogger(GoogleAbstractSubscriberManager.class);
    final INameFactory nameFactory_;
    final String projectId_;
    final boolean startSubscriptions_;
    List<Subscriber> subscriberList_ = new LinkedList<Subscriber>();

    GoogleAbstractSubscriberManager(Class<T> type, INameFactory nameFactory, String projectId, ITraceContextFactory traceFactory, IThreadSafeConsumer<ImmutableByteArray> unprocessableMessageConsumer) {
        super(type, traceFactory, unprocessableMessageConsumer);
        this.nameFactory_ = nameFactory;
        this.projectId_ = projectId;
        this.startSubscriptions_ = true;
    }

    GoogleAbstractSubscriberManager(Class<T> type, INameFactory nameFactory, String projectId, ITraceContextFactory traceFactory) {
        super(type, traceFactory, null);
        this.nameFactory_ = nameFactory;
        this.projectId_ = projectId;
        this.startSubscriptions_ = false;
        log_.info("Starting GoogleSubscriberManager in project " + this.projectId_ + "...");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startSubscription(Subscription<ImmutableByteArray> subscription) {
        SubscriptionName subscriptionName;
        TopicName topicName;
        for (String topic : subscription.getTopicNames()) {
            log_.info("Validating to topic " + topic + "...");
            topicName = this.nameFactory_.getTopicName(topic);
            subscriptionName = this.nameFactory_.getSubscriptionName(topicName, subscription.getSubscriptionName());
            this.validateSubcription(topicName, subscriptionName);
        }
        if (this.startSubscriptions_) {
            for (String topic : subscription.getTopicNames()) {
                log_.info("Subscribing to topic " + topic + "...");
                topicName = this.nameFactory_.getTopicName(topic);
                subscriptionName = this.nameFactory_.getSubscriptionName(topicName, subscription.getSubscriptionName());
                this.validateSubcription(topicName, subscriptionName);
                GoogleSubscriber receiver = new GoogleSubscriber(this, this.getTraceFactory(), (IThreadSafeRetryableConsumer<ImmutableByteArray>)subscription.getConsumer());
                final ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of((String)this.projectId_, (String)subscriptionName.toString());
                Subscriber subscriber = Subscriber.newBuilder((ProjectSubscriptionName)projectSubscriptionName, (MessageReceiver)receiver).build();
                subscriber.addListener(new ApiService.Listener(){

                    public void failed(ApiService.State from, Throwable failure) {
                        log_.error("Error for " + projectSubscriptionName + " from " + from, failure);
                    }
                }, MoreExecutors.directExecutor());
                List<Subscriber> list = this.subscriberList_;
                synchronized (list) {
                    this.subscriberList_.add(subscriber);
                }
                subscriber.startAsync();
                log_.info("Subscribing to " + projectSubscriptionName + "...");
            }
        }
    }

    private void validateSubcription(TopicName topicName, SubscriptionName subscriptionName) {
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create();){
            ProjectTopicName projectTopicName = ProjectTopicName.of((String)this.projectId_, (String)topicName.toString());
            ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of((String)this.projectId_, (String)subscriptionName.toString());
            try {
                com.google.pubsub.v1.Subscription existing = subscriptionAdminClient.getSubscription(projectSubscriptionName);
                log_.info("Subscription " + subscriptionName + " on topic " + topicName + " exists.");
            }
            catch (NotFoundException e) {
                log_.error("Subscription " + subscriptionName + " on topic " + topicName + " DOES NOT EXIST.");
            }
        }
        catch (IOException e) {
            throw new TransactionFault((Throwable)e);
        }
    }

    protected void stopSubscriptions() {
        for (Subscriber subscriber : this.subscriberList_) {
            try {
                subscriber.stopAsync();
                log_.info("Stopped subscriber " + subscriber.getSubscriptionNameString());
            }
            catch (RuntimeException e) {
                log_.error("Failed to stop subscriber " + subscriber.getSubscriptionNameString(), (Throwable)e);
            }
        }
    }
}

