/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.consumer.ConsumerAuthorizationHandler;
import pl.allegro.tech.hermes.consumers.consumer.ConsumerMessageSender;
import pl.allegro.tech.hermes.consumers.consumer.ConsumerMessageSenderFactory;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver;
import pl.allegro.tech.hermes.consumers.consumer.message.MessageConverter;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetQueue;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.rate.AdjustableSemaphore;
import pl.allegro.tech.hermes.consumers.consumer.rate.SerialConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.ReceiverFactory;
import pl.allegro.tech.hermes.consumers.consumer.receiver.UninitializedMessageReceiver;
import pl.allegro.tech.hermes.tracker.consumers.Trackers;

/**
 * {@link Consumer} that processes a subscription one message per {@link #consume(Runnable)} call:
 * it takes a message from a {@link MessageReceiver}, converts it with the converter chosen by
 * {@link MessageConverterResolver} and hands it to a {@link ConsumerMessageSender}.
 *
 * <p>The number of messages being processed at once is bounded by an {@link AdjustableSemaphore}.
 * A permit is acquired before each read; the semaphore's {@code release} method is given to the
 * sender factory as a callback (presumably invoked once a send finishes -- confirm against the
 * sender implementation).
 *
 * <p>This class performs no synchronization of its own; {@code topic}, {@code subscription} and
 * {@code messageReceiver} are plain mutable fields.
 */
public class SerialConsumer implements Consumer {
    private static final Logger logger = LoggerFactory.getLogger(SerialConsumer.class);

    private final ReceiverFactory messageReceiverFactory;
    private final HermesMetrics hermesMetrics;
    private final SerialConsumerRateLimiter rateLimiter;
    private final Trackers trackers;
    private final MessageConverterResolver messageConverterResolver;
    private final ConsumerMessageSender sender;
    private final ConfigFactory configFactory;
    private final OffsetQueue offsetQueue;
    private final ConsumerAuthorizationHandler consumerAuthorizationHandler;
    private final AdjustableSemaphore inflightSemaphore;
    /** Value of {@code Configs.CONSUMER_INFLIGHT_SIZE}; upper bound for the per-subscription inflight size. */
    private final int defaultInflight;
    /** Value of {@code Configs.CONSUMER_SIGNAL_PROCESSING_INTERVAL}; used as the permit-acquire timeout in milliseconds. */
    private final int signalProcessingInterval;

    private Topic topic;
    private Subscription subscription;
    private MessageReceiver messageReceiver;

    /**
     * Creates a consumer for the given subscription. The message receiver starts out as an
     * {@link UninitializedMessageReceiver}; the real one is created in {@link #initialize()}.
     *
     * @param messageReceiverFactory factory used to (re)create the message receiver
     * @param hermesMetrics metrics facade used to count inflight messages
     * @param subscription subscription this consumer serves
     * @param rateLimiter rate limiter passed to the receiver and the sender
     * @param consumerMessageSenderFactory factory that builds the sender for this consumer
     * @param trackers message trackers used to log inflight messages
     * @param messageConverterResolver resolver of the converter applied to each received message
     * @param topic topic the subscription belongs to
     * @param configFactory source of the inflight-size, signal-interval and message-size settings
     * @param offsetQueue queue that receives the offset of every message handed to the sender
     * @param consumerAuthorizationHandler handler notified about subscription lifecycle changes
     */
    public SerialConsumer(ReceiverFactory messageReceiverFactory, HermesMetrics hermesMetrics, Subscription subscription, SerialConsumerRateLimiter rateLimiter, ConsumerMessageSenderFactory consumerMessageSenderFactory, Trackers trackers, MessageConverterResolver messageConverterResolver, Topic topic, ConfigFactory configFactory, OffsetQueue offsetQueue, ConsumerAuthorizationHandler consumerAuthorizationHandler) {
        // defaultInflight must be assigned before calculateInflightSize() reads it.
        this.defaultInflight = configFactory.getIntProperty(Configs.CONSUMER_INFLIGHT_SIZE);
        this.signalProcessingInterval = configFactory.getIntProperty(Configs.CONSUMER_SIGNAL_PROCESSING_INTERVAL);
        this.inflightSemaphore = new AdjustableSemaphore(calculateInflightSize(subscription));
        this.messageReceiverFactory = messageReceiverFactory;
        this.hermesMetrics = hermesMetrics;
        this.subscription = subscription;
        this.rateLimiter = rateLimiter;
        this.configFactory = configFactory;
        this.offsetQueue = offsetQueue;
        this.consumerAuthorizationHandler = consumerAuthorizationHandler;
        this.trackers = trackers;
        this.messageConverterResolver = messageConverterResolver;
        this.messageReceiver = new UninitializedMessageReceiver();
        this.topic = topic;
        this.sender = consumerMessageSenderFactory.create(subscription, rateLimiter, offsetQueue, this.inflightSemaphore::release);
    }

