/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.CommonConsumerParameters;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.consumer.ConsumerAuthorizationHandler;
import pl.allegro.tech.hermes.consumers.consumer.ConsumerMessageSender;
import pl.allegro.tech.hermes.consumers.consumer.ConsumerMessageSenderFactory;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver;
import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder;
import pl.allegro.tech.hermes.consumers.consumer.message.MessageConverter;
import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetCommitter;
import pl.allegro.tech.hermes.consumers.consumer.offset.PendingOffsets;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.profiling.ConsumerProfiler;
import pl.allegro.tech.hermes.consumers.consumer.profiling.ConsumerRun;
import pl.allegro.tech.hermes.consumers.consumer.profiling.DefaultConsumerProfiler;
import pl.allegro.tech.hermes.consumers.consumer.profiling.NoOpConsumerProfiler;
import pl.allegro.tech.hermes.consumers.consumer.rate.SerialConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.ReceiverFactory;
import pl.allegro.tech.hermes.consumers.consumer.receiver.UninitializedMessageReceiver;
import pl.allegro.tech.hermes.tracker.consumers.Trackers;

/**
 * {@link Consumer} implementation that processes a subscription one message per
 * {@link #consume(Runnable)} call.
 *
 * <p>Each call runs the supplied signal handler, commits offsets when the commit period has
 * elapsed, acquires a slot in {@link PendingOffsets}, reads at most one message from the
 * {@link MessageReceiver}, converts it and hands it to the {@link ConsumerMessageSender}.
 *
 * <p>The mutable fields ({@code topic}, {@code subscription}, {@code messageReceiver},
 * {@code lastCommitNanos}) are neither volatile nor guarded by a lock.
 * NOTE(review): presumably every method is invoked from a single consumer thread - confirm
 * against the callers.
 */
