/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.google.pubsub;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.time.Instant;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.concurrent.NamedThreadFactory;
import org.symphonyoss.s2.common.immutable.ImmutableByteArray;
import org.symphonyoss.s2.fugue.core.trace.ITraceContext;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextTransaction;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextTransactionFactory;
import org.symphonyoss.s2.fugue.counter.ICounter;
import org.symphonyoss.s2.fugue.google.pubsub.GoogleSubscriberManager;
import org.symphonyoss.s2.fugue.naming.SubscriptionName;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;

public class GoogleSubscriber
implements MessageReceiver {
    private static final Logger log_ = LoggerFactory.getLogger(GoogleSubscriber.class);
    private static final ScheduledThreadPoolExecutor executor_ = new ScheduledThreadPoolExecutor(1, (ThreadFactory)new NamedThreadFactory("google-failed-msg-ack", true));
    private static final int MAX_PENDING_RETRY_COUNT = 10000;
    private final GoogleSubscriberManager manager_;
    private final ITraceContextTransactionFactory traceFactory_;
    private final IThreadSafeRetryableConsumer<ImmutableByteArray> consumer_;
    private final SubscriptionName subscriptionName_;
    private final ICounter counter_;
    private AtomicBoolean stopped_ = new AtomicBoolean();

    GoogleSubscriber(GoogleSubscriberManager manager, ITraceContextTransactionFactory traceFactory, IThreadSafeRetryableConsumer<ImmutableByteArray> consumer, SubscriptionName subscriptionName, ICounter counter) {
        this.manager_ = manager;
        this.traceFactory_ = traceFactory;
        this.consumer_ = consumer;
        this.subscriptionName_ = subscriptionName;
        this.counter_ = counter;
    }

    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        Timestamp ts = message.getPublishTime();
        try (ITraceContextTransaction traceTransaction = this.traceFactory_.createTransaction(PubsubMessage.class.getSimpleName(), message.getMessageId(), Instant.ofEpochSecond(ts.getSeconds(), ts.getNanos()));){
            ITraceContext trace = traceTransaction.open();
            if (this.stopped_.get()) {
                System.err.println("NAKing message");
                trace.trace("ABORTING_SHUTDOWN");
                traceTransaction.aborted();
                consumer.nack();
                return;
            }
            if (this.counter_ != null) {
                this.counter_.increment(1);
            }
            trace.trace("RECEIVED");
            ImmutableByteArray byteArray = ImmutableByteArray.newInstance((ByteString)message.getData());
            long retryTime = this.manager_.handleMessage(this.consumer_, byteArray, trace, message.getMessageId());
            if (retryTime < 0L) {
                trace.trace("ABOUT_TO_ACK");
                consumer.ack();
            } else if (executor_.getQueue().size() > 10000) {
                log_.error("We are holding " + executor_.getQueue().size() + " failed messages, this message will not be re-tried for up to 60 mins.");
            } else {
                executor_.schedule(() -> consumer.nack(), retryTime, TimeUnit.MILLISECONDS);
            }
            traceTransaction.finished();
        }
        catch (Throwable e) {
            log_.error("Failed to handle message from " + this.subscriptionName_, e);
        }
    }

    public void stop() {
        this.stopped_.set(true);
    }
}

