/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.google.pubsub;

import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.fault.CodingFault;
import org.symphonyoss.s2.common.immutable.ImmutableByteArray;
import org.symphonyoss.s2.fugue.core.trace.ITraceContext;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextTransaction;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextTransactionFactory;
import org.symphonyoss.s2.fugue.counter.IBusyCounter;
import org.symphonyoss.s2.fugue.counter.ICounter;
import org.symphonyoss.s2.fugue.deploy.IBatch;
import org.symphonyoss.s2.fugue.google.pubsub.GoogleSubscriberManager;
import org.symphonyoss.s2.fugue.pipeline.FatalConsumerException;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;
import org.symphonyoss.s2.fugue.pipeline.RetryableConsumerException;

/**
 * Pull-based consumer for a single Google Pub/Sub subscription.
 *
 * <p>Each pass opens a {@link GrpcSubscriberStub}, pulls up to {@code BATCH_SIZE} messages,
 * hands every message to the manager together with the configured consumer, and then either
 * acknowledges the message or changes its ack deadline, depending on the retry time the manager
 * returns.
 *
 * <p>The instance itself is the "main" task, which loops until {@link #stop()} is called. The
 * inner {@link NonIdleSubscriber} is an additional task that performs a single pass; one is
 * submitted to the manager whenever a non-blocking pull returns messages.
 */
public class GoogleSubscriber implements Runnable {
    private static final Logger log_ = LoggerFactory.getLogger(GoogleSubscriber.class);

    /** Maximum number of messages requested in a single pull. */
    private static final int BATCH_SIZE = 10;

    /** Largest ack deadline Pub/Sub accepts in a single ModifyAckDeadline request. */
    private static final long MAX_ACK_DEADLINE_SECONDS = 600L;

    private final GoogleSubscriberManager manager_;
    private final ITraceContextTransactionFactory traceFactory_;
    private final IThreadSafeRetryableConsumer<ImmutableByteArray> consumer_;
    private final NonIdleSubscriber nonIdleSubscriber_;
    private final String subscriptionName_;
    /** Optional; when non-null it is incremented by the number of messages received. */
    private final ICounter counter_;
    /** Optional; when non-null it is told about busy and idle passes. */
    private final IBusyCounter busyCounter_;
    private final String tenantId_;
    private final SubscriberStubSettings subscriberStubSettings_;
    private final PullRequest blockingPullRequest_;
    private final PullRequest nonBlockingPullRequest_;

    /** Only read and written through the synchronized {@link #isRunning()} and {@link #stop()}. */
    private boolean running_ = true;

    /**
     * Creates a subscriber for the given subscription.
     *
     * @param manager          used to schedule tasks, create batches and handle each payload
     * @param subscriptionName subscription name placed on every pull, ack and deadline request
     * @param traceFactory     creates one trace transaction per received message
     * @param consumer         consumer passed to the manager together with each payload
     * @param counter          message counter, may be {@code null}
     * @param busyCounter      busy/idle counter, may be {@code null}
     * @param tenantId         tenant ID passed to the trace transaction
     * @throws CodingFault if the default subscriber stub settings cannot be built
     */
    GoogleSubscriber(GoogleSubscriberManager manager, String subscriptionName, ITraceContextTransactionFactory traceFactory, IThreadSafeRetryableConsumer<ImmutableByteArray> consumer, ICounter counter, IBusyCounter busyCounter, String tenantId) {
        manager_ = manager;
        subscriptionName_ = subscriptionName;
        traceFactory_ = traceFactory;
        consumer_ = consumer;
        nonIdleSubscriber_ = new NonIdleSubscriber();
        counter_ = counter;
        busyCounter_ = busyCounter;
        tenantId_ = tenantId;
        subscriberStubSettings_ = defaultStubSettings();
        blockingPullRequest_ = newPullRequest(subscriptionName, false);
        nonBlockingPullRequest_ = newPullRequest(subscriptionName, true);
    }