public class SerialConsumer
implements Consumer {
    private static final Logger logger = LoggerFactory.getLogger(SerialConsumer.class);
    /** Number of nanoseconds in one millisecond, used to convert {@link System#nanoTime()} deltas. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final ReceiverFactory messageReceiverFactory;
    private final MetricsFacade metrics;
    private final SerialConsumerRateLimiter rateLimiter;
    private final Trackers trackers;
    private final MessageConverterResolver messageConverterResolver;
    private final ConsumerMessageSender sender;
    private final boolean useTopicMessageSizeEnabled;
    private final PendingOffsets pendingOffsets;
    private final ConsumerAuthorizationHandler consumerAuthorizationHandler;
    private final SubscriptionLoadRecorder loadRecorder;
    private final OffsetCommitter offsetCommitter;
    private final Duration commitPeriod;
    private final int defaultInflight;
    private final Duration signalProcessingInterval;
    private Topic topic;
    private Subscription subscription;
    private MessageReceiver messageReceiver;
    /**
     * {@link System#nanoTime()} reading taken at construction and after each completed commit
     * attempt. A monotonic clock is used instead of wall-clock time so that system clock
     * adjustments cannot stretch or shrink the commit interval.
     */
    private long lastCommitNanos;

    /**
     * Creates the consumer with an {@link UninitializedMessageReceiver}; the real receiver is
     * created by {@link #initialize()}.
     *
     * @param messageReceiverFactory factory used to create (and re-create) the message receiver
     * @param metrics metrics facade passed to collaborators and used to unregister metrics on tear down
     * @param subscription subscription this consumer serves
     * @param rateLimiter rate limiter shared with the sender and the receiver
     * @param consumerMessageSenderFactory factory for the message sender
     * @param trackers trackers used to log inflight messages
     * @param messageConverterResolver resolves the converter applied to each received message
     * @param topic topic the subscription belongs to
     * @param commonConsumerParameters source of the default inflight size, the signal processing
     *     interval and the "use topic message size" flag
     * @param consumerAuthorizationHandler handler notified about subscription lifecycle changes
     * @param loadRecorder recorder notified once per signal-processing iteration
     * @param consumerPartitionAssignmentState state passed to the {@link OffsetCommitter}
     * @param commitPeriod minimum time between commits performed by the consume loop
     * @param offsetQueueSize size passed to {@link PendingOffsets}
     */
    public SerialConsumer(
            ReceiverFactory messageReceiverFactory,
            MetricsFacade metrics,
            Subscription subscription,
            SerialConsumerRateLimiter rateLimiter,
            ConsumerMessageSenderFactory consumerMessageSenderFactory,
            Trackers trackers,
            MessageConverterResolver messageConverterResolver,
            Topic topic,
            CommonConsumerParameters commonConsumerParameters,
            ConsumerAuthorizationHandler consumerAuthorizationHandler,
            SubscriptionLoadRecorder loadRecorder,
            ConsumerPartitionAssignmentState consumerPartitionAssignmentState,
            Duration commitPeriod,
            int offsetQueueSize) {
        // defaultInflight must be assigned before calculateInflightSize(...) is called below.
        this.defaultInflight = commonConsumerParameters.getSerialConsumer().getInflightSize();
        this.signalProcessingInterval = commonConsumerParameters.getSerialConsumer().getSignalProcessingInterval();
        this.messageReceiverFactory = messageReceiverFactory;
        this.metrics = metrics;
        this.subscription = subscription;
        this.rateLimiter = rateLimiter;
        this.useTopicMessageSizeEnabled = commonConsumerParameters.isUseTopicMessageSizeEnabled();
        this.pendingOffsets = new PendingOffsets(subscription.getQualifiedName(), metrics, calculateInflightSize(subscription), offsetQueueSize);
        this.consumerAuthorizationHandler = consumerAuthorizationHandler;
        this.trackers = trackers;
        this.messageConverterResolver = messageConverterResolver;
        this.loadRecorder = loadRecorder;
        this.messageReceiver = new UninitializedMessageReceiver();
        this.topic = topic;
        this.offsetCommitter = new OffsetCommitter(consumerPartitionAssignmentState, metrics);
        this.sender = consumerMessageSenderFactory.create(subscription, rateLimiter, this.pendingOffsets, loadRecorder, metrics);
        this.commitPeriod = commitPeriod;
        this.lastCommitNanos = System.nanoTime();
    }

    /**
     * Returns the inflight size configured in the subscription's serial policy, or the default
     * taken from the common consumer parameters when the subscription does not define one.
     */
    private int calculateInflightSize(Subscription subscription) {
        Optional<Integer> subscriptionInflight = Optional.ofNullable(subscription.getSerialSubscriptionPolicy().getInflightSize());
        return subscriptionInflight.orElse(defaultInflight);
    }

    /**
     * Performs one iteration of the consumer loop.
     *
     * <p>Until a slot is acquired from {@link PendingOffsets}, repeatedly records a load
     * operation, runs {@code signalsInterrupt} and commits offsets if the commit period elapsed.
     * Then reads at most one message: a filtered message only flushes profiler measurements, any
     * other message is converted and sent, and when no message is available the slot is released.
     *
     * <p>An {@link InterruptedException} restores the thread's interrupted status; any other
     * exception is logged and not propagated.
     *
     * @param signalsInterrupt callback executed at least once per call, before a slot is acquired
     */
    @Override
    public void consume(Runnable signalsInterrupt) {
        try {
            ConsumerProfiler profiler = subscription.isProfilingEnabled()
                    ? new DefaultConsumerProfiler(subscription.getQualifiedName(), subscription.getProfilingThresholdMs())
                    : new NoOpConsumerProfiler();
            profiler.startMeasurements("signalsAndSemaphoreAcquire");
            // Signals and commits are handled on every pass, so they keep running even while
            // no slot can be acquired.
            do {
                loadRecorder.recordSingleOperation();
                profiler.startPartialMeasurement("signalsInterrupt.run");
                signalsInterrupt.run();
                commitIfReady();
                profiler.stopPartialMeasurement();
            } while (!pendingOffsets.tryAcquireSlot(signalProcessingInterval));
            profiler.measure("messageReceiver.next");
            Optional<Message> maybeMessage = messageReceiver.next();
            profiler.measure("messageConverter.convert");
            if (maybeMessage.isPresent()) {
                Message message = maybeMessage.get();
                if (message.isFiltered()) {
                    // The slot is not released here; NOTE(review): presumably the receiver accounts
                    // for filtered messages via the markAsProcessed callback - confirm.
                    profiler.flushMeasurements(ConsumerRun.FILTERED);
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Read message {} partition {} offset {}", message.getContentType(), message.getPartition(), message.getOffset());
                    }
                    Message convertedMessage = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
                    sendMessage(convertedMessage, profiler);
                }
            } else {
                // Nothing was read, so the slot acquired above is handed back.
                pendingOffsets.releaseSlot();
                profiler.flushMeasurements(ConsumerRun.EMPTY);
            }
        }
        catch (InterruptedException e) {
            logger.info("Restoring interrupted status {}", subscription.getQualifiedName(), e);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            // NOTE(review): when the failure happens after a slot was acquired but before the
            // message was marked as inflight (e.g. in next() or convert()), nothing on this path
            // calls releaseSlot() - confirm whether that can leak a slot.
            logger.error("Consumer loop failed for {}", subscription.getQualifiedName(), e);
        }
    }

    /**
     * Commits offsets when more than {@code commitPeriod} has passed since the previous attempt.
     *
     * <p>The timestamp is refreshed even when there was nothing to commit; it is not refreshed
     * when calculating or committing offsets throws.
     */
    private void commitIfReady() {
        if (!isReadyToCommit()) {
            return;
        }
        Set<SubscriptionPartitionOffset> offsetsToCommit = offsetCommitter.calculateOffsetsToBeCommitted(pendingOffsets.getOffsetsSnapshotAndReleaseProcessedSlots());
        if (!offsetsToCommit.isEmpty()) {
            commit(offsetsToCommit);
        }
        lastCommitNanos = System.nanoTime();
    }

    /**
     * Tells whether strictly more than {@code commitPeriod} (compared in whole milliseconds) has
     * elapsed since the last commit attempt, measured on the monotonic clock.
     */
    private boolean isReadyToCommit() {
        // Subtracting two nanoTime() readings is safe even if the counter wraps around.
        long elapsedMillis = (System.nanoTime() - lastCommitNanos) / NANOS_PER_MILLI;
        return elapsedMillis > commitPeriod.toMillis();
    }

    /**
     * Marks the message offset as inflight, logs it to the subscription's tracker and passes the
     * message to the sender.
     */
    private void sendMessage(Message message, ConsumerProfiler profiler) throws InterruptedException {
        profiler.measure("offsetQueue.offerInflightOffset");
        pendingOffsets.markAsInflight(SubscriptionPartitionOffset.subscriptionPartitionOffset(subscription.getQualifiedName(), message.getPartitionOffset(), message.getPartitionAssignmentTerm()));
        profiler.measure("trackers.logInflight");
        trackers.get(subscription).logInflight(MessageConverter.toMessageMetadata(message, subscription));
        sender.sendAsync(message, profiler);
    }

    /**
     * Creates the message receiver and initializes the sender, rate limiter and load recorder,
     * then registers the subscription with the authorization handler.
     */
    @Override
    public void initialize() {
        logger.info("Consumer: preparing message receiver for subscription {}", subscription.getQualifiedName());
        initializeMessageReceiver();
        sender.initialize();
        rateLimiter.initialize();
        loadRecorder.initialize();
        consumerAuthorizationHandler.createSubscriptionHandler(subscription.getQualifiedName());
    }

    /**
     * Replaces {@code messageReceiver} with a new receiver built for the current topic and
     * subscription; the receiver is given {@code pendingOffsets::markAsProcessed} as a callback.
     */
    private void initializeMessageReceiver() {
        messageReceiver = messageReceiverFactory.createMessageReceiver(topic, subscription, rateLimiter, loadRecorder, metrics, pendingOffsets::markAsProcessed);
    }

    /**
     * Stops the receiver, shuts down the sender, rate limiter and load recorder, removes the
     * subscription's authorization handler and unregisters the subscription's metrics.
     *
     * <p>The steps run in sequence without individual error handling, so an exception thrown by
     * one step skips the remaining ones.
     */
    @Override
    public void tearDown() {
        messageReceiver.stop();
        sender.shutdown();
        rateLimiter.shutdown();
        loadRecorder.shutdown();
        consumerAuthorizationHandler.removeSubscriptionHandler(subscription.getQualifiedName());
        metrics.unregisterAllMetricsRelatedTo(subscription.getQualifiedName());
    }

    /**
     * Propagates a changed subscription to the pending offsets (inflight size), rate limiter,
     * sender, receiver and authorization handler, then stores it as the current subscription.
     *
     * @param newSubscription the updated subscription definition
     */
    @Override
    public void updateSubscription(Subscription newSubscription) {
        logger.info("Updating consumer for subscription {}", subscription.getQualifiedName());
        pendingOffsets.setInflightSize(calculateInflightSize(newSubscription));
        rateLimiter.updateSubscription(newSubscription);
        sender.updateSubscription(newSubscription);
        messageReceiver.update(newSubscription);
        consumerAuthorizationHandler.updateSubscription(newSubscription.getQualifiedName());
        subscription = newSubscription;
    }

    /**
     * Re-creates the message receiver when the topic's content type, max message size (only if
     * topic message size is in use) or schema-id-aware serialization flag changed.
     *
     * <p>The stored topic is replaced only in that case; other topic changes leave the
     * previously stored {@link Topic} in place.
     *
     * @param newTopic the updated topic definition
     */
    @Override
    public void updateTopic(Topic newTopic) {
        boolean contentTypeChanged = topic.getContentType() != newTopic.getContentType();
        if (contentTypeChanged || messageSizeChanged(newTopic) || topic.isSchemaIdAwareSerializationEnabled() != newTopic.isSchemaIdAwareSerializationEnabled()) {
            logger.info("Reinitializing message receiver, contentType, messageSize or schemaIdAwareSerialization changed.");
            topic = newTopic;
            messageReceiver.stop();
            initializeMessageReceiver();
        }
    }

    /**
     * Returns {@code true} when the max message size differs from the current topic's and the
     * "use topic message size" option is enabled.
     */
    private boolean messageSizeChanged(Topic newTopic) {
        return topic.getMaxMessageSize() != newTopic.getMaxMessageSize() && useTopicMessageSizeEnabled;
    }

    /**
     * Delegates committing the given offsets to the current message receiver.
     *
     * @param offsets offsets to commit
     */
    @Override
    public void commit(Set<SubscriptionPartitionOffset> offsets) {
        messageReceiver.commit(offsets);
    }

    /**
     * Delegates moving the offset to the current message receiver.
     *
     * @param offset target partition offset
     * @return the value returned by the receiver's {@code moveOffset}
     */
    @Override
    public boolean moveOffset(PartitionOffset offset) {
        return messageReceiver.moveOffset(offset);
    }

    /** Returns the subscription currently served by this consumer. */
    @Override
    public Subscription getSubscription() {
        return subscription;
    }
}

