/*
 * Decompiled with CFR 0.152.
 */
package kz.greetgo.kafka.core;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import kz.greetgo.kafka.consumer.ConsumerConfigDefaults;
import kz.greetgo.kafka.consumer.ConsumerDefinition;
import kz.greetgo.kafka.consumer.ConsumerReactor;
import kz.greetgo.kafka.consumer.ConsumerReactorImpl;
import kz.greetgo.kafka.core.KafkaReactorAbstract;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.errors.NotDefined;
import kz.greetgo.kafka.model.Box;
import kz.greetgo.kafka.producer.ProducerConfigWorker;
import kz.greetgo.kafka.producer.ProducerSource;
import kz.greetgo.kafka.serializer.BoxSerializer;
import kz.greetgo.kafka.util.ConfigLines;
import kz.greetgo.kafka.util.KeyUtil;
import kz.greetgo.strconverter.StrConverter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Kafka reactor implementation that starts one {@link ConsumerReactorImpl} per consumer
 * definition and exposes a {@link ProducerSource} backed by a {@link ProducerConfigWorker}.
 *
 * <p>NOTE(review): the list of started consumer reactors is a plain {@link ArrayList}; no
 * synchronization is visible in this class, so confirm that start/stop/lookup are called
 * from a single thread.
 */
public class KafkaReactorImpl extends KafkaReactorAbstract {
    /** Every consumer reactor created by {@link #startConsumers()}, in creation order. */
    private final List<ConsumerReactorImpl> consumerReactorList = new ArrayList<>();

    /**
     * Consumer configuration defaults. Consumer reactors receive a supplier that reads this
     * field at call time, so a reassignment is seen by reactors that query it afterwards.
     */
    public ConsumerConfigDefaults consumerConfigDefaults = ConsumerConfigDefaults.withDefaults();

    /** Resolves producer configuration from the producer config storage, seeded with the defaults below. */
    private final ProducerConfigWorker producerConfigWorker = new ProducerConfigWorker(() -> this.producerConfigStorage, this::putProducerDefaultValues);

    /** Producer-side view of this reactor; every call delegates to the enclosing instance. */
    private final ProducerSource producerSource = new ProducerSource() {

        @Override
        public Logger logger() {
            return KafkaReactorImpl.this.logger;
        }

        @Override
        public StrConverter getStrConverter() {
            return KafkaReactorImpl.this.strConverterSupplier().get();
        }

        @Override
        public byte[] extractKey(Object object) {
            return KeyUtil.extractKey(object);
        }

        /** Returns the author supplied by {@code authorGetter}, or {@code null} when no getter is set. */
        @Override
        public String author() {
            return KafkaReactorImpl.this.authorGetter == null ? null : (String) KafkaReactorImpl.this.authorGetter.get();
        }

        @Override
        public long getProducerConfigUpdateTimestamp(String producerName) {
            return KafkaReactorImpl.this.producerConfigWorker.getConfigUpdateTimestamp(producerName);
        }

        @Override
        public Map<String, Object> getConfigFor(String producerName) {
            return KafkaReactorImpl.this.producerConfigWorker.getConfigFor(producerName);
        }

        /**
         * Creates a Kafka producer for the named producer configuration.
         *
         * @param producerName    name used to look up the producer configuration
         * @param keySerializer   serializer for the {@code byte[]} record keys
         * @param valueSerializer serializer for the {@link Box} record values
         * @return a new producer configured with the looked-up settings plus {@code bootstrap.servers}
         * @throws NotDefined if {@code bootstrapServers} has not been set on the reactor
         */
        @Override
        public Producer<byte[], Box> createProducer(String producerName, ByteArraySerializer keySerializer, BoxSerializer valueSerializer) {
            // Same guard as startConsumers(): report a missing setting by name instead of a bare NPE.
            if (KafkaReactorImpl.this.bootstrapServers == null) {
                throw new NotDefined("bootstrapServers in " + KafkaReactorImpl.this.getClass().getSimpleName() + ".createProducer()");
            }
            Map<String, Object> configMap = this.getConfigFor(producerName);
            configMap.put("bootstrap.servers", KafkaReactorImpl.this.bootstrapServers.get());
            if (KafkaReactorImpl.this.logger.isShow(LoggerType.SHOW_PRODUCER_CONFIG)) {
                KafkaReactorImpl.this.logger.logProducerConfigOnCreating(producerName, configMap);
            }
            // Diamond instead of a raw KafkaProducer with raw Serializer casts.
            return new KafkaProducer<>(configMap, keySerializer, valueSerializer);
        }
    };

