/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer;

import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.ConsumerMessageSender;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.message.MessageConverter;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionOffsetCommitQueues;
import pl.allegro.tech.hermes.consumers.consumer.rate.ConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceivingTimeoutException;
import pl.allegro.tech.hermes.domain.subscription.offset.PartitionOffset;
import pl.allegro.tech.hermes.tracker.consumers.Trackers;

/**
 * Runnable consumption loop for a single {@link Subscription}.
 *
 * <p>Each iteration of {@link #run()} acquires a permit from the in-flight semaphore, reads the
 * next message from the {@link MessageReceiver}, converts it, records it as in-flight and passes
 * it to the {@link ConsumerMessageSender}. The loop keeps going until {@link #stopConsuming()}
 * clears the {@code consuming} flag.
 *
 * <p>{@link #run()} loops on its own thread while {@link #stopConsuming()} and
 * {@link #updateSubscription(Subscription)} are invoked from outside that loop, so the two fields
 * they modify are {@code volatile}.
 */
public class Consumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    private final MessageReceiver messageReceiver;
    private final HermesMetrics hermesMetrics;
    private final ConsumerRateLimiter rateLimiter;
    private final SubscriptionOffsetCommitQueues subscriptionOffsetCommitQueues;
    private final Semaphore inflightSemaphore;
    private final Trackers trackers;
    private final pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverter messageConverter;
    private final ConsumerMessageSender sender;
    // Replaced by updateSubscription() while run() is looping; volatile so the loop sees the update.
    private volatile Subscription subscription;
    private volatile boolean consuming = true;

    /**
     * Creates a consumer for the given subscription.
     *
     * @param messageReceiver source of messages, polled once per loop iteration
     * @param hermesMetrics metrics facade used to count in-flight messages
     * @param subscription subscription this consumer serves
     * @param rateLimiter rate limiter initialized when the loop starts and shut down on stop
     * @param subscriptionOffsetCommitQueues queues that record each message before it is sent
     * @param sender component that delivers converted messages
     * @param inflightSemaphore semaphore from which one permit is acquired per message read
     * @param trackers trackers used to log in-flight message metadata
     * @param messageConverter converter applied to every received message before sending
     */
    public Consumer(MessageReceiver messageReceiver, HermesMetrics hermesMetrics, Subscription subscription, ConsumerRateLimiter rateLimiter, SubscriptionOffsetCommitQueues subscriptionOffsetCommitQueues, ConsumerMessageSender sender, Semaphore inflightSemaphore, Trackers trackers, pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverter messageConverter) {
        this.messageReceiver = messageReceiver;
        this.hermesMetrics = hermesMetrics;
        this.subscription = subscription;
        this.rateLimiter = rateLimiter;
        this.subscriptionOffsetCommitQueues = subscriptionOffsetCommitQueues;
        this.sender = sender;
        this.inflightSemaphore = inflightSemaphore;
        this.trackers = trackers;
        this.messageConverter = messageConverter;
    }

    /** Returns the id of the current subscription. */
    private String getId() {
        return subscription.getId();
    }

    /**
     * Runs the consumption loop until {@link #isConsuming()} returns {@code false}, then stops the
     * message receiver.
     *
     * <p>A receive timeout releases the permit and retries. Any other failure is logged and the
     * loop continues; the permit is released when the failure happened before the message was
     * handed to the sender, so repeated failures cannot exhaust the semaphore.
     */
    @Override
    public void run() {
        setThreadName();
        rateLimiter.initialize();
        while (isConsuming()) {
            // True only while this iteration owns a permit that nobody else can release.
            boolean permitHeld = false;
            try {
                inflightSemaphore.acquire();
                permitHeld = true;
                Message message = messageReceiver.next();
                Message convertedMessage = messageConverter.convert(message);
                registerInflight(convertedMessage);
                // From here on the message belongs to the sender.
                // NOTE(review): presumably the sending side releases the permit once delivery
                // completes - confirm against ConsumerMessageSender.
                permitHeld = false;
                sender.sendMessage(convertedMessage);
            } catch (MessageReceivingTimeoutException e) {
                inflightSemaphore.release();
                logger.debug("Timeout while reading message from topic. Trying to read message again", e);
            } catch (Exception e) {
                logger.error("Consumer loop failed for " + getId(), e);
                // An InterruptedException from acquire() also lands here; no permit was obtained
                // in that case, so the flag is still false and nothing is released.
                if (permitHeld) {
                    inflightSemaphore.release();
                }
            }
        }
        logger.info("Stopping consumer for subscription {}", getId());
        messageReceiver.stop();
    }

    /**
     * Records the message as in-flight: puts it on the offset commit queues, increments the
     * in-flight counter and logs its metadata through the subscription's tracker.
     *
     * @param message converted message that is about to be sent
     */
    private void registerInflight(Message message) {
        // Single read so all bookkeeping for this message uses the same subscription instance.
        Subscription current = subscription;
        subscriptionOffsetCommitQueues.put(message);
        hermesMetrics.incrementInflightCounter(current);
        trackers.get(current).logInflight(MessageConverter.toMessageMetadata(message, current));
    }

    /**
     * Asks the loop to stop and shuts down the rate limiter and the sender.
     *
     * <p>The flag is cleared first so the loop stops starting new iterations before its
     * collaborators are shut down, and so it is cleared even if a shutdown call throws.
     */
    public void stopConsuming() {
        consuming = false;
        rateLimiter.shutdown();
        sender.shutdown();
    }

    /**
     * Returns the offsets reported by the subscription offset commit queues.
     *
     * @return offsets to commit, as provided by {@link SubscriptionOffsetCommitQueues}
     */
    public List<PartitionOffset> getOffsetsToCommit() {
        return subscriptionOffsetCommitQueues.getOffsetsToCommit();
    }

    /** Returns the subscription currently served by this consumer. */
    public Subscription getSubscription() {
        return subscription;
    }

    /**
     * Propagates a new subscription definition to the rate limiter and the sender, then makes it
     * the current subscription of this consumer.
     *
     * @param newSubscription updated subscription
     */
    public void updateSubscription(Subscription newSubscription) {
        rateLimiter.updateSubscription(newSubscription);
        sender.updateSubscription(newSubscription);
        subscription = newSubscription;
    }

    /** Names the current thread {@code Consumer-<subscription id>}. */
    private void setThreadName() {
        Thread.currentThread().setName("Consumer-" + getId());
    }

    /** Returns whether the loop should keep running; overridable for tests. */
    @VisibleForTesting
    protected boolean isConsuming() {
        return consuming;
    }
}

