/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer;

import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.consumer.ConsumerMessageSender;
import pl.allegro.tech.hermes.consumers.consumer.ConsumerMessageSenderFactory;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver;
import pl.allegro.tech.hermes.consumers.consumer.message.MessageConverter;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetQueue;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.rate.AdjustableSemaphore;
import pl.allegro.tech.hermes.consumers.consumer.rate.SerialConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceivingTimeoutException;
import pl.allegro.tech.hermes.consumers.consumer.receiver.ReceiverFactory;
import pl.allegro.tech.hermes.tracker.consumers.Trackers;

public class SerialConsumer
implements Consumer {
    private static final Logger logger = LoggerFactory.getLogger(SerialConsumer.class);
    private final ReceiverFactory messageReceiverFactory;
    private final HermesMetrics hermesMetrics;
    private final SerialConsumerRateLimiter rateLimiter;
    private final Trackers trackers;
    private final MessageConverterResolver messageConverterResolver;
    private final ConsumerMessageSender sender;
    private final OffsetQueue offsetQueue;
    private final AdjustableSemaphore inflightSemaphore;
    private final int defaultInflight;
    private final int signalProcessingInterval;
    private Topic topic;
    private Subscription subscription;
    private MessageReceiver messageReceiver;

    public SerialConsumer(ReceiverFactory messageReceiverFactory, HermesMetrics hermesMetrics, Subscription subscription, SerialConsumerRateLimiter rateLimiter, ConsumerMessageSenderFactory consumerMessageSenderFactory, Trackers trackers, MessageConverterResolver messageConverterResolver, Topic topic, ConfigFactory configFactory, OffsetQueue offsetQueue) {
        this.defaultInflight = configFactory.getIntProperty(Configs.CONSUMER_INFLIGHT_SIZE);
        this.signalProcessingInterval = configFactory.getIntProperty(Configs.CONSUMER_SIGNAL_PROCESSING_INTERVAL);
        this.inflightSemaphore = new AdjustableSemaphore(this.calculateInflightSize(subscription));
        this.messageReceiverFactory = messageReceiverFactory;
        this.hermesMetrics = hermesMetrics;
        this.subscription = subscription;
        this.rateLimiter = rateLimiter;
        this.offsetQueue = offsetQueue;
        this.sender = consumerMessageSenderFactory.create(subscription, rateLimiter, offsetQueue, this.inflightSemaphore::release);
        this.trackers = trackers;
        this.messageConverterResolver = messageConverterResolver;
        this.messageReceiver = () -> {
            throw new IllegalStateException("Consumer not initialized");
        };
        this.topic = topic;
    }

    private int calculateInflightSize(Subscription subscription) {
        return Math.min(subscription.getSerialSubscriptionPolicy().getInflightSize(), this.defaultInflight);
    }

    @Override
    public void consume(Runnable signalsInterrupt) {
        try {
            do {
                signalsInterrupt.run();
            } while (!this.inflightSemaphore.tryAcquire(this.signalProcessingInterval, TimeUnit.MILLISECONDS));
            Message message = this.messageReceiver.next();
            if (logger.isDebugEnabled()) {
                logger.debug("Read message {} partition {} offset {}", new Object[]{message.getContentType(), message.getPartition(), message.getOffset()});
            }
            Message convertedMessage = this.messageConverterResolver.converterFor(message, this.subscription).convert(message, this.topic);
            this.sendMessage(convertedMessage);
        }
        catch (MessageReceivingTimeoutException messageReceivingTimeoutException) {
            this.inflightSemaphore.release();
            logger.trace("Timeout while reading message for subscription {}. Trying to read message again", (Object)this.subscription.getQualifiedName(), (Object)messageReceivingTimeoutException);
        }
        catch (Exception e) {
            logger.error("Consumer loop failed for {}", (Object)this.subscription.getQualifiedName(), (Object)e);
        }
    }

    private void sendMessage(Message message) {
        this.offsetQueue.offerInflightOffset(SubscriptionPartitionOffset.subscriptionPartitionOffset(message, this.subscription));
        this.hermesMetrics.incrementInflightCounter(this.subscription);
        this.trackers.get(this.subscription).logInflight(MessageConverter.toMessageMetadata(message, this.subscription));
        this.sender.sendAsync(message);
    }

    @Override
    public void initialize() {
        logger.info("Consumer: preparing message receiver for subscription {}", (Object)this.subscription.getQualifiedName());
        this.initializeMessageReceiver();
        this.rateLimiter.initialize();
    }

    private void initializeMessageReceiver() {
        this.messageReceiver = this.messageReceiverFactory.createMessageReceiver(this.topic, this.subscription, this.rateLimiter);
    }

    @Override
    public void tearDown() {
        this.messageReceiver.stop();
        this.rateLimiter.shutdown();
        this.sender.shutdown();
    }

    @Override
    public void updateSubscription(Subscription newSubscription) {
        logger.info("Updating consumer for subscription {}", (Object)this.subscription.getQualifiedName());
        this.inflightSemaphore.setMaxPermits(this.calculateInflightSize(newSubscription));
        this.rateLimiter.updateSubscription(newSubscription);
        this.sender.updateSubscription(newSubscription);
        this.messageReceiver.update(newSubscription);
        this.subscription = newSubscription;
    }

    @Override
    public void updateTopic(Topic newTopic) {
        if (this.topic.getContentType() != newTopic.getContentType()) {
            logger.info("Topic content type changed from {} to {}, reinitializing message recevier", (Object)this.topic.getContentType(), (Object)newTopic.getContentType());
            this.topic = newTopic;
            this.messageReceiver.stop();
            this.initializeMessageReceiver();
        }
    }
}

