/*
 * Decompiled with CFR 0.152.
 */
package kz.greetgo.kafka.core;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import kz.greetgo.kafka.consumer.ConsumerDefinition;
import kz.greetgo.kafka.consumer.Invoker;
import kz.greetgo.kafka.core.KafkaReactorAbstract;
import kz.greetgo.kafka.core.MockProducerHolder;
import kz.greetgo.kafka.core.PushFilter;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.errors.ConsumerInvocationException;
import kz.greetgo.kafka.model.Box;
import kz.greetgo.kafka.model.BoxHolder;
import kz.greetgo.kafka.producer.ProducerFacadeBridge;
import kz.greetgo.kafka.producer.ProducerSource;
import kz.greetgo.kafka.serializer.BoxSerializer;
import kz.greetgo.kafka.util.BoxUtil;
import kz.greetgo.kafka.util.GenericUtil;
import kz.greetgo.kafka.util.KeyUtil;
import kz.greetgo.kafka.util.PushFilterOnControllerClasses;
import kz.greetgo.strconverter.StrConverter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/**
 * In-memory variant of {@link KafkaReactorAbstract}: producers are backed by
 * {@link MockProducerHolder} instances, and records they have sent are handed
 * to the registered consumers synchronously when one of the {@code push}
 * methods is called.
 *
 * <p>Every record that was delivered without a consumer failure is remembered
 * and can be inspected through {@link #allPushed()} and {@link #pushedOf(Class)}.
 */
