/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.google.pubsub;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.time.Instant;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.concurrent.NamedThreadFactory;
import org.symphonyoss.s2.common.immutable.ImmutableByteArray;
import org.symphonyoss.s2.fugue.core.trace.ITraceContext;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextFactory;
import org.symphonyoss.s2.fugue.google.pubsub.GoogleAbstractSubscriberManager;
import org.symphonyoss.s2.fugue.naming.SubscriptionName;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;

/**
 * A Pub/Sub {@link MessageReceiver} which hands each received message to the
 * subscriber manager and then either acknowledges it or schedules a delayed
 * negative acknowledgement, depending on the value the manager returns.
 *
 * <p>All instances share one single-threaded scheduled executor for the
 * delayed nacks, so the pending-retry limit applies across every subscriber
 * in the process.
 */
public class GoogleSubscriber
implements MessageReceiver {
    private static final Logger log_ = LoggerFactory.getLogger(GoogleSubscriber.class);

    /**
     * Shared scheduler used to issue delayed nacks for messages which failed.
     * NOTE(review): the {@code true} argument is presumably "daemon thread" - confirm against NamedThreadFactory.
     */
    private static final ScheduledThreadPoolExecutor executor_ = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("google-failed-msg-ack", true));

    /** Upper bound on queued delayed nacks; above this no further nacks are scheduled. */
    private static final int MAX_PENDING_RETRY_COUNT = 10000;

    private final GoogleAbstractSubscriberManager<?> manager_;
    private final ITraceContextFactory traceFactory_;
    private final IThreadSafeRetryableConsumer<ImmutableByteArray> consumer_;
    private final SubscriptionName subscriptionName_;

    /**
     * Constructor.
     *
     * @param manager          The manager whose {@code handleMessage} is invoked for each message.
     * @param traceFactory     Factory used to create a trace transaction per message.
     * @param consumer         The consumer passed through to the manager for each message.
     * @param subscriptionName Name of the subscription, used only in failure log messages.
     */
    public GoogleSubscriber(GoogleAbstractSubscriberManager<?> manager, ITraceContextFactory traceFactory, IThreadSafeRetryableConsumer<ImmutableByteArray> consumer, SubscriptionName subscriptionName) {
        this.manager_ = manager;
        this.traceFactory_ = traceFactory;
        this.consumer_ = consumer;
        this.subscriptionName_ = subscriptionName;
    }

    /**
     * Implements {@link MessageReceiver}: processes a single message and decides
     * whether to ack it immediately or nack it after a delay.
     *
     * <p>A negative value returned from the manager means the message is acked.
     * Any other value is treated as a delay in milliseconds after which the
     * message is nacked. Any {@link Throwable} raised along the way is logged
     * and not propagated; in that case the message is neither acked nor nacked
     * by this method.
     *
     * @param message  The received message.
     * @param consumer The reply handle used to ack or nack the message.
     */
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        try {
            Timestamp publishTime = message.getPublishTime();
            ITraceContext trace = this.traceFactory_.createTransaction(PubsubMessage.class.getSimpleName(), message.getMessageId(), Instant.ofEpochSecond(publishTime.getSeconds(), publishTime.getNanos()));
            trace.trace("RECEIVED");
            ImmutableByteArray byteArray = ImmutableByteArray.newInstance(message.getData());
            long retryTime = this.manager_.handleMessage(this.consumer_, byteArray, trace, message.getMessageId());
            if (retryTime < 0L) {
                trace.trace("ABOUT_TO_ACK");
                consumer.ack();
            } else {
                this.scheduleNack(consumer, retryTime);
            }
            // NOTE(review): not reached when an exception is thrown above, so the trace
            // is left unfinished on failure - confirm whether that is intended.
            trace.finished();
        }
        catch (Throwable e) {
            // Deliberately broad: nothing is allowed to escape to the caller.
            log_.error("Failed to handle message from " + this.subscriptionName_, e);
        }
    }

    /**
     * Schedules a nack for a failed message after the given delay, unless the
     * shared executor already holds more than {@link #MAX_PENDING_RETRY_COUNT}
     * pending tasks, in which case the situation is logged and nothing is scheduled.
     *
     * @param consumer    The reply handle to nack.
     * @param retryTimeMs Delay before the nack, in milliseconds.
     */
    private void scheduleNack(AckReplyConsumer consumer, long retryTimeMs) {
        // Read the size once so the logged figure is the one which was actually tested.
        int pendingRetries = executor_.getQueue().size();
        if (pendingRetries > MAX_PENDING_RETRY_COUNT) {
            log_.error("We are holding {} failed messages, this message will not be re-tried for up to 60 mins.", pendingRetries);
            return;
        }
        executor_.schedule(() -> consumer.nack(), retryTimeMs, TimeUnit.MILLISECONDS);
    }
}

