/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.receiver.kafka;

import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopics;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder;
import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetCommitterConsumerRebalanceListener;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker.KafkaConsumerOffsetMover;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.RetryableReceiverError;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.KafkaConsumerRecordToMessageConverter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.KafkaConsumerRecordToMessageConverterFactory;
import pl.allegro.tech.hermes.metrics.HermesCounter;

/**
 * {@link MessageReceiver} that reads from Kafka through a single {@link KafkaConsumer}.
 *
 * <p>Records returned by one {@code poll} are buffered in a bounded in-memory queue and handed
 * out one at a time by {@link #next()}; the consumer is polled again only once that queue has
 * been drained.
 *
 * <p>NOTE(review): as the class name suggests, this is presumably meant to be driven by a single
 * thread - confirm against the caller before sharing an instance between threads.
 */
public class KafkaSingleThreadedMessageReceiver implements MessageReceiver {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSingleThreadedMessageReceiver.class);

    private final KafkaConsumer<byte[], byte[]> consumer;
    private final KafkaConsumerRecordToMessageConverter messageConverter;
    /** Buffer of polled records that have not been handed out by {@link #next()} yet. */
    private final BlockingQueue<ConsumerRecord<byte[], byte[]>> readQueue;
    private final KafkaConsumerOffsetMover offsetMover;
    /** Incremented for every offset that is not committed because it is ahead of the consumer position. */
    private final HermesCounter skippedCounter;
    /** Incremented whenever committing offsets fails with an exception other than an interrupt. */
    private final HermesCounter failuresCounter;
    private final SubscriptionLoadRecorder loadReporter;
    private volatile Subscription subscription;
    /** Timeout handed to every {@code consumer.poll(...)} call. */
    private final Duration poolTimeout;
    private final ConsumerPartitionAssignmentState partitionAssignmentState;

    /**
     * Creates the receiver and subscribes the given consumer to the Kafka topics backing {@code topic}.
     *
     * @param consumer Kafka consumer used for polling, committing and closing
     * @param messageConverterFactory factory of the converter turning Kafka records into {@link Message}s
     * @param metrics source of the skipped and failed offset-commit counters
     * @param kafkaNamesMapper resolves the primary and optional secondary Kafka topic of {@code topic}
     * @param topic topic the subscription belongs to
     * @param subscription subscription this receiver reads for
     * @param poolTimeout timeout passed to every {@code poll} call
     * @param readQueueCapacity capacity of the internal queue buffering polled records
     * @param loadReporter notified once for every record taken from a poll result
     * @param partitionAssignmentState partition assignment state shared with the rebalance listener
     */
    public KafkaSingleThreadedMessageReceiver(KafkaConsumer<byte[], byte[]> consumer, KafkaConsumerRecordToMessageConverterFactory messageConverterFactory, MetricsFacade metrics, KafkaNamesMapper kafkaNamesMapper, Topic topic, Subscription subscription, Duration poolTimeout, int readQueueCapacity, SubscriptionLoadRecorder loadReporter, ConsumerPartitionAssignmentState partitionAssignmentState) {
        this.skippedCounter = metrics.offsetCommits().skippedCounter();
        this.failuresCounter = metrics.offsetCommits().failuresCounter();
        this.subscription = subscription;
        this.poolTimeout = poolTimeout;
        this.loadReporter = loadReporter;
        this.partitionAssignmentState = partitionAssignmentState;
        this.consumer = consumer;
        this.readQueue = new ArrayBlockingQueue<>(readQueueCapacity);
        this.offsetMover = new KafkaConsumerOffsetMover(subscription.getQualifiedName(), consumer);

        // Index the Kafka topics by name: the names are what the consumer subscribes to,
        // the full map is what the converter factory receives.
        Map<String, KafkaTopic> topics = getKafkaTopics(topic, kafkaNamesMapper).stream()
                .collect(Collectors.toMap(kafkaTopic -> kafkaTopic.name().asString(), Function.identity()));
        this.messageConverter = messageConverterFactory.create(topic, subscription, topics);
        this.consumer.subscribe(
                topics.keySet(),
                new OffsetCommitterConsumerRebalanceListener(subscription.getQualifiedName(), partitionAssignmentState));
    }

    /**
     * Collects the Kafka topics of {@code topic}: always the primary one, followed by the
     * secondary one when it is present.
     */
    private Collection<KafkaTopic> getKafkaTopics(Topic topic, KafkaNamesMapper kafkaNamesMapper) {
        KafkaTopics kafkaTopics = kafkaNamesMapper.toKafkaTopics(topic);
        ImmutableList.Builder<KafkaTopic> topicsBuilder = new ImmutableList.Builder<>();
        topicsBuilder.add(kafkaTopics.getPrimary());
        kafkaTopics.getSecondary().ifPresent(topicsBuilder::add);
        return topicsBuilder.build();
    }

    /**
     * Returns the next message, polling Kafka first when the internal queue is empty.
     *
     * @return the next converted message, or an empty {@link Optional} when nothing is available,
     *     the head record could not be converted yet, or reading failed (failures are logged,
     *     never propagated)
     */
    @Override
    public Optional<Message> next() {
        try {
            supplyReadQueue();
            return getMessageFromReadQueue();
        } catch (InterruptException ex) {
            logger.info("Kafka consumer thread interrupted", ex);
            Thread.currentThread().interrupt();
        } catch (KafkaException ex) {
            logger.error("Error while reading message for subscription {}", subscription.getQualifiedName(), ex);
        } catch (Exception ex) {
            logger.error("Failed to read message for subscription {}, readQueueSize {}", subscription.getQualifiedName(), readQueue.size(), ex);
        }
        return Optional.empty();
    }

    /**
     * Polls the consumer and buffers the returned records, but only when the queue is empty.
     * Exceptions thrown by {@code poll} itself propagate to the caller; exceptions thrown while
     * buffering are logged here.
     */
    private void supplyReadQueue() {
        if (!readQueue.isEmpty()) {
            return;
        }
        ConsumerRecords<byte[], byte[]> records = consumer.poll(poolTimeout);
        try {
            for (ConsumerRecord<byte[], byte[]> record : records) {
                loadReporter.recordSingleOperation();
                // add() throws when the bounded queue is full; that ends this loop via the catch
                // below, so the remaining records of this batch are not buffered.
                // NOTE(review): presumably readQueueCapacity is configured to be at least the
                // maximum poll size so this never happens - confirm.
                readQueue.add(record);
            }
        } catch (Exception ex) {
            logger.error("Failed to read message for subscription {}, readQueueSize {}, records {}", subscription.getQualifiedName(), readQueue.size(), records.count(), ex);
        }
    }

    /**
     * Converts the record at the head of the queue. The record is removed only after a successful
     * conversion, so a {@link RetryableReceiverError} leaves it in place for the next call.
     */
    private Optional<Message> getMessageFromReadQueue() {
        if (readQueue.isEmpty()) {
            return Optional.empty();
        }
        ConsumerRecord<byte[], byte[]> record = readQueue.element();
        try {
            Message message = convertToMessage(record);
            readQueue.poll();
            return Optional.of(message);
        } catch (RetryableReceiverError ex) {
            logger.warn("Cannot convert record to message... Operation will be delayed", ex);
            return Optional.empty();
        }
    }

    /** Converts a record, stamping it with the subscription's current partition assignment term. */
    private Message convertToMessage(ConsumerRecord<byte[], byte[]> record) {
        long currentTerm = partitionAssignmentState.currentTerm(subscription.getQualifiedName());
        return messageConverter.convertToMessage(record, currentTerm);
    }

    /**
     * Closes the Kafka consumer and, regardless of how closing ended, revokes all partitions
     * recorded for this subscription in the partition assignment state.
     */
    @Override
    public void stop() {
        try {
            consumer.close();
        } catch (IllegalStateException ignored) {
            // Deliberately ignored. NOTE(review): presumably signals that the consumer was
            // already closed - confirm against the Kafka client version in use.
        } catch (InterruptException ignored) {
            // Deliberately ignored: the thread was interrupted while closing and there is
            // nothing left to do here besides the cleanup in the finally block.
        } catch (KafkaException ex) {
            logger.warn("KafkaException occurred during closing consumer.", ex);
        } finally {
            partitionAssignmentState.revokeAll(subscription.getQualifiedName());
        }
    }

    /**
     * Replaces the subscription used by this receiver and passes it on to the message converter.
     *
     * @param newSubscription the updated subscription
     */
    @Override
    public void update(Subscription newSubscription) {
        this.subscription = newSubscription;
        // Pass the argument itself rather than re-reading the volatile field.
        messageConverter.update(newSubscription);
    }

    /**
     * Synchronously commits the eligible offsets from {@code offsets}. Failures are logged and
     * counted rather than propagated; an interrupt re-asserts the thread's interrupt flag.
     *
     * @param offsets offsets requested for commit
     */
    @Override
    public void commit(Set<SubscriptionPartitionOffset> offsets) {
        try {
            consumer.commitSync(createOffset(offsets));
        } catch (InterruptException ex) {
            logger.info("Kafka consumer thread interrupted", ex);
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            logger.error("Error while committing offset for subscription {}", subscription.getQualifiedName(), ex);
            failuresCounter.increment();
        }
    }

    /**
     * Builds the commit payload. An offset is included only when its partition is assigned at the
     * current term and the consumer position has reached it. Offsets ahead of the position bump
     * the skipped counter; offsets for partitions not assigned at the current term are logged.
     */
    private Map<TopicPartition, OffsetAndMetadata> createOffset(Set<SubscriptionPartitionOffset> partitionOffsets) {
        Map<TopicPartition, OffsetAndMetadata> offsetsData = new LinkedHashMap<>();
        for (SubscriptionPartitionOffset partitionOffset : partitionOffsets) {
            TopicPartition topicAndPartition = new TopicPartition(partitionOffset.getKafkaTopicName().asString(), partitionOffset.getPartition());
            if (!partitionAssignmentState.isAssignedPartitionAtCurrentTerm(partitionOffset.getSubscriptionPartition())) {
                logger.warn("Consumer is not assigned to partition {} of subscription {} at current term {}, ignoring offset {} from term {} to commit", partitionOffset.getPartition(), partitionOffset.getSubscriptionName(), partitionAssignmentState.currentTerm(partitionOffset.getSubscriptionName()), partitionOffset.getOffset(), partitionOffset.getPartitionAssignmentTerm());
            } else if (consumer.position(topicAndPartition) >= partitionOffset.getOffset()) {
                offsetsData.put(topicAndPartition, new OffsetAndMetadata(partitionOffset.getOffset()));
            } else {
                skippedCounter.increment();
            }
        }
        return offsetsData;
    }

    /**
     * Moves the consumer offset by delegating to the {@link KafkaConsumerOffsetMover}.
     *
     * @param offset target partition offset
     * @return whatever the offset mover reports for the move
     */
    @Override
    public boolean moveOffset(PartitionOffset offset) {
        return offsetMover.move(offset);
    }
}