    /** Builds the default stub settings, converting the checked failure into a {@link CodingFault}. */
    private static SubscriberStubSettings defaultStubSettings() {
        try {
            return SubscriberStubSettings.newBuilder().build();
        }
        catch (IOException e) {
            throw new CodingFault(e);
        }
    }

    /** Builds a pull request for up to {@code BATCH_SIZE} messages from the given subscription. */
    private static PullRequest newPullRequest(String subscriptionName, boolean returnImmediately) {
        return PullRequest.newBuilder()
                .setMaxMessages(BATCH_SIZE)
                .setReturnImmediately(returnImmediately)
                .setSubscription(subscriptionName)
                .build();
    }

    /**
     * @return the subscription name this subscriber pulls from
     */
    String getSubscriptionName() {
        return subscriptionName_;
    }

    /** Runs as the main task; equivalent to {@code run(true)}. */
    @Override
    public void run() {
        run(true);
    }

    /**
     * Performs pull passes while this subscriber is running.
     *
     * @param runIfIdle {@code true} to loop until stopped (main task); {@code false} to perform
     *                  at most one pass (extra task)
     */
    void run(boolean runIfIdle) {
        if (!isRunning()) {
            return;
        }
        if (!runIfIdle) {
            getSomeMessages();
            return;
        }
        try {
            while (isRunning()) {
                getSomeMessages();
            }
        }
        finally {
            // Reaching this point while still running means the loop was left abnormally,
            // so the main task is handed back to the manager.
            if (isRunning()) {
                log_.error("Main PubSub thread returned, rescheduling...");
                manager_.submit(this, true);
            }
        }
    }

