/*
 * Decompiled with CFR 0.152.
 */
package kz.greetgo.kafka.consumer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import kz.greetgo.kafka.consumer.ConsumerDefinition;
import kz.greetgo.kafka.consumer.ConsumerReactor;
import kz.greetgo.kafka.consumer.Profile;
import kz.greetgo.kafka.consumer.annotations.TopicContentType;
import kz.greetgo.kafka.consumer.config.ConsumerConnectSetter;
import kz.greetgo.kafka.consumer.config.ConsumerReactorConfig;
import kz.greetgo.kafka.core.ProducerSynchronizer;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.serializer.BoxDeserializer;
import kz.greetgo.kafka.serializer.BytesDeserializer;
import kz.greetgo.kafka.serializer.StrDeserializer;
import kz.greetgo.kafka.util.KafkaTopicUtil;
import kz.greetgo.kafka.util.Listener;
import kz.greetgo.strconverter.StrConverter;
import org.apache.kafka.common.serialization.Deserializer;

public class ConsumerReactorImpl
implements ConsumerReactor {
    /** Logger consulted by {@code correctWorkersCount} before emitting a refresh log entry. */
    Logger logger;
    /** NOTE(review): not referenced by the visible code — presumably used by the worker loop; confirm. */
    ProducerSynchronizer producerSynchronizer;
    /** Supplies the {@link StrConverter} handed to {@link BoxDeserializer} in {@code createValueDeserializer}. */
    Supplier<StrConverter> strConverter;
    /** Definition of the consumer: source of the prefixed topic list and of the topic content type. */
    ConsumerDefinition consumerDefinition;
    /** Per-profile configuration: required worker count and change-listener registration. */
    ConsumerReactorConfig consumerConfig;
    /** NOTE(review): not referenced by the visible code — presumably names worker threads; confirm. */
    Supplier<String> consumerThreadPrefix;
    /** External on/off switch; {@link #isWorking()} is true only while this also returns true. */
    BooleanSupplier externalWorkingSupplier;
    /** NOTE(review): not referenced by the visible code — presumably configures the Kafka connection; confirm. */
    ConsumerConnectSetter consumerConnectSetter;
    /** Internal on/off flag; starts as {@code true} and is cleared by {@link #stop()}. */
    private final AtomicBoolean working = new AtomicBoolean(true);
    /** Registered workers keyed by worker id. */
    private final ConcurrentHashMap<Long, Worker> workers = new ConcurrentHashMap<>();
    /** Source of worker ids; the first id handed out is 1. */
    private final AtomicLong nextValue = new AtomicLong(1L);
    /** Prefixed topic names per profile, as last applied by {@link #refreshTopicList()}. */
    private final ConcurrentHashMap<Profile, Set<String>> currentPrefixedTopics = new ConcurrentHashMap<>();
    /** Configuration listeners registered so far; drained and killed by {@code killAllListeners}. */
    private final ConcurrentLinkedQueue<Listener> listenerQueue = new ConcurrentLinkedQueue<>();
    /** Profiles whose configuration listeners are already registered (the value is always 1). */
    private final ConcurrentHashMap<Profile, Integer> profileListeningMap = new ConcurrentHashMap<>();
    /** Monitor guarding listener registration in {@code ensureListenProfileConfig}. */
    private final Object sync = new Object();
    /** Set on the first effective {@link #start()} call so later calls are no-ops. */
    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * Registers the configuration listeners for {@code profile}, at most once per profile.
     *
     * <p>Does nothing when the reactor has been stopped or the profile is already recorded in
     * {@code profileListeningMap}. Otherwise, under the {@code sync} monitor, it registers a
     * worker-count listener (re-running {@code correctWorkersCount}) and a params listener
     * (re-running {@code restartAllWorkerForProfile}), queues both for later killing, and marks
     * the profile as listened.
     *
     * <p>NOTE: reconstructed from decompiled bytecode; the decompiler warned that it removed a
     * try/catch around the monitor, so behaviour may differ subtly from the original source.
     *
     * @param profile profile whose configuration changes should be observed
     */
    private void ensureListenProfileConfig(Profile profile) {
        if (!working.get() || profileListeningMap.containsKey(profile)) {
            return;
        }
        synchronized (sync) {
            // Re-check under the monitor: another thread may have registered this profile meanwhile.
            if (!profileListeningMap.containsKey(profile)) {
                listenerQueue.add(consumerConfig.onChangeWorkerCount(() -> correctWorkersCount(profile), profile));
                listenerQueue.add(consumerConfig.onChangeParams(() -> restartAllWorkerForProfile(profile), profile));
                profileListeningMap.put(profile, 1);
            }
        }
    }

    /**
     * Drains {@code listenerQueue}, calling {@code kill()} on every listener taken from it.
     */
    private void killAllListeners() {
        for (Listener next = listenerQueue.poll(); next != null; next = listenerQueue.poll()) {
            next.kill();
        }
    }

    /**
     * Re-reads the prefixed topic list from the consumer definition and, if it differs from the
     * currently applied one, replaces the applied topics, makes sure every profile has its
     * configuration listeners registered, and restarts all workers.
     *
     * <p>Does nothing while the reactor is not working or when the topic map is unchanged.
     */
    @Override
    public void refreshTopicList() {
        if (!this.isWorking()) {
            return;
        }
        Map<Profile, Set<String>> newTopics = KafkaTopicUtil.topicsToMap(this.consumerDefinition.prefixedTopicList());
        if (this.currentPrefixedTopics.equals(newTopics)) {
            return;
        }
        // Replace the applied topics; the clear + putAll pair is not atomic as a whole.
        this.currentPrefixedTopics.clear();
        this.currentPrefixedTopics.putAll(newTopics);
        // No raw KeySetView cast: the typed key set keeps the Profile element type for the method reference.
        this.currentPrefixedTopics.keySet().forEach(this::ensureListenProfileConfig);
        this.restartAllWorkers();
    }

    /**
     * Builds the value deserializer that matches the consumer definition's topic content type.
     *
     * @return a {@link BoxDeserializer} (built from the supplied {@link StrConverter}) for
     *         {@code BOX}, a {@link StrDeserializer} for {@code STR}, or a
     *         {@link BytesDeserializer} for {@code BYTES}
     * @throws RuntimeException if the content type is none of the handled constants
     */
    private Deserializer<Object> createValueDeserializer() {
        final TopicContentType contentType = consumerDefinition.topicContentType();
        switch (contentType) {
            case BOX:
                return new BoxDeserializer(strConverter.get());
            case STR:
                return new StrDeserializer();
            case BYTES:
                return new BytesDeserializer();
            default:
                throw new RuntimeException("FGRnPY8vwj :: Unknown " + TopicContentType.class.getSimpleName() + " = " + contentType);
        }
    }

    /**
     * Tells whether the reactor should keep running.
     *
     * @return {@code true} only while the internal {@code working} flag is set and the external
     *         working supplier also reports {@code true}; the supplier is not consulted once the
     *         internal flag is cleared
     */
    @Override
    public boolean isWorking() {
        if (!working.get()) {
            return false;
        }
        return externalWorkingSupplier.getAsBoolean();
    }

    /**
     * Package-private no-argument constructor. It assigns nothing itself, so the package-private
     * collaborator fields (logger, config, suppliers, ...) stay {@code null} until set from outside.
     */
    ConsumerReactorImpl() {
    }

    /**
     * Returns the consumer definition this reactor was configured with.
     *
     * @return the value of the {@code consumerDefinition} field
     */
    @Override
    public ConsumerDefinition getConsumerDefinition() {
        return this.consumerDefinition;
    }

    /**
     * Stops the reactor: clears the internal {@code working} flag and kills every registered
     * configuration listener.
     *
     * <p>This method does not touch the {@code workers} map or interrupt any worker thread; from
     * now on {@link #isWorking()} returns {@code false}.
     *
     * @return this reactor, to allow call chaining
     */
    @Override
    public ConsumerReactor stop() {
        // Clear the flag first so no new listeners are registered after the drain below starts.
        this.working.set(false);
        this.killAllListeners();
        return this;
    }

    /**
     * Starts the reactor by bringing the worker count of every known profile to its configured
     * value.
     *
     * <p>Only the first call has an effect; a call made after {@link #stop()} does nothing and
     * does not consume the one-time start.
     */
    @Override
    public void start() {
        // The short-circuit keeps the 'started' flag untouched when the reactor is already stopped.
        if (working.get() && started.compareAndSet(false, true)) {
            correctWorkersCountForAllProfiles();
        }
    }

    /**
     * Blocks until the {@code workers} map is empty, joining every registered worker thread.
     *
     * <p>If the calling thread is interrupted while waiting, the method restores the thread's
     * interrupt status and returns early, so the caller can still observe the interruption.
     */
    @Override
    public void join() {
        // NOTE(review): this loop only ends once workers leave the map; presumably Worker.run()
        // unregisters itself on exit — confirm, otherwise this spins over finished threads.
        while (!this.workers.isEmpty()) {
            // Join a snapshot so concurrent changes to the map do not affect this pass.
            ArrayList<Worker> snapshot = new ArrayList<>(this.workers.values());
            for (Worker worker : snapshot) {
                try {
                    worker.join();
                } catch (InterruptedException e) {
                    // Do not swallow the interruption: re-assert the flag for the caller.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Unregisters every worker and then re-creates workers for all known profiles.
     *
     * <p>Does nothing while the reactor is not working.
     */
    private void restartAllWorkers() {
        if (isWorking()) {
            // Emptying the map makes isInCircle() false for the old workers (it checks map membership).
            workers.clear();
            correctWorkersCountForAllProfiles();
        }
    }

    /**
     * Unregisters all currently active workers of {@code profile} and then lets
     * {@code correctWorkersCount} bring the profile back to its configured worker count.
     *
     * @param profile profile whose workers are to be replaced
     */
    private void restartAllWorkerForProfile(Profile profile) {
        // First collect the ids, then remove them, so that removal happens after the scan.
        ArrayList<Long> doomedIds = new ArrayList<Long>();
        workers.forEach((id, worker) -> {
            if (Objects.equals(profile, worker.workerProfile) && worker.isInCircle()) {
                doomedIds.add(id);
            }
        });
        for (Long id : doomedIds) {
            workers.remove(id);
        }
        correctWorkersCount(profile);
    }

    /**
     * Runs {@code correctWorkersCount} for every profile present in {@code currentPrefixedTopics}.
     *
     * <p>Does nothing while the reactor is not working.
     */
    private void correctWorkersCountForAllProfiles() {
        if (!this.isWorking()) {
            return;
        }
        // No raw KeySetView cast: the typed key set keeps the Profile element type for the method reference.
        this.currentPrefixedTopics.keySet().forEach(this::correctWorkersCount);
    }

    /**
     * Brings the number of active workers of {@code profile} to the count requested by the
     * configuration.
     *
     * <p>Workers of the profile that are no longer "in circle" are unregistered and not counted.
     * The method also makes sure the profile's configuration listeners are registered and, when
     * enabled, logs the current and the required count. Does nothing while the reactor is not
     * working.
     *
     * @param profile profile to adjust; must not be {@code null}
     * @throws NullPointerException if {@code profile} is {@code null} while the reactor is working
     */
    private void correctWorkersCount(Profile profile) {
        if (!isWorking()) {
            return;
        }
        Objects.requireNonNull(profile, "mmG8FjXqj2 :: profile == null");

        // Split this profile's workers into active ones (counted) and stale ones (to unregister).
        HashSet<Long> staleIds = new HashSet<Long>();
        int activeCount = 0;
        for (Map.Entry<Long, Worker> entry : workers.entrySet()) {
            Worker candidate = entry.getValue();
            if (!Objects.equals(profile, candidate.workerProfile)) {
                continue;
            }
            if (candidate.isInCircle()) {
                activeCount++;
            } else {
                staleIds.add(entry.getKey());
            }
        }
        for (Long staleId : staleIds) {
            workers.remove(staleId);
        }

        int requiredCount = consumerConfig.getWorkerCount(profile);
        ensureListenProfileConfig(profile);

        if (logger.isShow(LoggerType.LOG_CONSUMER_REACTOR_REFRESH)) {
            logger.logConsumerReactorRefresh(consumerDefinition, activeCount, requiredCount, profile);
        }

        if (requiredCount > activeCount) {
            appendWorker(profile, requiredCount - activeCount);
        } else if (activeCount > requiredCount) {
            removeWorkers(activeCount - requiredCount, profile);
        }
    }

    /**
     * Creates, registers and starts {@code count} new workers for {@code profile}.
     *
     * @param profile profile the new workers serve
     * @param count   number of workers to add; zero or a negative value adds none
     */
    private void appendWorker(Profile profile, int count) {
        for (int remaining = count; remaining > 0; remaining--) {
            Worker created = new Worker(profile);
            // Register before starting: Worker.isInCircle() requires the id to be present in the map.
            workers.put(created.id, created);
            created.start();
        }
    }

    /**
     * Unregisters up to {@code countToRemove} active workers of {@code profile}.
     *
     * <p>The map is scanned once; the scan stops as soon as enough workers have been removed.
     * If fewer matching workers exist, all of them are removed and the method simply returns.
     * Removal only takes the worker out of the {@code workers} map, which makes its
     * {@code isInCircle()} return {@code false}.
     *
     * @param countToRemove maximum number of workers to unregister; zero or negative removes none
     * @param profile       profile whose workers are to be reduced
     */
    private void removeWorkers(int countToRemove, Profile profile) {
        int removed = 0;
        for (Map.Entry<Long, Worker> entry : this.workers.entrySet()) {
            if (removed >= countToRemove) {
                break;
            }
            Worker worker = entry.getValue();
            // Objects.equals keeps the profile comparison consistent with the other worker scans.
            if (Objects.equals(profile, worker.workerProfile) && worker.isInCircle()) {
                // Removing while iterating is allowed here: the map is a ConcurrentHashMap.
                this.workers.remove(entry.getKey());
                ++removed;
            }
        }
    }

    /**
     * A worker thread bound to one profile of the enclosing reactor.
     *
     * <p>NOTE: the body of {@link #run()} could not be decompiled, so what the thread actually
     * does is not visible in this file.
     */
    private class Worker
    extends Thread {
        /** Unique id taken from the reactor's {@code nextValue} counter; used as the key in {@code workers}. */
        private final long id;
        /** Profile this worker serves; never {@code null}. */
        private final Profile workerProfile;
        /** Topic set registered for the profile when the worker was created; may be {@code null}. */
        private final Set<String> prefixedTopics;

        /**
         * Creates a worker for the given profile.
         *
         * <p>The id is taken before the null check, so an id is consumed even when the check fails.
         *
         * @param workerProfile profile to serve; must not be {@code null}
         * @throws NullPointerException if {@code workerProfile} is {@code null}
         */
        private Worker(Profile workerProfile) {
            this.id = ConsumerReactorImpl.this.nextValue.getAndIncrement();
            Objects.requireNonNull(workerProfile, "37t58W6jtS :: workerProfile == null");
            this.workerProfile = workerProfile;
            // Looks up the topic set once, at construction time; later map updates are not re-read here.
            this.prefixedTopics = ConsumerReactorImpl.this.currentPrefixedTopics.get(workerProfile);
        }

        /**
         * Tells whether this worker is still supposed to be running.
         *
         * @return {@code true} only if the worker has a non-empty topic set, the reactor is
         *         working, and the worker's id is still registered in the {@code workers} map
         */
        private boolean isInCircle() {
            return this.prefixedTopics != null && this.prefixedTopics.size() > 0 && ConsumerReactorImpl.this.isWorking() && ConsumerReactorImpl.this.workers.containsKey(this.id);
        }

        /*
         * Exception decompiling
         */
        /**
         * Thread body. The original implementation failed to decompile; this placeholder only
         * throws, so the class as shown here is not functionally equivalent to the compiled one.
         *
         * @throws IllegalStateException always, in this decompiled form
         */
        @Override
        public void run() {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [2[TRYBLOCK]], but top level block is 27[WHILELOOP]
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }
    }
}