    /**
     * Creates, configures and starts one consumer reactor per accumulated consumer definition.
     *
     * @throws NotDefined if {@code consumerConfigStorage} or {@code bootstrapServers} is not set
     */
    @Override
    public void startConsumers() {
        this.verifyControllerList();
        List<ConsumerDefinition> consumerDefinitionList = this.accumulateConsumerDefinitionList();
        if (this.consumerConfigStorage == null) {
            throw new NotDefined("consumerConfigStorage in " + this.getClass().getSimpleName() + ".startConsumers()");
        }
        if (this.bootstrapServers == null) {
            throw new NotDefined("bootstrapServers in " + this.getClass().getSimpleName() + ".startConsumers()");
        }
        for (ConsumerDefinition consumerDefinition : consumerDefinitionList) {
            ConsumerReactorImpl consumerReactor = new ConsumerReactorImpl();
            // Registered before start() so stopConsumers() can still reach it if start() fails.
            this.consumerReactorList.add(consumerReactor);
            consumerReactor.logger = this.logger;
            consumerReactor.strConverterSupplier = this.strConverterSupplier();
            consumerReactor.bootstrapServers = this.bootstrapServers;
            consumerReactor.configStorage = this.consumerConfigStorage;
            consumerReactor.consumerDefinition = consumerDefinition;
            consumerReactor.producerSource = this.getProducerSource();
            consumerReactor.hostId = this.hostId;
            // Supplier rather than a snapshot: the field is read each time the reactor asks.
            consumerReactor.consumerConfigDefaults = () -> this.consumerConfigDefaults;
            consumerReactor.start();
        }
    }

    /**
     * Finds a started consumer reactor by its consumer name.
     *
     * @param consumerName name to match against each reactor's consumer definition
     * @return the first matching reactor, or an empty optional when none matches
     */
    @Override
    public Optional<ConsumerReactor> consumer(String consumerName) {
        for (ConsumerReactorImpl consumerReactor : this.consumerReactorList) {
            if (consumerReactor.consumerDefinition.getConsumerName().equals(consumerName)) {
                return Optional.of(consumerReactor);
            }
        }
        return Optional.empty();
    }

    /**
     * Stops every consumer reactor and then closes the producer config worker.
     * The worker is closed even when stopping a reactor throws.
     */
    @Override
    public void stopConsumers() {
        try {
            this.consumerReactorList.forEach(ConsumerReactorImpl::stop);
        } finally {
            this.producerConfigWorker.close();
        }
    }

    /** Calls {@code join} on every consumer reactor, in creation order. */
    public void joinToConsumers() {
        this.consumerReactorList.forEach(ConsumerReactorImpl::join);
    }

    /**
     * Registers the default producer settings.
     *
     * <p>NOTE(review): the trailing padding inside the keys is kept exactly as it was;
     * presumably {@link ConfigLines#putValue} trims it and the {@code prod.} prefix is
     * stripped before the values reach Kafka - confirm against those classes.
     *
     * @param configLines destination for the default key/value pairs
     */
    protected void putProducerDefaultValues(ConfigLines configLines) {
        // Was "prod.acts": a typo for the Kafka producer setting "acks".
        configLines.putValue("prod.acks                    ", "all");
        configLines.putValue("prod.buffer.memory           ", "33554432");
        configLines.putValue("prod.compression.type        ", "none");
        configLines.putValue("prod.batch.size              ", "16384");
        configLines.putValue("prod.connections.max.idle.ms ", "540000");
        configLines.putValue("prod.request.timeout.ms      ", "30000");
        configLines.putValue("prod.linger.ms               ", "1");
        // The second, identical "prod.batch.size" registration was dropped as redundant.
        configLines.putValue("prod.retries                               ", "2147483647");
        configLines.putValue("prod.max.in.flight.requests.per.connection ", "1");
        configLines.putValue("prod.delivery.timeout.ms                   ", "35000");
    }

    /** Returns the producer source bound to this reactor. */
    @Override
    public ProducerSource getProducerSource() {
        return this.producerSource;
    }
}

