/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.google.pubsub;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import org.symphonyoss.s2.common.immutable.ImmutableByteArray;
import org.symphonyoss.s2.fugue.core.trace.ITraceContext;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextFactory;
import org.symphonyoss.s2.fugue.google.pubsub.GoogleAbstractSubscriberManager;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;

public class GoogleSubscriber
implements MessageReceiver {
    private final GoogleAbstractSubscriberManager<?> manager_;
    private final ITraceContextFactory traceFactory_;
    private final IThreadSafeRetryableConsumer<ImmutableByteArray> consumer_;

    public GoogleSubscriber(GoogleAbstractSubscriberManager<?> manager, ITraceContextFactory traceFactory, IThreadSafeRetryableConsumer<ImmutableByteArray> consumer) {
        this.manager_ = manager;
        this.traceFactory_ = traceFactory;
        this.consumer_ = consumer;
    }

    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        try {
            ITraceContext trace = this.traceFactory_.createTransaction(PubsubMessage.class.getName(), message.getMessageId());
            ImmutableByteArray byteArray = ImmutableByteArray.newInstance((ByteString)message.getData());
            long retryTime = this.manager_.handleMessage(this.consumer_, byteArray, trace);
            if (retryTime < 0L) {
                trace.trace("ABOUT_TO_ACK");
                consumer.ack();
            } else {
                consumer.nack();
            }
            trace.finished();
        }
        catch (RuntimeException e) {
            e.printStackTrace();
        }
    }
}

