/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.kafka.common.consumer.containers;

import jakarta.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.common.util.TimeUtils;
import ru.tinkoff.kora.kafka.common.KafkaUtils;
import ru.tinkoff.kora.kafka.common.consumer.ConsumerAwareRebalanceListener;
import ru.tinkoff.kora.kafka.common.consumer.KafkaListenerConfig;
import ru.tinkoff.kora.kafka.common.consumer.containers.ConsumerWrapper;
import ru.tinkoff.kora.kafka.common.consumer.containers.handlers.BaseKafkaRecordsHandler;

public final class KafkaSubscribeConsumerContainer<K, V>
implements Lifecycle {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSubscribeConsumerContainer.class);
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final AtomicLong backoffTimeout;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private volatile ExecutorService executorService;
    private final BaseKafkaRecordsHandler<K, V> handler;
    private final Set<Consumer<K, V>> consumers = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap()));
    @Nullable
    private final ConsumerAwareRebalanceListener rebalanceListener;
    private final KafkaListenerConfig config;
    private final String consumerPrefix;
    private final boolean commitAllowed;

    public KafkaSubscribeConsumerContainer(KafkaListenerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, BaseKafkaRecordsHandler<K, V> handler) {
        this(config, keyDeserializer, valueDeserializer, handler, null);
    }

    public KafkaSubscribeConsumerContainer(KafkaListenerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, BaseKafkaRecordsHandler<K, V> handler, @Nullable ConsumerAwareRebalanceListener rebalanceListener) {
        if (config.driverProperties().get("group.id") == null) {
            throw new IllegalArgumentException("Group id is required for subscribe container");
        }
        this.handler = handler;
        this.rebalanceListener = rebalanceListener;
        this.backoffTimeout = new AtomicLong(config.backoffTimeout().toMillis());
        this.consumerPrefix = KafkaUtils.getConsumerPrefix(config);
        Object autoCommit = config.driverProperties().get("enable.auto.commit");
        if (autoCommit == null) {
            config = config.withDriverPropertiesOverrides(Map.of("enable.auto.commit", false));
            this.commitAllowed = true;
        } else {
            this.commitAllowed = !Boolean.parseBoolean(String.valueOf(autoCommit));
        }
        this.config = config;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void launchPollLoop(Consumer<K, V> consumer, long started) {
        try (Consumer<K, V> consumer2 = consumer;){
            this.consumers.add(consumer);
            logger.info("Kafka Consumer '{}' started in {}", (Object)this.consumerPrefix, (Object)TimeUtils.tookForLogging((long)started));
            boolean isFirstPoll = true;
            while (this.isActive.get()) {
                try {
                    logger.trace("Kafka Consumer '{}' polling...", (Object)this.consumerPrefix);
                    ConsumerRecords records = consumer.poll(this.config.pollTimeout());
                    if (isFirstPoll) {
                        logger.info("Kafka Consumer '{}' first poll in {}", (Object)this.consumerPrefix, (Object)TimeUtils.tookForLogging((long)started));
                        isFirstPoll = false;
                    }
                    if (!records.isEmpty() && logger.isTraceEnabled()) {
                        HashSet<String> logTopics = new HashSet<String>(records.partitions().size());
                        HashSet<Integer> logPartitions = new HashSet<Integer>(records.partitions().size());
                        for (TopicPartition partition : records.partitions()) {
                            logPartitions.add(partition.partition());
                            logTopics.add(partition.topic());
                        }
                        logger.trace("Kafka Consumer '{}' polled '{}' records from topics {} and partitions {}", new Object[]{this.consumerPrefix, records.count(), logTopics, logPartitions});
                    } else if (!records.isEmpty() && logger.isDebugEnabled()) {
                        logger.debug("Kafka Consumer '{}' polled '{}' records", (Object)this.consumerPrefix, (Object)records.count());
                    } else {
                        logger.trace("Kafka Consumer '{}' polled '0' records", (Object)this.consumerPrefix);
                    }
                    this.handler.handle(records, consumer, this.commitAllowed);
                    this.backoffTimeout.set(this.config.backoffTimeout().toMillis());
                }
                catch (WakeupException records) {
                }
                catch (Exception e) {
                    logger.error("Kafka Consumer '{}' got unhandled exception", (Object)this.consumerPrefix, (Object)e);
                    try {
                        Thread.sleep(this.backoffTimeout.get());
                    }
                    catch (InterruptedException ie) {
                        logger.error("Kafka Consumer '{}' error interrupting thread", (Object)this.consumerPrefix, (Object)ie);
                    }
                    if (this.backoffTimeout.get() >= 60000L) break;
                    this.backoffTimeout.set(this.backoffTimeout.get() * 2L);
                    break;
                }
                finally {
                    Context.clear();
                }
            }
            Thread.interrupted();
            return;
        }
        finally {
            this.consumers.remove(consumer);
        }
    }

    public void init() {
        if (this.config.threads() > 0 && this.isActive.compareAndSet(false, true)) {
            logger.debug("Kafka Consumer '{}' starting...", (Object)this.consumerPrefix);
            long started = TimeUtils.started();
            this.executorService = Executors.newFixedThreadPool(this.config.threads(), new KafkaUtils.NamedThreadFactory(this.consumerPrefix));
            for (int i = 0; i < this.config.threads(); ++i) {
                this.executorService.execute(() -> {
                    while (this.isActive.get()) {
                        Consumer<K, V> consumer = this.initializeConsumer();
                        if (consumer == null) continue;
                        this.launchPollLoop(consumer, started);
                    }
                });
            }
        }
    }

    public void release() {
        if (this.isActive.compareAndSet(true, false)) {
            logger.debug("Kafka Consumer '{}' stopping...", (Object)this.consumerPrefix);
            long started = TimeUtils.started();
            for (Consumer<K, V> consumer : this.consumers) {
                consumer.wakeup();
            }
            this.consumers.clear();
            if (this.executorService != null) {
                this.executorService.shutdownNow();
            }
            logger.info("Kafka Consumer '{}' stopped in {}", (Object)this.consumerPrefix, (Object)TimeUtils.tookForLogging((long)started));
        }
    }

    @Nullable
    private Consumer<K, V> initializeConsumer() {
        try {
            return this.buildConsumer();
        }
        catch (Exception e) {
            logger.error("Kafka Consumer '{}' initialization failed", (Object)this.consumerPrefix, (Object)e);
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException ie) {
                logger.error("Kafka Consumer '{}' error interrupting thread", (Object)this.consumerPrefix, (Object)ie);
            }
            return null;
        }
    }

    private Consumer<K, V> buildConsumer() {
        final KafkaConsumer consumer = new KafkaConsumer(this.config.driverProperties(), (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
        try {
            if (this.config.topicsPattern() != null) {
                if (this.rebalanceListener != null) {
                    consumer.subscribe(this.config.topicsPattern(), new ConsumerRebalanceListener(){

                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                            KafkaSubscribeConsumerContainer.this.rebalanceListener.onPartitionsRevoked((Consumer<?, ?>)consumer, partitions);
                        }

                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            KafkaSubscribeConsumerContainer.this.rebalanceListener.onPartitionsAssigned((Consumer<?, ?>)consumer, partitions);
                        }

                        public void onPartitionsLost(Collection<TopicPartition> partitions) {
                            KafkaSubscribeConsumerContainer.this.rebalanceListener.onPartitionsLost((Consumer<?, ?>)consumer, partitions);
                        }
                    });
                } else {
                    consumer.subscribe(this.config.topicsPattern());
                }
            } else if (this.config.topics() != null) {
                if (this.rebalanceListener != null) {
                    consumer.subscribe(this.config.topicsPattern(), new ConsumerRebalanceListener(){

                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                            KafkaSubscribeConsumerContainer.this.rebalanceListener.onPartitionsRevoked((Consumer<?, ?>)consumer, partitions);
                        }

                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            KafkaSubscribeConsumerContainer.this.rebalanceListener.onPartitionsAssigned((Consumer<?, ?>)consumer, partitions);
                        }

                        public void onPartitionsLost(Collection<TopicPartition> partitions) {
                            KafkaSubscribeConsumerContainer.this.rebalanceListener.onPartitionsLost((Consumer<?, ?>)consumer, partitions);
                        }
                    });
                } else {
                    consumer.subscribe(this.config.topics());
                }
            }
        }
        catch (Exception e) {
            try {
                consumer.close();
            }
            catch (Exception suppressed) {
                e.addSuppressed(suppressed);
            }
            throw e;
        }
        return new ConsumerWrapper<K, V>((Consumer<byte[], byte[]>)consumer, this.keyDeserializer, this.valueDeserializer);
    }
}

