/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.kafka.common.containers;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.kafka.common.KafkaConsumerFactory;
import ru.tinkoff.kora.kafka.common.config.KafkaConsumerConfig;
import ru.tinkoff.kora.kafka.common.containers.handlers.BaseKafkaRecordsHandler;

public final class KafkaConsumerContainer<K, V>
implements Lifecycle {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerContainer.class);
    private volatile AtomicBoolean isActive = new AtomicBoolean(true);
    private final AtomicLong backoffTimeout;
    private ExecutorService executorService;
    private final KafkaConsumerFactory<K, V> factory;
    private final BaseKafkaRecordsHandler<K, V> handler;
    private final Set<Consumer<K, V>> consumers = new HashSet<Consumer<K, V>>();
    private final KafkaConsumerConfig config;
    private final boolean allowCommit;

    public KafkaConsumerContainer(KafkaConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, BaseKafkaRecordsHandler<K, V> handler) {
        this.handler = handler;
        this.config = config;
        this.backoffTimeout = new AtomicLong(config.backoffTimeout().toMillis());
        if (config.driverProperties().getProperty("group.id") == null) {
            this.factory = KafkaConsumerFactory.assign(config, keyDeserializer, valueDeserializer);
            this.allowCommit = false;
        } else {
            if (config.driverProperties().getProperty("enable.auto.commit") == null) {
                config = config.withDriverPropertiesOverrides(Map.of("enable.auto.commit", false));
            }
            this.factory = KafkaConsumerFactory.subscribe(config, keyDeserializer, valueDeserializer);
            this.allowCommit = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void launchPollLoop() {
        Consumer<K, V> consumer = null;
        block13: while (this.isActive.get()) {
            if (consumer == null) {
                try {
                    consumer = this.factory.buildConsumer();
                    this.consumers.add(consumer);
                }
                catch (WakeupException wakeupException) {
                }
                catch (KafkaException e) {
                    logger.error("Error initializing KafkaConsumer", (Throwable)e);
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException ie) {
                        logger.error("Error interrupting thread", (Throwable)ie);
                    }
                    continue;
                }
            }
            if (consumer == null) continue;
            while (this.isActive.get()) {
                try {
                    ConsumerRecords records = consumer.poll(this.config.pollTimeout());
                    this.handler.handle(records, consumer, this.allowCommit);
                    this.backoffTimeout.set(this.config.backoffTimeout().toMillis());
                }
                catch (WakeupException records) {
                }
                catch (Exception e) {
                    logger.error("Unhandled exception", (Throwable)e);
                    consumer.close();
                    this.consumers.remove(consumer);
                    consumer = null;
                    try {
                        Thread.sleep(this.backoffTimeout.get());
                    }
                    catch (InterruptedException ie) {
                        logger.error("Error interrupting thread", (Throwable)ie);
                    }
                    if (this.backoffTimeout.get() >= 60000L) continue block13;
                    this.backoffTimeout.set(this.backoffTimeout.get() * 2L);
                    continue block13;
                }
                finally {
                    Context.clear();
                    continue block13;
                }
            }
        }
        if (consumer != null) {
            Thread.interrupted();
            consumer.close();
        }
    }

    public Mono<Void> init() {
        return Mono.fromRunnable(() -> {
            if (this.config.threads() > 0) {
                String prefix = this.config.topics() != null ? String.join((CharSequence)";", this.config.topics()) : (this.config.topicsPattern() != null ? this.config.topicsPattern().toString() : (this.config.partitions() != null ? String.join((CharSequence)";", this.config.partitions()) : "unknown"));
                this.executorService = Executors.newFixedThreadPool(this.config.threads(), new NamedThreadFactory(prefix));
                for (int i = 0; i < this.config.threads(); ++i) {
                    this.executorService.execute(this::launchPollLoop);
                }
            }
        });
    }

    public Mono<Void> release() {
        return Mono.fromRunnable(() -> {
            if (this.isActive.compareAndSet(true, false)) {
                for (Consumer<K, V> consumer : this.consumers) {
                    consumer.wakeup();
                }
                this.consumers.clear();
                this.executorService.shutdownNow();
            }
        });
    }

    private static class NamedThreadFactory
    implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
        private final String consumerPrefix = "kafka-consumer-";

        NamedThreadFactory(String prefix) {
            this.namePrefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "kafka-consumer-" + this.namePrefix + this.threadNumber.getAndIncrement());
            t.setDaemon(false);
            t.setPriority(5);
            return t;
        }
    }
}

