/*
 * Decompiled with CFR 0.152.
 */
package kz.greetgo.kafka.consumer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import kz.greetgo.kafka.consumer.ConsumerConfigDefaults;
import kz.greetgo.kafka.consumer.ConsumerConfigWorker;
import kz.greetgo.kafka.consumer.ConsumerDefinition;
import kz.greetgo.kafka.consumer.ConsumerReactor;
import kz.greetgo.kafka.core.config.EventConfigStorage;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.model.Box;
import kz.greetgo.kafka.producer.ProducerFacadeBridge;
import kz.greetgo.kafka.producer.ProducerSource;
import kz.greetgo.kafka.serializer.BoxDeserializer;
import kz.greetgo.strconverter.StrConverter;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

public class ConsumerReactorImpl
implements ConsumerReactor {
    public Logger logger;
    public Supplier<StrConverter> strConverterSupplier;
    public ConsumerDefinition consumerDefinition;
    public EventConfigStorage configStorage;
    public Supplier<String> bootstrapServers;
    public String hostId;
    public Supplier<ConsumerConfigDefaults> consumerConfigDefaults;
    public ProducerSource producerSource;
    private final ConsumerConfigWorker consumerConfigWorker = new ConsumerConfigWorker(() -> this.configStorage, this::refresh, () -> this.consumerConfigDefaults.get());
    private final AtomicBoolean working = new AtomicBoolean(true);
    private final ConcurrentHashMap<Long, Worker> workers = new ConcurrentHashMap();
    private final AtomicLong nextValue = new AtomicLong(1L);

    public void start() {
        if (!this.working.get()) {
            throw new IllegalStateException("Cannot start closed ConsumerReactor");
        }
        this.consumerConfigWorker.setConfigPathPrefix(this.consumerDefinition.getConfigPath());
        this.consumerConfigWorker.setHostId(this.hostId);
        this.consumerConfigWorker.start();
        this.refresh();
    }

    public void stop() {
        if (this.working.get() && this.working.compareAndSet(true, false)) {
            this.consumerConfigWorker.close();
        }
    }

    public void join() {
        Iterator<Worker> iterator;
        while ((iterator = this.workers.values().iterator()).hasNext()) {
            try {
                iterator.next().join();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return;
    }

    @Override
    public void refresh() {
        HashSet<Long> toDelete = new HashSet<Long>();
        int currentCount = 0;
        for (Map.Entry<Long, Worker> e : this.workers.entrySet()) {
            if (!e.getValue().isRunning()) {
                toDelete.add(e.getKey());
                continue;
            }
            ++currentCount;
        }
        toDelete.forEach(this.workers::remove);
        int workerCount = this.consumerConfigWorker.getWorkerCount();
        if (this.logger.isShow(LoggerType.LOG_CONSUMER_REACTOR_REFRESH)) {
            this.logger.logConsumerReactorRefresh(this.consumerDefinition, currentCount, workerCount);
        }
        if (workerCount > currentCount) {
            int n = workerCount - currentCount;
            for (int i = 0; i < n; ++i) {
                this.appendWorker();
            }
        } else if (workerCount < currentCount) {
            this.removeWorkers(currentCount - workerCount);
        }
    }

    private void appendWorker() {
        Worker worker = new Worker();
        this.workers.put(worker.id, worker);
        worker.start();
    }

    private void removeWorkers(int countToRemove) {
        int removed = 0;
        while (removed < countToRemove) {
            ArrayList list = new ArrayList(this.workers.keySet());
            if (list.isEmpty()) {
                return;
            }
            for (Long id : list) {
                Worker worker = this.workers.remove(id);
                if (worker == null || !worker.isRunning() || ++removed < countToRemove) continue;
                return;
            }
        }
    }

    static /* synthetic */ ConsumerConfigWorker access$300(ConsumerReactorImpl x0) {
        return x0.consumerConfigWorker;
    }

    static /* synthetic */ AtomicBoolean access$400(ConsumerReactorImpl x0) {
        return x0.working;
    }

    static /* synthetic */ ConcurrentHashMap access$500(ConsumerReactorImpl x0) {
        return x0.workers;
    }

    private class Worker
    extends Thread {
        private final long id;
        private final AtomicBoolean running;

        private Worker() {
            this.id = ConsumerReactorImpl.this.nextValue.getAndIncrement();
            this.running = new AtomicBoolean(true);
        }

        public boolean isRunning() {
            return this.running.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
         * Unable to fully structure code
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            try {
                Thread.currentThread().setName("kafka-consumer-" + ConsumerReactorImpl.this.consumerDefinition.logDisplay() + "-" + this.id);
                if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_START_CONSUMER_WORKER)) {
                    ConsumerReactorImpl.this.logger.logConsumerStartWorker(ConsumerReactorImpl.this.consumerDefinition, this.id);
                }
                configMap = ConsumerReactorImpl.access$300(ConsumerReactorImpl.this).getConfigMap();
                configMap.put("bootstrap.servers", ConsumerReactorImpl.this.bootstrapServers.get());
                configMap.put("auto.offset.reset", ConsumerReactorImpl.this.consumerDefinition.getAutoOffsetReset().name().toLowerCase());
                configMap.put("group.id", ConsumerReactorImpl.this.consumerDefinition.getGroupId());
                configMap.put("enable.auto.commit", ConsumerReactorImpl.this.consumerDefinition.isAutoCommit() != false ? "true" : "false");
                configMap.put("internal.leave.group.on.close", "false");
                if (ConsumerReactorImpl.this.logger.isShow(LoggerType.SHOW_CONSUMER_WORKER_CONFIG)) {
                    ConsumerReactorImpl.this.logger.logConsumerWorkerConfig(ConsumerReactorImpl.this.consumerDefinition, this.id, configMap);
                }
                forKey = new ByteArrayDeserializer();
                forValue = new BoxDeserializer(ConsumerReactorImpl.this.strConverterSupplier.get());
                invoker = ConsumerReactorImpl.this.consumerDefinition.getInvoker();
                usingProducerNames = invoker.getUsingProducerNames();
                invokeSession = invoker.createSession();
                var7_7 = null;
                try {
                    for (String producerName : usingProducerNames) {
                        producer = ProducerFacadeBridge.createPermanentBridge(producerName, ConsumerReactorImpl.this.producerSource);
                        invokeSession.putProducer(producerName, producer);
                    }
                    block34: while (ConsumerReactorImpl.access$400(ConsumerReactorImpl.this).get() != false) {
                        block49: {
                            block50: {
                                if (ConsumerReactorImpl.access$500(ConsumerReactorImpl.this).containsKey(this.id) == false) return;
                                consumer = new KafkaConsumer(configMap, (Deserializer)forKey, (Deserializer)forValue);
                                var9_11 = null;
                                try {
                                    consumer.subscribe(ConsumerReactorImpl.this.consumerDefinition.topicList());
lbl35:
                                    // 3 sources

                                    while (ConsumerReactorImpl.access$400(ConsumerReactorImpl.this).get() && ConsumerReactorImpl.access$500(ConsumerReactorImpl.this).containsKey(this.id)) {
                                        try {
                                            records = consumer.poll(ConsumerReactorImpl.access$300(ConsumerReactorImpl.this).pollDuration());
                                            if (records.count() == 0) {
                                                continue;
                                            }
                                            ** GOTO lbl-1000
                                        }
                                        catch (RuntimeException exception) {
                                            if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_CONSUMER_POLL_EXCEPTION_HAPPENED)) {
                                                ConsumerReactorImpl.this.logger.logConsumerPollExceptionHappened(exception, ConsumerReactorImpl.this.consumerDefinition);
                                            }
                                            ConsumerReactorImpl.access$400(ConsumerReactorImpl.this).set(false);
                                            if (consumer == null) continue block34;
                                            if (var9_11 != null) {
                                                try {
                                                    consumer.close();
                                                }
                                                catch (Throwable var12_20) {
                                                    var9_11.addSuppressed(var12_20);
                                                }
                                                continue block34;
                                            }
                                            consumer.close();
                                            continue block34;
                                        }
                                    }
                                    break block49;
                                }
                                catch (Throwable var10_14) {
                                    try {
                                        var9_11 = var10_14;
                                        throw var10_14;
                                    }
                                    catch (Throwable var13_22) {
                                        if (consumer == null) throw var13_22;
                                        if (var9_11 != null) {
                                            try {
                                                consumer.close();
                                                throw var13_22;
                                            }
                                            catch (Throwable var14_23) {
                                                var9_11.addSuppressed(var14_23);
                                                throw var13_22;
                                            }
                                        }
                                        consumer.close();
                                        throw var13_22;
                                    }
                                }
lbl-1000:
                                // 1 sources

                                {
                                    if (invokeSession.invoke((ConsumerRecords<byte[], Box>)records).needToCommit()) ** GOTO lbl-1000
                                    if (consumer == null) continue;
                                    if (var9_11 == null) break block50;
                                }
                                try {
                                    consumer.close();
                                }
                                catch (Throwable exception) {
                                    var9_11.addSuppressed(exception);
                                }
                                continue;
                            }
                            consumer.close();
                            continue;
lbl-1000:
                            // 1 sources

                            {
                                try {
                                    consumer.commitSync();
                                    ** GOTO lbl35
                                }
                                catch (RuntimeException exception) {
                                    if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_CONSUMER_COMMIT_SYNC_EXCEPTION_HAPPENED)) {
                                        ConsumerReactorImpl.this.logger.logConsumerCommitSyncExceptionHappened(exception, ConsumerReactorImpl.this.consumerDefinition);
                                    }
                                    if (consumer == null) continue;
                                    if (var9_11 != null) {
                                        try {
                                            consumer.close();
                                        }
                                        catch (Throwable var12_21) {
                                            var9_11.addSuppressed(var12_21);
                                        }
                                        continue;
                                    }
                                    consumer.close();
                                    continue;
                                }
                            }
                        }
                        if (consumer == null) continue;
                        if (var9_11 != null) {
                            try {
                                consumer.close();
                            }
                            catch (Throwable var10_13) {
                                var9_11.addSuppressed(var10_13);
                            }
                            continue;
                        }
                        consumer.close();
                    }
                    return;
                }
                catch (Throwable var8_10) {
                    var7_7 = var8_10;
                    throw var8_10;
                }
                finally {
                    if (invokeSession != null) {
                        if (var7_7 != null) {
                            try {
                                invokeSession.close();
                            }
                            catch (Throwable var8_9) {
                                var7_7.addSuppressed(var8_9);
                            }
                        } else {
                            invokeSession.close();
                        }
                    }
                }
            }
            finally {
                this.running.set(false);
                ConsumerReactorImpl.access$500(ConsumerReactorImpl.this).remove(this.id);
                if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_CONSUMER_FINISH_WORKER)) {
                    ConsumerReactorImpl.this.logger.logConsumerFinishWorker(ConsumerReactorImpl.this.consumerDefinition, this.id);
                }
            }
        }
    }
}

