/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.google.pubsub;

import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.fault.CodingFault;
import org.symphonyoss.s2.common.immutable.ImmutableByteArray;
import org.symphonyoss.s2.fugue.Fugue;
import org.symphonyoss.s2.fugue.core.trace.ITraceContext;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextTransaction;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextTransactionFactory;
import org.symphonyoss.s2.fugue.counter.IBusyCounter;
import org.symphonyoss.s2.fugue.counter.ICounter;
import org.symphonyoss.s2.fugue.google.pubsub.GoogleSubscriberManager;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;
import org.symphonyoss.s2.fugue.pubsub.AbstractPullSubscriber;
import org.symphonyoss.s2.fugue.pubsub.AbstractPullSubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.IPullSubscriberContext;
import org.symphonyoss.s2.fugue.pubsub.IPullSubscriberMessage;
import org.threeten.bp.Duration;

public class GoogleSubscriber
extends AbstractPullSubscriber {
    private static final int EXTENSION_TIMEOUT_SECONDS = 10;
    private static final int EXTENSION_FREQUENCY_MILLIS = 5000;
    private static final Logger log_ = LoggerFactory.getLogger(GoogleSubscriber.class);
    private final GoogleSubscriberManager manager_;
    private final ITraceContextTransactionFactory traceFactory_;
    private final IThreadSafeRetryableConsumer<ImmutableByteArray> consumer_;
    private final NonIdleSubscriber nonIdleSubscriber_;
    private final String subscriptionName_;
    private final String tenantId_;
    private int batchSize_ = 10;
    private SubscriberStubSettings subscriberStubSettings_;
    private final PullRequest blockingPullRequest_;
    private final PullRequest nonBlockingPullRequest_;

    GoogleSubscriber(GoogleSubscriberManager manager, String subscriptionName, ITraceContextTransactionFactory traceFactory, IThreadSafeRetryableConsumer<ImmutableByteArray> consumer, ICounter counter, IBusyCounter busyCounter, String tenantId) {
        super((AbstractPullSubscriberManager)manager, subscriptionName, counter, busyCounter, 5000L);
        if (Fugue.isDebugSingleThread()) {
            this.batchSize_ = 1;
        }
        this.manager_ = manager;
        this.subscriptionName_ = subscriptionName;
        this.traceFactory_ = traceFactory;
        this.consumer_ = consumer;
        this.nonIdleSubscriber_ = new NonIdleSubscriber();
        this.tenantId_ = tenantId;
        try {
            SubscriberStubSettings.Builder settingsBuilder = SubscriberStubSettings.newBuilder();
            settingsBuilder.pullSettings().setSimpleTimeoutNoRetries(Duration.ofSeconds((long)60L));
            this.subscriberStubSettings_ = settingsBuilder.build();
        }
        catch (IOException e) {
            throw new CodingFault((Throwable)e);
        }
        this.blockingPullRequest_ = PullRequest.newBuilder().setMaxMessages(this.batchSize_).setReturnImmediately(false).setSubscription(this.subscriptionName_).build();
        this.nonBlockingPullRequest_ = PullRequest.newBuilder().setMaxMessages(this.batchSize_).setReturnImmediately(true).setSubscription(this.subscriptionName_).build();
    }

    String getSubscriptionName() {
        return this.subscriptionName_;
    }

    protected NonIdleSubscriber getNonIdleSubscriber() {
        return this.nonIdleSubscriber_;
    }

    protected IPullSubscriberContext getContext() throws IOException {
        return new GooglePullSubscriberContext();
    }

    private class GooglePullSubscriberMessage
    implements IPullSubscriberMessage {
        private final GrpcSubscriberStub subscriber_;
        private final ReceivedMessage receivedMessage_;
        private boolean running_ = true;

        private GooglePullSubscriberMessage(GrpcSubscriberStub subscriber, ReceivedMessage receivedMessage) {
            this.subscriber_ = subscriber;
            this.receivedMessage_ = receivedMessage;
        }

        public String getMessageId() {
            return this.receivedMessage_.getMessage().getMessageId();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            PubsubMessage message = this.receivedMessage_.getMessage();
            Timestamp ts = message.getPublishTime();
            try (ITraceContextTransaction traceTransaction = GoogleSubscriber.this.traceFactory_.createTransaction("PubSub:Google", message.getMessageId(), GoogleSubscriber.this.tenantId_, Instant.ofEpochSecond(ts.getSeconds(), ts.getNanos()));){
                ITraceContext trace = traceTransaction.open();
                trace.trace("RECEIVED");
                ImmutableByteArray byteArray = ImmutableByteArray.newInstance((ByteString)message.getData());
                long retryTime = GoogleSubscriber.this.manager_.handleMessage(GoogleSubscriber.this.consumer_, byteArray, trace, message.getMessageId());
                GooglePullSubscriberMessage googlePullSubscriberMessage = this;
                synchronized (googlePullSubscriberMessage) {
                    this.running_ = false;
                    if (retryTime < 0L) {
                        trace.trace("ABOUT_TO_ACK");
                        AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().setSubscription(GoogleSubscriber.this.subscriptionName_).addAckIds(this.receivedMessage_.getAckId()).build();
                        this.subscriber_.acknowledgeCallable().call((Object)acknowledgeRequest);
                        traceTransaction.finished();
                    } else {
                        trace.trace("ABOUT_TO_NACK");
                        int visibilityTimout = (int)(retryTime / 1000L);
                        ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder().setSubscription(GoogleSubscriber.this.subscriptionName_).setAckDeadlineSeconds(visibilityTimout).addAckIds(this.receivedMessage_.getAckId()).build();
                        this.subscriber_.modifyAckDeadlineCallable().call((Object)request);
                        traceTransaction.aborted();
                    }
                }
            }
            catch (RuntimeException e) {
                log_.error("Failed to process message " + this.getMessageId(), (Throwable)e);
            }
        }

        public synchronized void extend() {
            if (this.running_) {
                try {
                    ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder().setSubscription(GoogleSubscriber.this.subscriptionName_).setAckDeadlineSeconds(10).addAckIds(this.receivedMessage_.getAckId()).build();
                    this.subscriber_.modifyAckDeadlineCallable().call((Object)request);
                }
                catch (RuntimeException e) {
                    log_.error("Failed to extend message " + this.getMessageId(), (Throwable)e);
                }
            }
        }
    }

    class GooglePullSubscriberContext
    implements IPullSubscriberContext {
        private final GrpcSubscriberStub subscriber_;

        GooglePullSubscriberContext() throws IOException {
            this.subscriber_ = GrpcSubscriberStub.create((SubscriberStubSettings)GoogleSubscriber.this.subscriberStubSettings_);
        }

        public Collection<IPullSubscriberMessage> nonBlockingPull() {
            return this.pull(GoogleSubscriber.this.nonBlockingPullRequest_);
        }

        public Collection<IPullSubscriberMessage> blockingPull() {
            return this.pull(GoogleSubscriber.this.blockingPullRequest_);
        }

        private Collection<IPullSubscriberMessage> pull(PullRequest pullRequest) {
            LinkedList<IPullSubscriberMessage> result = new LinkedList<IPullSubscriberMessage>();
            for (ReceivedMessage receivedMessage : ((PullResponse)this.subscriber_.pullCallable().call((Object)pullRequest)).getReceivedMessagesList()) {
                result.add(new GooglePullSubscriberMessage(this.subscriber_, receivedMessage));
            }
            return result;
        }

        public void close() {
            this.subscriber_.close();
        }
    }

    class NonIdleSubscriber
    implements Runnable {
        NonIdleSubscriber() {
        }

        @Override
        public void run() {
            GoogleSubscriber.this.run(false);
        }
    }
}

