/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.kafka.common.consumer.containers;

import jakarta.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.common.util.TimeUtils;
import ru.tinkoff.kora.kafka.common.KafkaUtils;
import ru.tinkoff.kora.kafka.common.consumer.KafkaListenerConfig;
import ru.tinkoff.kora.kafka.common.consumer.containers.ConsumerWrapper;
import ru.tinkoff.kora.kafka.common.consumer.containers.handlers.BaseKafkaRecordsHandler;
import ru.tinkoff.kora.kafka.common.consumer.telemetry.KafkaConsumerTelemetry;

public final class KafkaAssignConsumerContainer<K, V>
implements Lifecycle {
    private static final Logger logger = LoggerFactory.getLogger(KafkaAssignConsumerContainer.class);
    private final AtomicBoolean isActive = new AtomicBoolean(true);
    private final AtomicLong backoffTimeout;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private final int threads;
    private final KafkaListenerConfig config;
    private final long refreshInterval;
    private final String consumerPrefix;
    private volatile ExecutorService executorService;
    private final BaseKafkaRecordsHandler<K, V> handler;
    private final Set<Consumer<K, V>> consumers = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap()));
    private final AtomicReference<List<TopicPartition>> partitions = new AtomicReference(new ArrayList());
    private final ArrayList<Long> offsets = new ArrayList();
    private final String topic;
    private final KafkaConsumerTelemetry<K, V> telemetry;
    private final AtomicLong lastUpdateTime = new AtomicLong(0L);

    public KafkaAssignConsumerContainer(KafkaListenerConfig config, String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, KafkaConsumerTelemetry<K, V> telemetry, BaseKafkaRecordsHandler<K, V> handler) {
        this.handler = Objects.requireNonNull(handler);
        this.backoffTimeout = new AtomicLong(config.backoffTimeout().toMillis());
        this.keyDeserializer = Objects.requireNonNull(keyDeserializer);
        this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
        this.topic = Objects.requireNonNull(topic);
        this.threads = config.threads();
        this.config = config;
        this.refreshInterval = config.partitionRefreshInterval().toMillis();
        this.telemetry = Objects.requireNonNull(telemetry);
        this.consumerPrefix = KafkaUtils.getConsumerPrefix(this.config);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void launchPollLoop(Consumer<K, V> consumer, int number, long started) {
        List<TopicPartition> allPartitions = this.partitions.get();
        List<TopicPartition> partitions = List.of();
        logger.info("Kafka Consumer '{}' started in {}", (Object)this.consumerPrefix, (Object)TimeUtils.tookForLogging((long)started));
        boolean isFirstPoll = true;
        while (this.isActive.get()) {
            boolean changed = this.refreshPartitions(allPartitions);
            if (changed) {
                logger.info("Kafka Consumer '{}' refreshing and reassigning partitions...", (Object)this.consumerPrefix);
                allPartitions = this.partitions.get();
                partitions = new ArrayList(allPartitions.size() / this.threads + 1);
                for (int i = number; i < allPartitions.size(); ++i) {
                    if (i % this.config.threads() != number) continue;
                    partitions.add(allPartitions.get(i));
                }
                consumer.assign(partitions);
                ArrayList<Long> i = this.offsets;
                synchronized (i) {
                    this.offsets.ensureCapacity(partitions.size());
                    for (TopicPartition partition : partitions) {
                        Long offset = this.offsets.get(partition.partition());
                        if (offset == null) {
                            if (this.config.offset().right() != null) {
                                String resetTo = Objects.requireNonNull((String)this.config.offset().right());
                                if (resetTo.equals("earliest")) {
                                    consumer.seekToBeginning(List.of(partition));
                                    continue;
                                }
                                if (!resetTo.equals("latest")) throw new IllegalArgumentException("Expected `earliest` or `latest` or some duration value, but received: " + resetTo);
                                consumer.seekToEnd(List.of(partition));
                                continue;
                            }
                            if (this.config.offset().left() == null) continue;
                            Duration resetToDuration = Objects.requireNonNull((Duration)this.config.offset().left());
                            long resetTo = Instant.now().minus(resetToDuration).toEpochMilli();
                            long resetToOffset = ((OffsetAndTimestamp)consumer.offsetsForTimes(Map.of(partition, resetTo)).get(partition)).offset();
                            consumer.seek(partition, resetToOffset);
                            continue;
                        }
                        consumer.seek(partition, offset + 1L);
                    }
                }
            }
            if (partitions.isEmpty()) {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException i) {}
                continue;
            }
            try {
                logger.trace("Kafka Consumer '{}' polling...", (Object)this.consumerPrefix);
                ConsumerRecords records = consumer.poll(this.config.pollTimeout());
                if (isFirstPoll) {
                    logger.info("Kafka Consumer '{}' first poll in {}", (Object)this.consumerPrefix, (Object)TimeUtils.tookForLogging((long)started));
                    isFirstPoll = false;
                }
                if (!records.isEmpty() && logger.isTraceEnabled()) {
                    HashSet logTopics = new HashSet(records.partitions().size());
                    HashSet<Integer> logPartitions = new HashSet<Integer>(records.partitions().size());
                    for (TopicPartition partition : records.partitions()) {
                        logPartitions.add(partition.partition());
                        logTopics.add(partition.topic());
                    }
                    logger.trace("Kafka Consumer '{}' polled '{}' records from topics {} and partitions {}", new Object[]{this.consumerPrefix, records.count(), logTopics, logPartitions});
                } else if (!records.isEmpty() && logger.isDebugEnabled()) {
                    logger.debug("Kafka Consumer '{}' polled '{}' records", (Object)this.consumerPrefix, (Object)records.count());
                } else {
                    logger.trace("Kafka Consumer '{}' polled '0' records", (Object)this.consumerPrefix);
                }
                this.handler.handle(records, consumer, false);
                for (TopicPartition partition : records.partitions()) {
                    List partitionRecords = records.records(partition);
                    ConsumerRecord lastRecord = (ConsumerRecord)partitionRecords.get(partitionRecords.size() - 1);
                    ArrayList<Long> arrayList = this.offsets;
                    synchronized (arrayList) {
                        this.offsets.set(partition.partition(), lastRecord.offset());
                        this.refreshLag(consumer);
                    }
                }
                this.backoffTimeout.set(this.config.backoffTimeout().toMillis());
            }
            catch (WakeupException records) {
            }
            catch (Exception e) {
                logger.error("Kafka Consumer '{}' got unhandled exception", (Object)this.consumerPrefix, (Object)e);
                try {
                    Thread.sleep(this.backoffTimeout.get());
                }
                catch (InterruptedException ie) {
                    logger.error("Kafka Consumer '{}' error interrupting thread", (Object)this.consumerPrefix, (Object)ie);
                }
                if (this.backoffTimeout.get() >= 60000L) return;
                this.backoffTimeout.set(this.backoffTimeout.get() * 2L);
                return;
            }
            finally {
                Context.clear();
            }
        }
    }

    private void refreshLag(Consumer<K, V> consumer) {
        for (Map.Entry entry : consumer.endOffsets((Collection)this.partitions.get()).entrySet()) {
            TopicPartition p = (TopicPartition)entry.getKey();
            Long latestOffset = (Long)entry.getValue();
            Long currentOffset = this.offsets.get(p.partition());
            if (currentOffset == null) continue;
            long lag = latestOffset - currentOffset;
            this.telemetry.reportLag(p, lag);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private boolean refreshPartitions(List<TopicPartition> partitions) {
        long updateTime = this.lastUpdateTime.get();
        long currentTime = System.currentTimeMillis();
        List<TopicPartition> oldPartitions = this.partitions.get();
        if (currentTime - updateTime <= this.refreshInterval) {
            if (oldPartitions.size() == partitions.size()) return false;
            return true;
        }
        if (!this.lastUpdateTime.compareAndSet(updateTime, currentTime)) return false;
        try (KafkaConsumer consumer = new KafkaConsumer(this.config.driverProperties(), (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());){
            List newPartitions = consumer.partitionsFor(this.topic);
            if (newPartitions.size() == partitions.size()) {
                boolean bl = false;
                return bl;
            }
            this.partitions.set(newPartitions.stream().map(p -> new TopicPartition(p.topic(), p.partition())).toList());
            ArrayList<Long> arrayList = this.offsets;
            synchronized (arrayList) {
                for (int i2 = this.offsets.size(); i2 < newPartitions.size(); ++i2) {
                    this.offsets.add(null);
                }
                if (oldPartitions.isEmpty()) {
                    List<TopicPartition> p2 = newPartitions.stream().skip(this.offsets.size()).map(i -> new TopicPartition(i.topic(), i.partition())).toList();
                    for (Map.Entry entry : consumer.endOffsets(p2).entrySet()) {
                        this.offsets.set(((TopicPartition)entry.getKey()).partition(), (Long)entry.getValue());
                    }
                }
            }
            boolean bl = true;
            return bl;
        }
        catch (Exception e) {
            this.lastUpdateTime.set(updateTime);
            throw e;
        }
    }

    @Nullable
    private Consumer<K, V> initializeConsumer() {
        try {
            KafkaConsumer realConsumer = new KafkaConsumer(this.config.driverProperties(), (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
            return new ConsumerWrapper<K, V>((Consumer<byte[], byte[]>)realConsumer, this.keyDeserializer, this.valueDeserializer);
        }
        catch (Exception e) {
            logger.error("Kafka Consumer '{}' initialization failed", (Object)this.consumerPrefix, (Object)e);
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException ie) {
                logger.error("Kafka Consumer '{}' error interrupting thread", (Object)this.consumerPrefix, (Object)ie);
            }
            return null;
        }
    }

    public void init() {
        int threads = this.threads;
        if (threads > 0 && this.topic != null) {
            logger.debug("Kafka Consumer '{}' starting...", (Object)this.consumerPrefix);
            long started = TimeUtils.started();
            this.executorService = Executors.newFixedThreadPool(threads, new KafkaUtils.NamedThreadFactory(this.topic));
            int i = 0;
            while (i < threads) {
                int number = i++;
                this.executorService.execute(() -> {
                    while (this.isActive.get()) {
                        Consumer<K, V> consumer = this.initializeConsumer();
                        try {
                            if (consumer == null) continue;
                            this.consumers.add(consumer);
                            try {
                                this.launchPollLoop(consumer, number, started);
                            }
                            catch (Exception e) {
                                logger.error("Kafka poll loop '{}' got unhandled exception", (Object)this.consumerPrefix, (Object)e);
                            }
                            finally {
                                this.consumers.remove(consumer);
                            }
                        }
                        finally {
                            if (consumer == null) continue;
                            consumer.close();
                        }
                    }
                });
            }
        }
    }

    public void release() {
        if (this.isActive.compareAndSet(true, false)) {
            logger.debug("Kafka Consumer '{}' stopping...", (Object)this.consumerPrefix);
            long started = System.nanoTime();
            for (Consumer<K, V> consumer : this.consumers) {
                consumer.wakeup();
            }
            this.consumers.clear();
            if (this.executorService != null) {
                this.executorService.shutdownNow();
            }
            logger.info("Kafka Consumer '{}' stopped in {}", (Object)this.consumerPrefix, (Object)TimeUtils.tookForLogging((long)started));
        }
    }
}

