package com.spotify.google.cloud.pubsub.client;

import com.google.common.util.concurrent.MoreExecutors;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/google/cloud/pubsub/client/Puller.class */
/**
 * Pulls messages from a Pub/Sub subscription and feeds them to a {@link MessageHandler}.
 *
 * <p>Each received message is passed to the handler; when the stage returned by the handler
 * completes normally, its result is forwarded to an {@link Acker} for acknowledgement. Pulling
 * is bounded by two limits: the number of in-flight pull requests ({@link #concurrency()}) and
 * the number of messages handed to the handler that have not completed yet
 * ({@link #maxOutstandingMessages()}).
 *
 * <p>Pulling starts as soon as the instance is constructed and is re-triggered on a fixed delay
 * until {@link #close()} is called.
 */
public class Puller implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(Puller.class);
    private final Acker acker;
    private final Pubsub pubsub;
    private final String project;
    private final String subscription;
    private final MessageHandler handler;
    private final int concurrency;
    private final int batchSize;
    private final int maxOutstandingMessages;
    private final int maxAckQueueSize;
    private final long pullIntervalMillis;
    // Single-threaded scheduler that periodically re-runs pull().
    private final ScheduledExecutorService scheduler = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    // Pull requests that have been issued and have not completed yet.
    private final AtomicInteger outstandingRequests = new AtomicInteger();
    // Messages handed to the handler whose handling has not completed yet.
    private final AtomicInteger outstandingMessages = new AtomicInteger();

    /**
     * Fluent builder for {@link Puller}.
     *
     * <p>{@code pubsub}, {@code project}, {@code subscription} and {@code messageHandler} are
     * mandatory; the remaining settings have defaults.
     */
    public static class Builder {
        private Pubsub pubsub;
        private String project;
        private String subscription;
        private MessageHandler handler;
        private int concurrency = 64;
        private int batchSize = 1000;
        private int maxOutstandingMessages = 64000;
        // Evaluated once when the Builder is created, i.e. from the default batch size.
        // Calling batchSize(...) afterwards does NOT rescale this value.
        private int maxAckQueueSize = 10 * this.batchSize;
        private long pullIntervalMillis = 1000;

        /** Sets the Pub/Sub client used for pulling and acknowledging. Required. */
        public Builder pubsub(Pubsub pubsub) {
            this.pubsub = pubsub;
            return this;
        }

        /** Sets the project of the subscription. Required. */
        public Builder project(String project) {
            this.project = project;
            return this;
        }

        /** Sets the subscription to pull from. Required. */
        public Builder subscription(String subscription) {
            this.subscription = subscription;
            return this;
        }

        /** Sets the handler invoked for every received message. Required. */
        public Builder messageHandler(MessageHandler messageHandler) {
            this.handler = messageHandler;
            return this;
        }

        /** Sets the maximum number of concurrently outstanding pull requests. Default: 64. */
        public Builder concurrency(int concurrency) {
            this.concurrency = concurrency;
            return this;
        }

        /** Sets the batch size passed to each pull request and to the acker. Default: 1000. */
        public Builder batchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /**
         * Sets the number of outstanding (not yet handled) messages at which no further pull
         * requests are issued. Default: 64000.
         */
        public Builder maxOutstandingMessages(int maxOutstandingMessages) {
            this.maxOutstandingMessages = maxOutstandingMessages;
            return this;
        }

        /**
         * Sets the queue size passed to the acker. Default: 10000 (ten times the default batch
         * size; not adjusted when {@link #batchSize(int)} is changed).
         */
        public Builder maxAckQueueSize(int maxAckQueueSize) {
            this.maxAckQueueSize = maxAckQueueSize;
            return this;
        }

        /**
         * Sets the delay in milliseconds between periodic pull attempts. Must be positive.
         * Default: 1000.
         */
        public Builder pullIntervalMillis(long pullIntervalMillis) {
            this.pullIntervalMillis = pullIntervalMillis;
            return this;
        }

        /**
         * Creates the {@link Puller}; see {@link Puller#Puller(Builder)} for the failure modes.
         */
        public Puller build() {
            return new Puller(this);
        }
    }

    /** Callback invoked for every message received by a {@link Puller}. */
    public interface MessageHandler {
        /**
         * Handles a single received message.
         *
         * @param puller the puller that received the message
         * @param subscription the subscription the message was pulled from
         * @param message the received message
         * @param ackId the ack id of the received message
         * @return a stage whose successful result is passed to the acker (presumably the ack id
         *     to acknowledge; confirm against {@code Acker.acknowledge}). Returning
         *     {@code null} is logged as an error and the message is not acknowledged.
         */
        CompletionStage<String> handleMessage(Puller puller, String subscription, Message message, String ackId);
    }

    /**
     * Creates a puller from the builder settings, issues the first pull requests immediately
     * and schedules further pull attempts every {@code pullIntervalMillis} milliseconds.
     *
     * @param builder the configured builder
     * @throws NullPointerException if pubsub, project, subscription or handler is not set
     * @throws IllegalArgumentException if {@code pullIntervalMillis} is not positive
     */
    public Puller(Builder builder) {
        this.pubsub = Objects.requireNonNull(builder.pubsub, "pubsub");
        this.project = Objects.requireNonNull(builder.project, "project");
        this.subscription = Objects.requireNonNull(builder.subscription, "subscription");
        this.handler = Objects.requireNonNull(builder.handler, "handler");
        // scheduleWithFixedDelay rejects a non-positive delay. Check it here so the failure
        // happens before the acker is built and before any pull request has been issued.
        if (builder.pullIntervalMillis <= 0) {
            throw new IllegalArgumentException("pullIntervalMillis must be positive: " + builder.pullIntervalMillis);
        }
        this.concurrency = builder.concurrency;
        this.batchSize = builder.batchSize;
        this.maxOutstandingMessages = builder.maxOutstandingMessages;
        this.maxAckQueueSize = builder.maxAckQueueSize;
        this.pullIntervalMillis = builder.pullIntervalMillis;
        this.acker = Acker.builder()
                .pubsub(this.pubsub)
                .project(this.project)
                .subscription(this.subscription)
                .batchSize(this.batchSize)
                .concurrency(this.concurrency)
                .queueSize(this.maxAckQueueSize)
                .build();
        // Start pulling right away; the scheduled task re-runs pull() after every interval.
        pull();
        this.scheduler.scheduleWithFixedDelay(this::pull, this.pullIntervalMillis, this.pullIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the periodic pulling and waits up to 30 seconds for the scheduler to terminate.
     * If the wait is interrupted, the thread's interrupt flag is restored.
     *
     * <p>NOTE(review): the {@code Acker} created by this puller is not closed here; confirm
     * whether it holds resources that need releasing.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.scheduler.shutdownNow();
        try {
            this.scheduler.awaitTermination(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the queue size the acker was configured with. */
    public int maxAckQueueSize() {
        return this.maxAckQueueSize;
    }

    /** Returns the outstanding-message count at which pulling is paused. */
    public int maxOutstandingMessages() {
        return this.maxOutstandingMessages;
    }

    /** Returns the current number of messages whose handling has not completed yet. */
    public int outstandingMessages() {
        return this.outstandingMessages.get();
    }

    /** Returns the maximum number of concurrently outstanding pull requests. */
    public int concurrency() {
        return this.concurrency;
    }

    /** Returns the current number of pull requests that have not completed yet. */
    public int outstandingRequests() {
        return this.outstandingRequests.get();
    }

    /** Returns the batch size used for pull requests. */
    public int batchSize() {
        return this.batchSize;
    }

    /** Returns the subscription being pulled. */
    public String subscription() {
        return this.subscription;
    }

    /** Returns the project of the subscription being pulled. */
    public String project() {
        return this.project;
    }

    /** Returns the delay in milliseconds between periodic pull attempts. */
    public long pullIntervalMillis() {
        return this.pullIntervalMillis;
    }

    /**
     * Issues pull requests until either the request concurrency limit or the outstanding
     * message limit is reached.
     */
    private void pull() {
        while (this.outstandingRequests.get() < this.concurrency && this.outstandingMessages.get() < this.maxOutstandingMessages) {
            pullBatch();
        }
    }

    /**
     * Issues one pull request and, when it completes, dispatches every received message to
     * the handler. A failed pull is logged and otherwise ignored.
     */
    private void pullBatch() {
        this.outstandingRequests.incrementAndGet();
        // NOTE(review): the third argument is presumably "returnImmediately"; confirm against Pubsub.pull.
        this.pubsub.pull(this.project, this.subscription, false, this.batchSize).whenComplete((messages, pullError) -> {
            this.outstandingRequests.decrementAndGet();
            if (pullError != null) {
                log.error("Pull failed", pullError);
                return;
            }
            // Count the whole batch up front; each message is counted down again individually.
            this.outstandingMessages.addAndGet(messages.size());
            // The element type of the pulled list is not visible from this file, so each
            // element is cast explicitly.
            for (Object item : messages) {
                dispatch((ReceivedMessage) item);
            }
        });
    }

    /**
     * Hands one received message to the handler, counts it down once handling completes and
     * acknowledges it if handling completed normally.
     */
    private void dispatch(ReceivedMessage received) {
        try {
            CompletionStage<String> handling = this.handler.handleMessage(this, this.subscription, received.message(), received.ackId());
            if (handling == null) {
                this.outstandingMessages.decrementAndGet();
                log.error("Message handler returned null");
                return;
            }
            // The message stops being outstanding however handling ends.
            handling.whenComplete((ackId, handlingError) -> this.outstandingMessages.decrementAndGet());
            // Acknowledge only on normal completion; a failure of the handler stage or of the
            // acknowledge call ends up in the exceptionally() branch.
            handling.thenAccept(this.acker::acknowledge).exceptionally(ackError -> {
                if (!isCancellation(ackError)) {
                    log.error("Acking pubsub threw exception", ackError);
                }
                return null;
            });
        } catch (Exception e) {
            this.outstandingMessages.decrementAndGet();
            log.error("Message handler threw exception", e);
        }
    }

    /**
     * Tells whether the given failure is a cancellation, looking through the
     * {@link CompletionException} wrappers that dependent stages add around the cause.
     */
    private static boolean isCancellation(Throwable error) {
        Throwable cause = error;
        while (cause instanceof CompletionException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        return cause instanceof CancellationException;
    }

    /** Returns a new {@link Builder} with default settings. */
    public static Builder builder() {
        return new Builder();
    }
}
