/*
 * Decompiled with CFR 0.152.
 */
package kz.greetgo.kafka.core;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import kz.greetgo.kafka.consumer.ConsumerActivateFilter;
import kz.greetgo.kafka.consumer.ConsumerDefinition;
import kz.greetgo.kafka.consumer.ConsumerDefinitionCustom;
import kz.greetgo.kafka.consumer.ConsumerDefinitionCustomBuilder;
import kz.greetgo.kafka.consumer.DynamicReactor;
import kz.greetgo.kafka.consumer.DynamicReactorAdding;
import kz.greetgo.kafka.consumer.DynamicReactorAddingImpl;
import kz.greetgo.kafka.consumer.DynamicReactors;
import kz.greetgo.kafka.consumer.InvokeResult;
import kz.greetgo.kafka.consumer.InvokeSession;
import kz.greetgo.kafka.consumer.InvokeSessionFactory;
import kz.greetgo.kafka.consumer.Profile;
import kz.greetgo.kafka.core.KafkaReactorAbstract;
import kz.greetgo.kafka.core.MockProducerHolder;
import kz.greetgo.kafka.core.PushFilter;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.errors.ConsumerInvocationException;
import kz.greetgo.kafka.model.Box;
import kz.greetgo.kafka.model.BoxHolder;
import kz.greetgo.kafka.producer.BoxRecord;
import kz.greetgo.kafka.producer.ProducerSource;
import kz.greetgo.kafka.util.BoxUtil;
import kz.greetgo.kafka.util.ConsumerActivateFilterUtil;
import kz.greetgo.kafka.util.GenericUtil;
import kz.greetgo.kafka.util.Handler;
import kz.greetgo.kafka.util.KeyUtil;
import kz.greetgo.kafka.util.Listener;
import kz.greetgo.kafka.util.PushFilterOnControllerClasses;
import kz.greetgo.strconverter.StrConverter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;