    /**
     * Returns the number of inflight permits for a subscription: the smaller of the
     * subscription's own inflight size and the configured default.
     */
    private int calculateInflightSize(Subscription subscription) {
        return Math.min(subscription.getSerialSubscriptionPolicy().getInflightSize(), this.defaultInflight);
    }

    /**
     * Runs one iteration of the consumer loop.
     *
     * <p>{@code signalsInterrupt} is run at least once, and again after every unsuccessful attempt
     * to acquire an inflight permit within {@code signalProcessingInterval} milliseconds. Once a
     * permit is held, at most one message is read, converted and passed to the sender.
     *
     * <p>The permit is released here whenever the iteration ends without handing a message to
     * {@link #sendMessage(Message)}: when the receiver returns no message, and when reading or
     * converting fails. Exceptions are logged and not propagated; if the failure is an
     * {@link InterruptedException} the thread's interrupt flag is restored so the caller can
     * observe it.
     *
     * @param signalsInterrupt callback run while waiting for an inflight permit
     */
    @Override
    public void consume(Runnable signalsInterrupt) {
        // True only while this method owns a permit that nobody else is going to release.
        boolean permitHeld = false;
        try {
            do {
                signalsInterrupt.run();
            } while (!this.inflightSemaphore.tryAcquire(this.signalProcessingInterval, TimeUnit.MILLISECONDS));
            permitHeld = true;

            Optional<Message> maybeMessage = this.messageReceiver.next();
            if (!maybeMessage.isPresent()) {
                // Nothing to send; the finally block gives the permit back.
                return;
            }

            Message message = maybeMessage.get();
            if (logger.isDebugEnabled()) {
                logger.debug("Read message {} partition {} offset {}",
                        message.getContentType(), message.getPartition(), message.getOffset());
            }
            Message convertedMessage = this.messageConverterResolver
                    .converterFor(message, this.subscription)
                    .convert(message, this.topic);

            // From here on the permit belongs to the send path (the sender was given the release
            // callback). A failure inside sendMessage is deliberately not compensated, because it
            // cannot be told from here whether the sender already took the message over.
            permitHeld = false;
            sendMessage(convertedMessage);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the thread that drives this consumer.
                Thread.currentThread().interrupt();
                logger.info("Restoring interrupted status {}", this.subscription.getQualifiedName(), e);
            } else {
                logger.error("Consumer loop failed for {}", this.subscription.getQualifiedName(), e);
            }
        } finally {
            if (permitHeld) {
                this.inflightSemaphore.release();
            }
        }
    }

    /**
     * Registers the message as inflight (offset queue, metrics counter, tracker log) and then
     * passes it to the sender asynchronously.
     */
    private void sendMessage(Message message) {
        this.offsetQueue.offerInflightOffset(SubscriptionPartitionOffset.subscriptionPartitionOffset(message, this.subscription));
        this.hermesMetrics.incrementInflightCounter(this.subscription);
        this.trackers.get(this.subscription).logInflight(MessageConverter.toMessageMetadata(message, this.subscription));
        this.sender.sendAsync(message);
    }

    /**
     * Creates the message receiver, initializes the sender and the rate limiter, and registers
     * the subscription with the authorization handler.
     */
    @Override
    public void initialize() {
        logger.info("Consumer: preparing message receiver for subscription {}", this.subscription.getQualifiedName());
        initializeMessageReceiver();
        this.sender.initialize();
        this.rateLimiter.initialize();
        this.consumerAuthorizationHandler.createSubscriptionHandler(this.subscription.getQualifiedName());
    }

    /** Replaces the current receiver with a new one built for the current topic and subscription. */
    private void initializeMessageReceiver() {
        this.messageReceiver = this.messageReceiverFactory.createMessageReceiver(this.topic, this.subscription, this.rateLimiter);
    }

    /**
     * Stops the receiver, shuts down the sender and the rate limiter, and removes the
     * subscription from the authorization handler, in that order.
     */
    @Override
    public void tearDown() {
        this.messageReceiver.stop();
        this.sender.shutdown();
        this.rateLimiter.shutdown();
        this.consumerAuthorizationHandler.removeSubscriptionHandler(this.subscription.getQualifiedName());
    }

    /**
     * Applies a new subscription definition: resizes the inflight semaphore, propagates the
     * subscription to the rate limiter, sender, receiver and authorization handler, and finally
     * stores it as the current subscription.
     *
     * @param newSubscription subscription definition to switch to
     */
    @Override
    public void updateSubscription(Subscription newSubscription) {
        logger.info("Updating consumer for subscription {}", this.subscription.getQualifiedName());
        this.inflightSemaphore.setMaxPermits(calculateInflightSize(newSubscription));
        this.rateLimiter.updateSubscription(newSubscription);
        this.sender.updateSubscription(newSubscription);
        this.messageReceiver.update(newSubscription);
        this.consumerAuthorizationHandler.updateSubscription(newSubscription.getQualifiedName());
        this.subscription = newSubscription;
    }

    /**
     * Recreates the message receiver when the topic's content type, its maximum message size
     * (see {@link #messageSizeChanged(Topic)}) or its schema-version-aware serialization flag
     * differs from the currently held topic.
     *
     * <p>NOTE(review): when none of these attributes changed, {@code this.topic} keeps referring
     * to the previous {@link Topic} instance -- confirm that ignoring other topic changes is
     * intended.
     *
     * @param newTopic updated topic definition
     */
    @Override
    public void updateTopic(Topic newTopic) {
        boolean contentTypeChanged = this.topic.getContentType() != newTopic.getContentType();
        if (contentTypeChanged
                || messageSizeChanged(newTopic)
                || this.topic.isSchemaVersionAwareSerializationEnabled() != newTopic.isSchemaVersionAwareSerializationEnabled()) {
            logger.info("Reinitializing message receiver, contentType, messageSize or schemaVersionAwareSerialization changed.");
            this.topic = newTopic;
            this.messageReceiver.stop();
            initializeMessageReceiver();
        }
    }

    /**
     * Tells whether the maximum message size differs between the current and the new topic;
     * a difference only counts when {@code Configs.CONSUMER_USE_TOPIC_MESSAGE_SIZE} is enabled.
     */
    private boolean messageSizeChanged(Topic newTopic) {
        return this.topic.getMaxMessageSize() != newTopic.getMaxMessageSize()
                && this.configFactory.getBooleanProperty(Configs.CONSUMER_USE_TOPIC_MESSAGE_SIZE);
    }

    /** Delegates committing of the given offsets to the current message receiver. */
    @Override
    public void commit(Set<SubscriptionPartitionOffset> offsets) {
        this.messageReceiver.commit(offsets);
    }

    /**
     * Delegates moving the offset to the current message receiver.
     *
     * @return whatever the receiver's {@code moveOffset} returns
     */
    @Override
    public boolean moveOffset(SubscriptionPartitionOffset offset) {
        return this.messageReceiver.moveOffset(offset);
    }

    /** Returns the subscription currently served by this consumer. */
    @Override
    public Subscription getSubscription() {
        return this.subscription;
    }
}

