/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.google.pubsub;

import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.fault.ProgramFault;
import org.symphonyoss.s2.common.fault.TransactionFault;
import org.symphonyoss.s2.common.immutable.ImmutableByteArray;
import org.symphonyoss.s2.fugue.config.IConfiguration;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextTransactionFactory;
import org.symphonyoss.s2.fugue.google.pubsub.GoogleSubscriber;
import org.symphonyoss.s2.fugue.naming.INameFactory;
import org.symphonyoss.s2.fugue.naming.SubscriptionName;
import org.symphonyoss.s2.fugue.naming.TopicName;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeErrorConsumer;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;
import org.symphonyoss.s2.fugue.pubsub.AbstractPullSubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.SubscriptionImpl;

public class GoogleSubscriberManager
extends AbstractPullSubscriberManager<ImmutableByteArray, GoogleSubscriberManager> {
    private static final Logger log_ = LoggerFactory.getLogger(GoogleSubscriberManager.class);
    private final String projectId_;
    private List<GoogleSubscriber> subscribers_ = new LinkedList<GoogleSubscriber>();

    public GoogleSubscriberManager(IConfiguration config, INameFactory nameFactory, String projectId, ITraceContextTransactionFactory traceFactory, IThreadSafeErrorConsumer<ImmutableByteArray> unprocessableMessageConsumer) {
        super(config, nameFactory, GoogleSubscriberManager.class, traceFactory, unprocessableMessageConsumer, "org/symphonyoss/s2/fugue/google/pubsub");
        this.projectId_ = projectId;
    }

    protected void initSubscription(SubscriptionImpl<ImmutableByteArray> subscription) {
        for (TopicName topicName : subscription.getTopicNames()) {
            log_.info("Validating topic " + topicName + "...");
            SubscriptionName subscriptionName = this.nameFactory_.getSubscriptionName(topicName, subscription.getSubscriptionId());
            this.validateSubcription(topicName, subscriptionName);
            GoogleSubscriber subscriber = new GoogleSubscriber(this, ProjectSubscriptionName.format((String)this.projectId_, (String)subscriptionName.toString()), this.getTraceFactory(), (IThreadSafeRetryableConsumer<ImmutableByteArray>)subscription.getConsumer(), this.getCounter(), this.nameFactory_.getTenantId());
            this.subscribers_.add(subscriber);
            log_.info("Subscribing to " + subscriptionName + "...");
        }
    }

    private void validateSubcription(TopicName topicName, SubscriptionName subscriptionName) {
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create();){
            ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of((String)this.projectId_, (String)subscriptionName.toString());
            try {
                Subscription existing = subscriptionAdminClient.getSubscription(projectSubscriptionName);
                log_.info("Subscription " + subscriptionName + " on topic " + topicName + " exists with ack deadline " + existing.getAckDeadlineSeconds() + " seconds.");
            }
            catch (NotFoundException e) {
                throw new ProgramFault("Subscription " + subscriptionName + " on topic " + topicName + " DOES NOT EXIST.");
            }
            catch (StatusRuntimeException e) {
                log_.error("Subscription " + subscriptionName + " on topic " + topicName + " cannot be validated - lets hope....", (Throwable)e);
            }
        }
        catch (IOException e) {
            throw new TransactionFault((Throwable)e);
        }
    }

    protected void startSubscriptions() {
        for (GoogleSubscriber subscriber : this.subscribers_) {
            log_.info("Starting subscription to " + subscriber.getSubscriptionName() + "...");
            this.submit(subscriber, true);
        }
    }

    protected void stopSubscriptions() {
        for (GoogleSubscriber subscriber : this.subscribers_) {
            subscriber.stop();
        }
        super.stopSubscriptions();
    }
}