public class KafkaSimulator
extends KafkaReactorAbstract {
    /** Mock producers created so far, keyed by producer name. */
    private final ConcurrentHashMap<String, MockProducerHolder> producers = new ConcurrentHashMap<>();

    /** Producer source that hands out mock producers and reads its settings from this simulator. */
    private final ProducerSource producerSource = new ProducerSource(){

        @Override
        public Logger logger() {
            return KafkaSimulator.this.logger;
        }

        @Override
        public StrConverter getStrConverter() {
            return KafkaSimulator.this.strConverterSupplier().get();
        }

        @Override
        public byte[] extractKey(Object object) {
            return KeyUtil.extractKey(object);
        }

        /** Returns the author from the configured getter, or {@code null} when no getter is set. */
        @Override
        public String author() {
            return KafkaSimulator.this.authorGetter == null ? null : (String)KafkaSimulator.this.authorGetter.get();
        }

        /** The simulator has no reloadable producer configuration, so the timestamp is always zero. */
        @Override
        public long getProducerConfigUpdateTimestamp(String producerName) {
            return 0L;
        }

        /** Returns a fresh empty map: the simulator supplies no producer settings. */
        @Override
        public Map<String, Object> getConfigFor(String producerName) {
            return new HashMap<>();
        }

        /**
         * Creates and registers a mock producer under the given name.
         *
         * @throws RuntimeException if a producer with this name has already been created
         */
        @Override
        public Producer<byte[], Box> createProducer(String producerName, ByteArraySerializer keySerializer, BoxSerializer valueSerializer) {
            if (KafkaSimulator.this.producers.containsKey(producerName)) {
                throw new RuntimeException("Producer with name = " + producerName + " already created. Please select another name");
            }
            MockProducerHolder holder = new MockProducerHolder(producerName, keySerializer, valueSerializer, KafkaSimulator.this.getCluster());
            KafkaSimulator.this.producers.put(holder.getProducerName(), holder);
            return holder.getProducer();
        }
    };

    /** Records delivered by {@code push}; wrapped in a synchronized list. */
    private final List<ConsumerRecord<byte[], Box>> pushedRecords = Collections.synchronizedList(new ArrayList<>());

    /** Consumer definitions collected by {@link #startConsumers()}; {@code null} until then. */
    private List<ConsumerDefinition> consumerDefinitionList;

    /** Does nothing: the simulator starts no background consumers in this class. */
    @Override
    public void stopConsumers() {
    }

    @Override
    protected ProducerSource getProducerSource() {
        return this.producerSource;
    }

    /** Cluster passed to newly created mock producers; an empty cluster by default. */
    protected Cluster getCluster() {
        return Cluster.empty();
    }

    /**
     * Pushes pending records using a filter built from the given controller classes.
     *
     * @param controllerClass controller classes handed to {@link PushFilterOnControllerClasses#on}
     */
    public void push(Class<?> ... controllerClass) {
        this.push(PushFilterOnControllerClasses.on(controllerClass));
    }

    /**
     * Flushes every mock producer, takes the records it has sent, clears the
     * producer, and delivers each record to the consumers accepted by the filter.
     *
     * @param pushFilter decides which consumer definitions receive the records
     * @throws ConsumerInvocationException if a consumer reports that the record must not be committed
     */
    public void push(PushFilter pushFilter) {
        for (MockProducerHolder producer : this.producers.values()) {
            producer.getProducer().flush();
            List<ProducerRecord<byte[], Box>> history = producer.getProducer().history();
            producer.getProducer().clear();
            for (ProducerRecord<byte[], Box> record : history) {
                this.pushRecord(record, producer, pushFilter);
            }
        }
    }

    /**
     * Converts one produced record into a consumer record, invokes every
     * consumer accepted by the filter with it, and then remembers the record.
     *
     * <p>The record is remembered only when no consumer failed: a failure
     * propagates to the caller before the record is stored.
     */
    private void pushRecord(ProducerRecord<byte[], Box> r, MockProducerHolder producer, PushFilter pushFilter) {
        TopicPartition topicPartition = producer.topicPartition(r);
        // The value goes through a string round trip so consumers get a copy, not the produced instance.
        ConsumerRecord<byte[], Box> consumerRecord = new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1L, GenericUtil.longNullAsZero(r.timestamp()), TimestampType.CREATE_TIME, Long.valueOf(1L), 1, 1, r.key(), this.serialization(r.value()), r.headers());

        // Read the field once so the null check and the loop see the same list.
        List<ConsumerDefinition> definitions = this.consumerDefinitionList;
        if (definitions != null) {
            Map<TopicPartition, List<ConsumerRecord<byte[], Box>>> recordsByPartition = new HashMap<>();
            recordsByPartition.put(topicPartition, Collections.singletonList(consumerRecord));
            ConsumerRecords<byte[], Box> singleRecord = new ConsumerRecords<>(recordsByPartition);

            for (ConsumerDefinition consumerDefinition : definitions) {
                if (!pushFilter.canPush(consumerDefinition)) {
                    continue;
                }
                Invoker invoker = consumerDefinition.getInvoker();
                Set<String> usingProducerNames = invoker.getUsingProducerNames();
                // try-with-resources closes the session on every path and lets a
                // consumer failure propagate instead of being discarded.
                try (Invoker.InvokeSession invokeSession = invoker.createSession()) {
                    for (String producerName : usingProducerNames) {
                        ProducerFacadeBridge producerFacade = ProducerFacadeBridge.createPermanentBridge(producerName, this.producerSource);
                        invokeSession.putProducer(producerName, producerFacade);
                    }
                    Invoker.InvokeResult invokeResult = invokeSession.invoke(singleRecord);
                    if (!invokeResult.needToCommit()) {
                        throw new ConsumerInvocationException("6x292NmZmS :: Cannot invoke consumer " + consumerDefinition.logDisplay() + " of record " + r.value(), invokeResult.exceptionInMethod());
                    }
                }
            }
        }
        this.pushedRecords.add(consumerRecord);
    }

    /** Returns a copy of the box made by converting it to a string and back with the configured converter. */
    private Box serialization(Box value) {
        StrConverter strConverter = this.strConverterSupplier().get();
        String str = strConverter.toStr(value);
        return (Box)strConverter.fromStr(str);
    }

    /** Clears every mock producer created so far. */
    public void clearAllProducers() {
        for (MockProducerHolder producer : this.producers.values()) {
            producer.getProducer().clear();
        }
    }

    /** Forgets all records remembered by previous pushes. */
    public void clearPushed() {
        this.pushedRecords.clear();
    }

    /**
     * Returns the records remembered by previous pushes.
     *
     * @return an unmodifiable snapshot; later pushes do not change it
     */
    public List<ConsumerRecord<byte[], Box>> allPushed() {
        return Collections.unmodifiableList(new ArrayList<>(this.pushedRecords));
    }

    /**
     * Returns holders for the remembered records that {@link BoxUtil#hold} accepts for the given class.
     *
     * @param aClass class passed to {@link BoxUtil#hold} for every remembered record
     * @param <T>    type described by {@code aClass}
     * @return holders for the records where {@code hold} returned a value, in push order
     */
    public <T> List<BoxHolder<T>> pushedOf(Class<T> aClass) {
        // Stream over a snapshot: a synchronized list must not be traversed
        // without holding its lock, and the copy is taken under that lock.
        List<ConsumerRecord<byte[], Box>> snapshot = new ArrayList<>(this.pushedRecords);
        return snapshot.stream()
                .map(rec -> BoxUtil.hold(rec.value(), aClass))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    /** Verifies the controller list and collects the consumer definitions used by {@code push}. */
    @Override
    public void startConsumers() {
        this.verifyControllerList();
        this.consumerDefinitionList = this.accumulateConsumerDefinitionList();
    }
}