public class KafkaSimulator
extends KafkaReactorAbstract {
    Supplier<Cluster> clusterSupplier;
    private final AtomicBoolean isDirect = new AtomicBoolean(true);
    private final ConcurrentHashMap<String, MockProducerHolder> producers = new ConcurrentHashMap();
    private final ProducerSource producerSource = new ProducerSource(){

        @Override
        public Logger logger() {
            return KafkaSimulator.this.logger;
        }

        @Override
        public StrConverter getStrConverter() {
            return (StrConverter)KafkaSimulator.this.strConverter.get();
        }

        @Override
        public byte[] extractKey(Object object) {
            return KeyUtil.extractKey(object);
        }

        @Override
        public String author() {
            return KafkaSimulator.this.authorSupplier == null ? null : (String)KafkaSimulator.this.authorSupplier.get();
        }

        @Override
        public Map<String, String> getConfigFor(String producerName, @Nonnull Profile profile) {
            return new HashMap<String, String>();
        }

        @Override
        public Listener addConfigListener(String producerName, @Nonnull Profile profile, Handler handler) {
            return () -> {};
        }

        public <valueType> Producer<byte[], valueType> createProducer(String producerName, @Nonnull Profile profile, ByteArraySerializer keySerializer, Serializer<valueType> valueSerializer) {
            if (KafkaSimulator.this.producers.containsKey(producerName)) {
                throw new RuntimeException("aUjVA0J486 :: Producer with name = " + producerName + " already created. Please select another name");
            }
            MockProducerHolder<valueType> x = new MockProducerHolder<valueType>(producerName, keySerializer, valueSerializer, KafkaSimulator.this.clusterSupplier.get());
            KafkaSimulator.this.producers.put(x.getProducerName(), x);
            return x.getProducer();
        }
    };
    private final List<ConsumerRecord<byte[], Object>> pushedRecords = Collections.synchronizedList(new ArrayList());
    private InitVariants initiated = null;
    private final List<ConsumerDefinitionCustom> customConsumerDefinitionList = new ArrayList<ConsumerDefinitionCustom>();
    private final ConcurrentHashMap<Long, DynamicReactorHolder> dynamicReactorMap = new ConcurrentHashMap();
    private final AtomicLong dynamicReactorIdNext = new AtomicLong(1L);

    public KafkaSimulator(Logger logger) {
        super(logger);
    }

    @Override
    public void close() {
    }

    @Override
    public void join() {
    }

    @Override
    public boolean isDirect() {
        return this.isDirect.get();
    }

    public void setDirect(boolean isDirect) {
        this.isDirect.set(isDirect);
    }

    @Override
    protected ProducerSource getProducerSource() {
        return this.producerSource;
    }

    public void push(Class<?> ... controllerClass) {
        this.push(PushFilterOnControllerClasses.on(controllerClass));
    }

    public void push(PushFilter pushFilter) {
        for (MockProducerHolder producer : this.producers.values()) {
            producer.getProducer().flush();
            List history = producer.getProducer().history();
            producer.getProducer().clear();
            for (ProducerRecord record : history) {
                this.pushRecord((ProducerRecord<byte[], Object>)record, producer, pushFilter);
            }
        }
    }

    private void pushRecord(ProducerRecord<byte[], Object> r, MockProducerHolder producer, PushFilter pushFilter) {
        TopicPartition topicPartition = producer.topicPartition(r);
        ConsumerRecord consumerRecord = new ConsumerRecord(topicPartition.topic(), topicPartition.partition(), 1L, GenericUtil.longNullAsZero(r.timestamp()), TimestampType.CREATE_TIME, 1, 1, (Object)((byte[])r.key()), this.serialization(r.value()), r.headers(), Optional.empty());
        ArrayList<ConsumerDefinitionCustom> consumerDefinitionList = new ArrayList<ConsumerDefinitionCustom>();
        List x2 = this.consumerDefinitionList;
        if (x2 != null) {
            consumerDefinitionList.addAll(x2);
        }
        consumerDefinitionList.addAll(this.customConsumerDefinitionList);
        this.dynamicReactorMap.values().stream().map(x -> x.definition).forEach(consumerDefinitionList::add);
        if (consumerDefinitionList.size() > 0) {
            HashMap<TopicPartition, List<ConsumerRecord>> map = new HashMap<TopicPartition, List<ConsumerRecord>>();
            map.put(topicPartition, Collections.singletonList(consumerRecord));
            ConsumerRecords singleList = new ConsumerRecords(map);
            for (ConsumerDefinition consumerDefinition : consumerDefinitionList) {
                if (!pushFilter.canPush(consumerDefinition)) continue;
                InvokeSessionFactory invokeSessionFactory = consumerDefinition.getInvoker();
                try (InvokeSession invokeSession = invokeSessionFactory.createSession();){
                    InvokeResult invokeResult = invokeSession.invoke((ConsumerRecords<byte[], Object>)singleList, new Profile(null));
                    if (invokeResult.needToCommit()) continue;
                    throw new ConsumerInvocationException("6x292NmZmS :: Cannot invoke consumer " + consumerDefinition.logDisplay() + " of record " + r.value(), invokeResult.invocationError());
                }
            }
        }
        this.pushedRecords.add((ConsumerRecord<byte[], Object>)consumerRecord);
    }

    private Object serialization(Object value) {
        StrConverter strConverter = (StrConverter)this.strConverter.get();
        String str = strConverter.toStr(value);
        return strConverter.fromStr(str);
    }

    public void clearAllProducers() {
        for (MockProducerHolder producer : this.producers.values()) {
            producer.getProducer().clear();
        }
    }

    public void clearPushed() {
        this.pushedRecords.clear();
    }

    public List<ConsumerRecord<byte[], Box>> allPushed() {
        ArrayList<ConsumerRecord<byte[], Box>> ret = new ArrayList<ConsumerRecord<byte[], Box>>();
        for (ConsumerRecord<byte[], Object> r : this.pushedRecords) {
            if (!(r.value() instanceof Box)) continue;
            ret.add(r);
        }
        return ret;
    }

    public <T> List<BoxHolder<T>> pushedOf(Class<T> aClass) {
        return this.allPushed().stream().map(rec -> BoxUtil.hold((Box)rec.value(), aClass)).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
    }

    @Override
    public void startConsumers() {
        if (this.initiated != null) {
            throw new RuntimeException("mXF535wem0 :: Already initiated with " + this.initiated);
        }
        this.initiated = InitVariants.STARTED_CONSUMERS;
        this.accumulateConsumerDefinitionList();
    }

    @Override
    public void activateDirectConsumers() {
        if (this.initiated != null) {
            throw new RuntimeException("z4FRU1kuH6 :: Already initiated with " + this.initiated);
        }
        this.initiated = InitVariants.ACTIVATED_DIRECT_CONSUMERS;
        this.accumulateConsumerDefinitionList();
    }

    @Override
    public void addConsumerDefinitionCustomBuilder(ConsumerDefinitionCustomBuilder consumerDefinitionCustomBuilder) {
        this.customConsumerDefinitionList.add(consumerDefinitionCustomBuilder.logger(new Logger()).topicPrefix(this.topicPrefix).build());
    }

    public void clearCustomConsumerDefinitionList() {
        this.customConsumerDefinitionList.clear();
    }

    @Override
    protected void afterKafkaSent(BoxRecord boxRecord) {
    }

    @Override
    public DynamicReactors dynamicConsumers() {
        return new DynamicReactors(){

            private DynamicReactor add(ConsumerDefinitionCustomBuilder consumerDefinitionCustomBuilder) {
                long id = KafkaSimulator.this.dynamicReactorIdNext.getAndIncrement();
                ConsumerDefinitionCustom definition = consumerDefinitionCustomBuilder.build();
                DynamicReactorHolder holder = new DynamicReactorHolder(id, definition);
                KafkaSimulator.this.dynamicReactorMap.put(id, holder);
                return holder;
            }

            @Override
            public DynamicReactorAdding adding() {
                return new DynamicReactorAddingImpl(this::add);
            }

            @Override
            public Collection<DynamicReactor> reactors() {
                return KafkaSimulator.this.dynamicReactorMap.values().stream().map(DynamicReactor.class::cast).collect(Collectors.toList());
            }

            @Override
            public DynamicReactor stopById(long id) {
                return KafkaSimulator.this.dynamicReactorMap.remove(id);
            }

            @Override
            public Collection<DynamicReactor> stopByFilter(Predicate<DynamicReactor> filter) {
                return KafkaSimulator.this.dynamicReactorMap.values().stream().filter(filter).map(DynamicReactor::id).map(KafkaSimulator.this.dynamicReactorMap::remove).filter(Objects::nonNull).collect(Collectors.toList());
            }
        };
    }

    @Override
    protected ConsumerActivateFilter getConsumerActivateFilter() {
        return ConsumerActivateFilterUtil.ALWAYS_TRUE;
    }

    private static class DynamicReactorHolder
    implements DynamicReactor {
        private final long id;
        private final ConsumerDefinitionCustom definition;

        public DynamicReactorHolder(long id, ConsumerDefinitionCustom definition) {
            this.id = id;
            this.definition = definition;
        }

        @Override
        public long id() {
            return this.id;
        }

        @Override
        public ConsumerDefinition definition() {
            return this.definition;
        }

        @Override
        public boolean isWorking() {
            return true;
        }

        @Override
        public void stop() {
        }

        @Override
        public void join() {
        }

        @Override
        public void refreshTopicList() {
        }
    }

    private static enum InitVariants {
        STARTED_CONSUMERS,
        ACTIVATED_DIRECT_CONSUMERS;

    }
}

