/*
 * Decompiled with CFR 0.152.
 */
package kz.greetgo.kafka.core;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import kz.greetgo.kafka.consumer.ConsumerActivateFilter;
import kz.greetgo.kafka.consumer.ConsumerDefinition;
import kz.greetgo.kafka.consumer.ConsumerDefinitionCustom;
import kz.greetgo.kafka.consumer.ConsumerDefinitionCustomBuilder;
import kz.greetgo.kafka.consumer.ConsumerReactor;
import kz.greetgo.kafka.consumer.ConsumerReactorBuilder;
import kz.greetgo.kafka.consumer.DynamicReactor;
import kz.greetgo.kafka.consumer.DynamicReactorAdding;
import kz.greetgo.kafka.consumer.DynamicReactorAddingImpl;
import kz.greetgo.kafka.consumer.DynamicReactors;
import kz.greetgo.kafka.consumer.InvokeSession;
import kz.greetgo.kafka.consumer.Profile;
import kz.greetgo.kafka.consumer.config.ConsumerReactorConfigFactory;
import kz.greetgo.kafka.core.KafkaReactorAbstract;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.producer.BoxRecord;
import kz.greetgo.kafka.producer.ProducerSource;
import kz.greetgo.kafka.producer.config.ProducerConnectParams;
import kz.greetgo.kafka.producer.config.ProducerReactorConfigFactory;
import kz.greetgo.kafka.util.ConsumerActivateFilterUtil;
import kz.greetgo.kafka.util.Handler;
import kz.greetgo.kafka.util.KeyUtil;
import kz.greetgo.kafka.util.Listener;
import kz.greetgo.strconverter.StrConverter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Concrete reactor that wires together statically registered consumers, dynamically added
 * consumers, "direct" consumers (invoked in the sending thread right after a record was sent)
 * and the {@link ProducerSource} used to create Kafka producers.
 *
 * <p>The reactor can be initiated exactly once, either with {@link #startConsumers()} or with
 * {@link #activateDirectConsumers()}; a second initiation attempt throws.
 */
public class KafkaReactorImpl extends KafkaReactorAbstract {
    /** Source of configurations for the statically registered consumers. */
    ConsumerReactorConfigFactory consumerConfigFactory;
    /** Source of configurations for consumers added through {@link #dynamicConsumers()}. */
    ConsumerReactorConfigFactory dynamicConsumerConfigFactory;
    /** Source of producer configurations, looked up by producer name and profile. */
    ProducerReactorConfigFactory producerConfigFactory;
    /** Supplies the pause, in milliseconds, between two topic list refreshes. */
    LongSupplier refreshTopicListMillis;
    /** Passed to every consumer reactor builder as the consumer thread prefix. */
    Supplier<String> consumerThreadPrefix;
    /** Optional filter; when {@code null}, {@link ConsumerActivateFilterUtil#ALWAYS_TRUE} is used. */
    ConsumerActivateFilter consumerActivateFilter;

    /** The currently active topic list refreshing thread, or {@code null} before start. */
    private final AtomicReference<RefreshStaticListThread> refreshTopicListThread = new AtomicReference<>();
    /** Reactors created by {@link #startConsumers()} for the static consumer definitions. */
    private final List<ConsumerReactor> consumerReactorList = Collections.synchronizedList(new ArrayList<>());
    /** Consumer definitions marked as direct; they are invoked from {@link #afterKafkaSent(BoxRecord)}. */
    private final List<ConsumerDefinition> directDefinitionList = new ArrayList<>();
    /** Per-thread flag which is {@code true} while direct consumers are being invoked on that thread. */
    protected final ThreadLocal<Boolean> isDirectThreadLocal = ThreadLocal.withInitial(() -> false);
    /** How this reactor was initiated; {@code null} until one of the initiating methods is called. */
    private InitVariant initiated;
    /** Global working flag; switched off by {@link #close()}. */
    private final AtomicBoolean working = new AtomicBoolean(true);

    private final ProducerSource producerSource = new ProducerSource() {

        @Override
        public Logger logger() {
            return KafkaReactorImpl.this.logger;
        }

        @Override
        public StrConverter getStrConverter() {
            return (StrConverter) KafkaReactorImpl.this.strConverter.get();
        }

        @Override
        public byte[] extractKey(Object object) {
            return KeyUtil.extractKey(object);
        }

        @Override
        public String author() {
            // The author supplier is optional; without it there is no author.
            if (KafkaReactorImpl.this.authorSupplier == null) {
                return null;
            }
            return (String) KafkaReactorImpl.this.authorSupplier.get();
        }

        @Override
        public Map<String, String> getConfigFor(String producerName, @Nonnull Profile profile) {
            return KafkaReactorImpl.this.producerConfigFactory.getConfig(producerName, profile).getConfigMap();
        }

        @Override
        public Listener addConfigListener(String producerName, @Nonnull Profile profile, Handler handler) {
            return KafkaReactorImpl.this.producerConfigFactory.getConfig(producerName, profile).addChangeHandler(handler);
        }

        /**
         * Creates a Kafka producer from a copy of the producer's configuration, after the
         * connect setter has filled in connection parameters for the given name and profile.
         */
        @Override
        public <ValueType> Producer<byte[], ValueType> createProducer(final String producerName, final @Nonnull Profile profile, ByteArraySerializer keySerializer, Serializer<ValueType> valueSerializer) {
            // Work on a copy so that the connect setter does not touch the map owned by the config factory.
            Map<String, Object> configMap = new HashMap<>(this.getConfigFor(producerName, profile));

            KafkaReactorImpl.this.producerConnectSetter.setConnect(configMap, new ProducerConnectParams() {

                @Override
                public String producerName() {
                    return producerName;
                }

                @Override
                public String profile() {
                    return profile.profile;
                }
            });

            if (KafkaReactorImpl.this.logger.isShow(LoggerType.SHOW_PRODUCER_CONFIG)) {
                KafkaReactorImpl.this.logger.logProducerConfigOnCreating(producerName, configMap);
            }

            return new KafkaProducer<>(configMap, keySerializer, valueSerializer);
        }
    };

    /** Registry of dynamic consumers by their id; removing an entry makes the reactor stop working. */
    private final ConcurrentHashMap<Long, LocalDynConsumer> dynamicReactorMap = new ConcurrentHashMap<>();
    /** Generator of ids for dynamic consumers; the first id is 1. */
    private final AtomicLong dynamicIdNext = new AtomicLong(1L);

    private final DynamicReactors dynamicReactors = new DynamicReactors() {

        /**
         * Builds a reactor for the given definition, registers it under a fresh id and starts it
         * immediately when the static consumers have already been started.
         */
        private DynamicReactor add(ConsumerDefinitionCustomBuilder consumerDefinitionCustomBuilder) {
            final long id = KafkaReactorImpl.this.dynamicIdNext.getAndIncrement();

            ConsumerDefinitionCustom consumerDefinition = KafkaReactorImpl.this.prepareConsumerDefinition(consumerDefinitionCustomBuilder).build();

            // The reactor counts as working only while the whole reactor works and the id is still registered.
            final ConsumerReactor reactor = KafkaReactorImpl.this
                .prepareReactorBuilder(consumerDefinition, KafkaReactorImpl.this.dynamicConsumerConfigFactory)
                .externalWorkingSupplier(() -> KafkaReactorImpl.this.working.get() && KafkaReactorImpl.this.dynamicReactorMap.containsKey(id))
                .build();

            LocalDynConsumer dynamicReactor = new LocalDynConsumer() {

                @Override
                public long id() {
                    return id;
                }

                @Override
                public ConsumerDefinition definition() {
                    return reactor.getConsumerDefinition();
                }

                @Override
                public void refreshTopicList() {
                    reactor.refreshTopicList();
                }

                @Override
                public void join() {
                    reactor.join();
                }

                @Override
                public void start() {
                    reactor.start();
                }

                @Override
                public void stop() {
                    reactor.stop();
                }

                @Override
                public boolean isWorking() {
                    return reactor.isWorking();
                }
            };

            KafkaReactorImpl.this.dynamicReactorMap.put(id, dynamicReactor);

            // Before startConsumers() the reactor is only registered; startConsumers() starts it later.
            if (KafkaReactorImpl.this.initiated == InitVariant.STARTED_CONSUMERS) {
                reactor.start();
            }

            return dynamicReactor;
        }

        @Override
        public DynamicReactorAdding adding() {
            return new DynamicReactorAddingImpl(this::add);
        }

        /** Returns a snapshot copy of the currently registered dynamic reactors. */
        @Override
        public Collection<DynamicReactor> reactors() {
            return new ArrayList<>(KafkaReactorImpl.this.dynamicReactorMap.values());
        }

        /**
         * Unregisters the dynamic reactor with the given id.
         *
         * @return the unregistered reactor, or {@code null} when there is no reactor with this id
         */
        @Override
        public DynamicReactor stopById(long id) {
            return KafkaReactorImpl.this.dynamicReactorMap.remove(id);
        }

        /**
         * Unregisters all dynamic reactors accepted by the filter.
         *
         * @return the reactors which were actually unregistered by this call
         */
        @Override
        public Collection<DynamicReactor> stopByFilter(Predicate<DynamicReactor> filter) {
            // Collect ids first, then remove: an id removed concurrently in between yields null and is skipped.
            List<Long> removingIds = KafkaReactorImpl.this.dynamicReactorMap.values().stream()
                .filter(filter)
                .map(DynamicReactor::id)
                .collect(Collectors.toList());

            return removingIds.stream()
                .map(this::stopById)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        }
    };

    KafkaReactorImpl(Logger logger) {
        super(logger);
    }

    /** Returns the configured activate filter, or the always-true filter when none is configured. */
    @Override
    protected ConsumerActivateFilter getConsumerActivateFilter() {
        ConsumerActivateFilter f = this.consumerActivateFilter;
        return f != null ? f : ConsumerActivateFilterUtil.ALWAYS_TRUE;
    }

    /** Tells whether the current thread is inside {@link #afterKafkaSent(BoxRecord)}. */
    @Override
    public boolean isDirect() {
        return this.isDirectThreadLocal.get();
    }

    /**
     * Initiates the reactor by starting a consumer reactor for every consumer definition which
     * does not ignore Kafka, starting the already registered dynamic consumers and launching
     * the topic list refreshing thread. Direct definitions are also remembered for
     * {@link #afterKafkaSent(BoxRecord)}.
     *
     * @throws RuntimeException when the reactor has already been initiated
     */
    @Override
    public void startConsumers() {
        if (this.initiated != null) {
            throw new RuntimeException("VkHD4j4DnF :: Reactor already initiated with " + this.initiated);
        }
        this.initiated = InitVariant.STARTED_CONSUMERS;
        this.accumulateConsumerDefinitionList();

        // First pass: request every config before any consumer is started
        // (presumably so that a config problem surfaces before anything runs - TODO confirm).
        for (ConsumerDefinition consumerDefinition : this.consumerDefinitionList) {
            if (!consumerDefinition.ignoreKafka()) {
                this.consumerConfigFactory.getConsumerConfig(consumerDefinition);
            }
        }

        // Second pass: register direct definitions and start reactors.
        for (ConsumerDefinition consumerDefinition : this.consumerDefinitionList) {
            if (consumerDefinition.isDirect()) {
                this.directDefinitionList.add(consumerDefinition);
            }
            if (consumerDefinition.ignoreKafka()) {
                continue;
            }
            ConsumerReactor reactor = this.prepareReactorBuilder(consumerDefinition, this.consumerConfigFactory).build();
            reactor.start();
            this.consumerReactorList.add(reactor);
        }

        this.dynamicReactorMap.values().forEach(LocalDynConsumer::start);

        // Install a fresh refreshing thread and switch off the one it replaces, if any.
        RefreshStaticListThread oldThread = this.refreshTopicListThread.getAndSet(new RefreshStaticListThread());
        if (oldThread != null) {
            oldThread.localWorking.set(false);
        }
    }

    /** Creates a reactor builder pre-filled with everything except the external working supplier. */
    private ConsumerReactorBuilder prepareReactorBuilder(ConsumerDefinition consumerDefinition, ConsumerReactorConfigFactory consumerConfigFactory) {
        return ConsumerReactor.builder()
            .logger(this.logger)
            .producerSynchronizer(this.producerSynchronizer)
            .strConverter(this.strConverter)
            .consumerConnectSetter(this.consumerConnectSetter)
            .consumerDefinition(consumerDefinition)
            .consumerConfig(consumerConfigFactory.getConsumerConfig(consumerDefinition))
            .consumerThreadPrefix(this.consumerThreadPrefix);
    }

    /**
     * Initiates the reactor in the mode where only direct consumers are used: no consumer
     * reactor is started, direct definitions are just remembered.
     *
     * @throws RuntimeException when the reactor has already been initiated
     */
    @Override
    public void activateDirectConsumers() {
        if (this.initiated != null) {
            throw new RuntimeException("2YMtX1PqkJ :: Reactor already initiated with " + this.initiated);
        }
        this.initiated = InitVariant.ACTIVATED_DIRECT_CONSUMERS;
        this.accumulateConsumerDefinitionList();

        for (ConsumerDefinition consumerDefinition : this.consumerDefinitionList) {
            if (consumerDefinition.isDirect()) {
                this.directDefinitionList.add(consumerDefinition);
            }
        }
    }

    /**
     * Invokes, in the calling thread, every direct consumer which listens to the topic and
     * profile of the sent record. An exception thrown while matching or invoking a consumer is
     * logged (when that logger type is shown) and does not prevent the other consumers from
     * being invoked.
     *
     * <p>While this method runs, {@link #isDirect()} returns {@code true} on the calling thread.
     * On exit the flag is restored to the value it had on entry, so a nested call made by a
     * direct consumer on the same thread does not clear the flag for the outer call.
     *
     * @param boxRecord the record which has just been sent
     */
    @Override
    protected void afterKafkaSent(BoxRecord boxRecord) {
        Boolean previousDirect = this.isDirectThreadLocal.get();
        this.isDirectThreadLocal.set(true);
        try {
            for (ConsumerDefinition consumerDefinition : this.directDefinitionList) {
                InvokeSession session = consumerDefinition.getInvoker().createSession();
                try {
                    if (listensTo(consumerDefinition, boxRecord)) {
                        session.invoke(boxRecord.toConsumerRecords(), boxRecord.profile);
                    }
                } catch (Exception e) {
                    if (this.logger.isShow(LoggerType.LOG_PRODUCER_DIRECT_CONSUMER_ERROR)) {
                        this.logger.logProducerDirectConsumerError("VpJ3Dl9570", e, consumerDefinition);
                    }
                } finally {
                    if (session != null) {
                        session.close();
                    }
                }
            }
        } finally {
            this.isDirectThreadLocal.set(previousDirect);
        }
    }

    /** Checks whether the definition has, under the record's profile, the record's prefixed topic. */
    private static boolean listensTo(ConsumerDefinition consumerDefinition, BoxRecord boxRecord) {
        return consumerDefinition.prefixedTopicList().stream()
            .filter(x -> boxRecord.profile.equals(x.profile))
            .flatMap(x -> x.topics.stream())
            .anyMatch(t -> t.equals(boxRecord.prefixedTopic));
    }

    @Override
    public ProducerSource getProducerSource() {
        return this.producerSource;
    }

    @Override
    public DynamicReactors dynamicConsumers() {
        return this.dynamicReactors;
    }

    /**
     * Switches the global working flag off and stops the static consumer reactors. Dynamic
     * reactors and the refreshing thread observe the same flag through their own checks.
     */
    @Override
    public void close() {
        this.working.set(false);
        this.consumerReactorList.forEach(ConsumerReactor::stop);
    }

    /** Joins every static consumer reactor and then every registered dynamic reactor. */
    @Override
    public void join() {
        this.consumerReactorList.forEach(ConsumerReactor::join);
        this.dynamicReactorMap.values().forEach(DynamicReactor::join);
    }

    /** A dynamic reactor which, in addition, can be started by the owning reactor. */
    static interface LocalDynConsumer extends DynamicReactor {
        public void start();
    }

    /** The way the reactor was initiated. */
    private static enum InitVariant {
        STARTED_CONSUMERS,
        ACTIVATED_DIRECT_CONSUMERS;
    }

    /**
     * Thread which periodically asks all static and dynamic reactors to refresh their topic
     * lists. It starts itself on construction and runs until the reactor stops working, its own
     * {@link #localWorking} flag is switched off, or it is interrupted while sleeping.
     */
    private class RefreshStaticListThread extends Thread {
        /** Switched off when this thread is replaced by a newer one. */
        final AtomicBoolean localWorking = new AtomicBoolean(true);

        private RefreshStaticListThread() {
            this.start();
        }

        @Override
        public void run() {
            while (KafkaReactorImpl.this.working.get() && this.localWorking.get()) {
                try {
                    Thread.sleep(KafkaReactorImpl.this.refreshTopicListMillis.getAsLong());
                } catch (InterruptedException e) {
                    // Interruption is treated as a request to finish this thread.
                    return;
                }

                try {
                    // Iterate over a copy: the synchronized list may be modified concurrently.
                    new ArrayList<>(KafkaReactorImpl.this.consumerReactorList).forEach(ConsumerReactor::refreshTopicList);
                    KafkaReactorImpl.this.dynamicReactors.reactors().forEach(DynamicReactor::refreshTopicList);
                } catch (Exception error) {
                    // A failed refresh must not kill the thread; log and try again on the next round.
                    KafkaReactorImpl.this.logger.logErrorRefreshTopicList(error);
                }
            }
        }
    }
}