    /**
     * Performs one pass: pulls a batch and processes every message in it.
     *
     * <p>A single message is handled on the calling thread. For larger batches all but the last
     * message are submitted to a batch obtained from the manager, the last one is handled on the
     * calling thread, and the method then waits for the batch to complete.
     *
     * <p>A {@link RuntimeException} is logged and swallowed. Any other {@link Throwable} is logged
     * and then the JVM is terminated with exit status 1.
     */
    private void getSomeMessages() {
        try (GrpcSubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings_)) {
            PullResponse pullResponse = pull(subscriber);
            if (pullResponse == null) {
                // The subscriber was stopped because the busy counter reported idle.
                return;
            }
            List<ReceivedMessage> messages = pullResponse.getReceivedMessagesList();
            if (messages.isEmpty()) {
                return;
            }
            if (counter_ != null) {
                counter_.increment(messages.size());
            }
            if (messages.size() == 1) {
                handleMessage(subscriber, messages.get(0));
                return;
            }
            int lastIndex = messages.size() - 1;
            IBatch batch = manager_.newBatch();
            try {
                for (int i = 0; i < lastIndex; i++) {
                    final ReceivedMessage message = messages.get(i);
                    batch.submit(() -> handleMessage(subscriber, message));
                }
                handleMessage(subscriber, messages.get(lastIndex));
                batch.waitForAllTasks();
            }
            catch (RuntimeException e) {
                // Unwrap the failure so that consumer exceptions propagate as themselves
                // rather than as the wrapping exception.
                Throwable cause = e.getCause();
                if (cause instanceof ExecutionException) {
                    cause = cause.getCause();
                }
                if (cause instanceof RetryableConsumerException) {
                    throw (RetryableConsumerException) cause;
                }
                if (cause instanceof FatalConsumerException) {
                    throw (FatalConsumerException) cause;
                }
                throw e;
            }
        }
        catch (RuntimeException e) {
            log_.error("Error processing message", e);
        }
        catch (Throwable e) {
            try {
                log_.error("Error processing message", e);
            }
            finally {
                System.exit(1);
            }
        }
    }

    /**
     * Pulls a batch, trying a non-blocking pull first and falling back to a blocking pull.
     *
     * <p>When the non-blocking pull returns messages, the busy counter (if any) is notified and,
     * if still running, one extra {@link NonIdleSubscriber} task is submitted to the manager.
     * When it returns nothing and the busy counter's {@code idle()} returns {@code true}, this
     * subscriber is stopped.
     *
     * @param subscriber stub used to issue the pull calls
     * @return the pull response, or {@code null} if this subscriber was stopped instead of pulling
     */
    private PullResponse pull(SubscriberStub subscriber) {
        PullResponse response = subscriber.pullCallable().call(nonBlockingPullRequest_);
        if (!response.getReceivedMessagesList().isEmpty()) {
            if (busyCounter_ != null) {
                busyCounter_.busy();
            }
            if (isRunning()) {
                manager_.submit(nonIdleSubscriber_, false);
                log_.debug("Extra schedule {}", subscriptionName_);
            }
            log_.info(">>>>>>>>Non-Blocking read for {} returned {}", subscriptionName_, response.getReceivedMessagesList().size());
            return response;
        }
        if (busyCounter_ != null && busyCounter_.idle()) {
            stop();
            return null;
        }
        log_.info(">>>>>>>>Blocking read for {}...", subscriptionName_);
        response = subscriber.pullCallable().call(blockingPullRequest_);
        log_.info(">>>>>>>>Blocking read for {} returned {}", subscriptionName_, response.getReceivedMessagesList().size());
        return response;
    }

    /**
     * Processes one received message inside its own trace transaction.
     *
     * <p>The payload is passed to the manager. A negative result acknowledges the message and
     * marks the transaction finished. Any other result is treated as a retry delay in
     * milliseconds: the message's ack deadline is set to that delay (in seconds, capped at
     * {@code MAX_ACK_DEADLINE_SECONDS}) and the transaction is marked aborted.
     *
     * @param subscriber      stub used for the ack or deadline call
     * @param receivedMessage message to process
     */
    private void handleMessage(SubscriberStub subscriber, ReceivedMessage receivedMessage) {
        PubsubMessage message = receivedMessage.getMessage();
        Timestamp ts = message.getPublishTime();
        Instant publishTime = Instant.ofEpochSecond(ts.getSeconds(), ts.getNanos());
        try (ITraceContextTransaction traceTransaction = traceFactory_.createTransaction("PubSub:Google", message.getMessageId(), tenantId_, publishTime)) {
            ITraceContext trace = traceTransaction.open();
            trace.trace("RECEIVED");
            ImmutableByteArray byteArray = ImmutableByteArray.newInstance(message.getData());
            long retryTime = manager_.handleMessage(consumer_, byteArray, trace, message.getMessageId());
            if (retryTime < 0L) {
                trace.trace("ABOUT_TO_ACK");
                AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
                        .setSubscription(subscriptionName_)
                        .addAckIds(receivedMessage.getAckId())
                        .build();
                subscriber.acknowledgeCallable().call(acknowledgeRequest);
                traceTransaction.finished();
            } else {
                trace.trace("ABOUT_TO_NACK");
                // Clamp before narrowing: Pub/Sub rejects deadlines above 600 seconds, and an
                // unclamped cast could overflow for very large retry times.
                int ackDeadlineSeconds = (int) Math.min(retryTime / 1000L, MAX_ACK_DEADLINE_SECONDS);
                // The subscription is a required field of the request, as it is for the ack above.
                ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
                        .setSubscription(subscriptionName_)
                        .setAckDeadlineSeconds(ackDeadlineSeconds)
                        .addAckIds(receivedMessage.getAckId())
                        .build();
                subscriber.modifyAckDeadlineCallable().call(request);
                traceTransaction.aborted();
            }
        }
    }

    /**
     * @return {@code true} until {@link #stop()} has been called
     */
    synchronized boolean isRunning() {
        return running_;
    }

    /** Marks this subscriber as stopped; loops checking {@link #isRunning()} exit after this. */
    synchronized void stop() {
        running_ = false;
    }

    /** Extra task that performs a single pass of the enclosing subscriber via {@code run(false)}. */
    class NonIdleSubscriber implements Runnable {
        NonIdleSubscriber() {
        }

        @Override
        public void run() {
            GoogleSubscriber.this.run(false);
        }
    }
}

