/*
 * Decompiled with CFR 0.152.
 */
package no.nav.common.kafka.consumer.util;

import io.micrometer.core.instrument.MeterRegistry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import lombok.NonNull;
import no.nav.common.kafka.consumer.ConsumeStatus;
import no.nav.common.kafka.consumer.KafkaConsumerClient;
import no.nav.common.kafka.consumer.KafkaConsumerClientConfig;
import no.nav.common.kafka.consumer.KafkaConsumerClientImpl;
import no.nav.common.kafka.consumer.TopicConsumer;
import no.nav.common.kafka.consumer.feilhandtering.KafkaConsumerRepository;
import no.nav.common.kafka.consumer.feilhandtering.StoreOnFailureTopicConsumer;
import no.nav.common.kafka.consumer.util.ConsumerUtils;
import no.nav.common.kafka.consumer.util.TopicConsumerConfig;
import no.nav.common.kafka.consumer.util.TopicConsumerListener;
import no.nav.common.kafka.consumer.util.TopicConsumerLogger;
import no.nav.common.kafka.consumer.util.TopicConsumerMetrics;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fluent builder that assembles a {@link KafkaConsumerClient} from a set of consumer
 * properties and one {@link TopicConfig} per topic.
 *
 * <p>Instances are obtained through {@link #builder()}. Nothing is validated until
 * {@link #build()} is called.
 */
public class KafkaConsumerClientBuilder {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerClientBuilder.class);

    /** Registered topic configurations, in registration order; validated by {@link #build()}. */
    private final List<TopicConfig<?, ?>> consumerTopicConfigs = new ArrayList<>();

    /** Copy of the properties given to {@link #withProperties(Properties)}; {@code null} until set. */
    private Properties properties;

    /** Poll duration override in milliseconds; a negative value means "no override". */
    private long pollDurationMs = -1L;

    private KafkaConsumerClientBuilder() {
    }

    /**
     * Creates an empty builder.
     *
     * @return a new builder without properties or topic configurations
     */
    public static KafkaConsumerClientBuilder builder() {
        return new KafkaConsumerClientBuilder();
    }

    /**
     * Sets the properties used for the client. The argument is cloned, so later changes to
     * the caller's instance are not seen by this builder.
     *
     * @param properties the consumer properties, must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code properties} is {@code null}
     */
    public KafkaConsumerClientBuilder withProperties(@NonNull Properties properties) {
        if (properties == null) {
            throw new NullPointerException("properties is marked non-null but is null");
        }
        this.properties = (Properties) properties.clone();
        return this;
    }

    /**
     * Registers a single topic configuration.
     *
     * @param topicConfig the configuration to add; checked when {@link #build()} runs
     * @return this builder
     */
    public KafkaConsumerClientBuilder withTopicConfig(TopicConfig<?, ?> topicConfig) {
        this.consumerTopicConfigs.add(topicConfig);
        return this;
    }

    /**
     * Registers several topic configurations at once.
     *
     * @param topicConfigs the configurations to add; each is checked when {@link #build()} runs
     * @return this builder
     */
    public KafkaConsumerClientBuilder withTopicConfigs(List<TopicConfig<?, ?>> topicConfigs) {
        this.consumerTopicConfigs.addAll(topicConfigs);
        return this;
    }

    /**
     * Overrides the poll duration of the client.
     *
     * @param pollDurationMs poll duration in milliseconds; a negative value is ignored by
     *                       {@link #build()} and leaves the client configuration untouched
     * @return this builder
     */
    public KafkaConsumerClientBuilder withPollDuration(long pollDurationMs) {
        this.pollDurationMs = pollDurationMs;
        return this;
    }

    /**
     * Validates every registered topic configuration and creates the client.
     *
     * <p>When the same topic is registered more than once, the configuration registered last
     * is used and a warning is logged.
     *
     * @return a new {@link KafkaConsumerClientImpl} for the registered topics
     * @throws IllegalStateException if no properties were set, or if a topic configuration is
     *                               {@code null} or lacks its consumer config, topic,
     *                               deserializers or consumer
     */
    public KafkaConsumerClient build() {
        if (this.properties == null) {
            throw new IllegalStateException("Cannot build kafka consumer without properties");
        }

        HashMap<String, TopicConsumer<byte[], byte[]>> consumers = new HashMap<>();
        for (TopicConfig<?, ?> consumerTopicConfig : this.consumerTopicConfigs) {
            validateConfig(consumerTopicConfig);
            String topic = consumerTopicConfig.getConsumerConfig().getTopic();
            TopicConsumer<byte[], byte[]> replaced = consumers.put(topic, createTopicConsumer(consumerTopicConfig));
            if (replaced != null) {
                // Keep the historical "last one wins" behaviour, but make the overwrite visible.
                log.warn("Topic {} is configured more than once, only the last configuration is used", topic);
            }
        }

        // NOTE(review): left untyped because the declarations of KafkaConsumerClientConfig and
        // KafkaConsumerClientImpl are not visible from this file.
        KafkaConsumerClientConfig config = new KafkaConsumerClientConfig(this.properties, consumers);
        if (this.pollDurationMs >= 0L) {
            config.setPollDurationMs(this.pollDurationMs);
        }
        return new KafkaConsumerClientImpl(config);
    }

    /**
     * Checks that a topic configuration carries everything needed to create a consumer.
     *
     * @param consumerTopicConfig the configuration to check
     * @throws IllegalStateException naming the first missing part
     */
    private static void validateConfig(TopicConfig<?, ?> consumerTopicConfig) {
        if (consumerTopicConfig == null) {
            throw new IllegalStateException("Topic config is missing");
        }
        TopicConsumerConfig<?, ?> consumerConfig = consumerTopicConfig.consumerConfig;
        if (consumerConfig == null) {
            throw new IllegalStateException("Config is missing");
        }
        if (consumerConfig.topic == null) {
            throw new IllegalStateException("Topic is missing");
        }
        if (consumerConfig.keyDeserializer == null) {
            throw new IllegalStateException("Key deserializer is missing");
        }
        if (consumerConfig.valueDeserializer == null) {
            throw new IllegalStateException("Value deserializer is missing");
        }
        if (consumerConfig.consumer == null) {
            throw new IllegalStateException("Topic consumer is missing");
        }
    }

    /**
     * Creates the byte-level consumer for one topic configuration.
     *
     * <p>For every record the returned consumer runs the configured consumer through
     * {@link ConsumerUtils#safeConsume}, then, if any listeners are registered, hands the
     * deserialized record and the resulting status to each of them. Listener problems are
     * logged and never change the returned status. If a consumer repository is configured,
     * the consumer is wrapped in a {@link StoreOnFailureTopicConsumer}.
     *
     * @param consumerTopicConfig the topic configuration to create a consumer for
     * @param <K> key type of the configured consumer
     * @param <V> value type of the configured consumer
     * @return a consumer operating on raw {@code byte[]} records
     */
    public static <K, V> TopicConsumer<byte[], byte[]> createTopicConsumer(TopicConfig<K, V> consumerTopicConfig) {
        List<TopicConsumerListener<K, V>> listeners = consumerTopicConfig.getListeners();
        TopicConsumerConfig<K, V> config = consumerTopicConfig.getConsumerConfig();
        KafkaConsumerRepository consumerRepository = consumerTopicConfig.getConsumerRepository();

        TopicConsumer<byte[], byte[]> topicConsumer = rawRecord -> {
            // NOTE(review): the wrapped consumer is re-created for every record; it could be
            // hoisted if ConsumerUtils.createTopicConsumer is confirmed to be side-effect free.
            ConsumeStatus status = ConsumerUtils.safeConsume(ConsumerUtils.createTopicConsumer(config), rawRecord);
            if (!listeners.isEmpty()) {
                notifyListeners(listeners, config, rawRecord, status);
            }
            return status;
        };

        if (consumerRepository != null) {
            return new StoreOnFailureTopicConsumer(topicConsumer, consumerRepository);
        }
        return topicConsumer;
    }

    /**
     * Deserializes the record and reports it, with the consume status, to every listener.
     * Any exception, from deserialization or from a listener, is logged and swallowed so that
     * the listeners cannot influence the outcome of consuming the record.
     */
    private static <K, V> void notifyListeners(
            List<TopicConsumerListener<K, V>> listeners,
            TopicConsumerConfig<K, V> config,
            ConsumerRecord<byte[], byte[]> rawRecord,
            ConsumeStatus status) {
        ConsumerRecord<K, V> deserializedRecord;
        try {
            deserializedRecord = ConsumerUtils.deserializeConsumerRecord(
                    rawRecord, config.getKeyDeserializer(), config.getValueDeserializer());
        } catch (Exception e) {
            // Without a deserialized record there is nothing to hand to the listeners.
            log.error("Unable to deserialize record for consumer listeners. topic={} partition={} offset={}",
                    rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), e);
            return;
        }

        for (TopicConsumerListener<K, V> listener : listeners) {
            try {
                listener.onConsumed(deserializedRecord, status);
            } catch (Exception e) {
                log.error("Caught exception from consumer listener", (Throwable) e);
            }
        }
    }

    /**
     * Configuration for consuming a single topic: the consumer config itself, optional
     * listeners and an optional repository used for store-on-failure.
     *
     * @param <K> key type of the records handed to the consumer and listeners
     * @param <V> value type of the records handed to the consumer and listeners
     */
    public static class TopicConfig<K, V> {
        /** Listeners notified after each record has been consumed. */
        private final List<TopicConsumerListener<K, V>> listeners = new ArrayList<>();

        /** Topic, deserializers and consumer; required by the time the client is built. */
        private TopicConsumerConfig<K, V> consumerConfig;

        /** Optional; when set, the consumer is wrapped in a {@link StoreOnFailureTopicConsumer}. */
        private KafkaConsumerRepository consumerRepository;

        /**
         * Uses an already assembled consumer config.
         *
         * @param consumerConfig the consumer config to use
         * @return this topic config
         */
        public TopicConfig<K, V> withConsumerConfig(TopicConsumerConfig<K, V> consumerConfig) {
            this.consumerConfig = consumerConfig;
            return this;
        }

        /**
         * Creates the consumer config from its parts.
         *
         * @param topic the topic name
         * @param keyDeserializer deserializer for record keys
         * @param valueDeserializer deserializer for record values
         * @param consumer the consumer invoked for each record
         * @return this topic config
         */
        public TopicConfig<K, V> withConsumerConfig(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, TopicConsumer<K, V> consumer) {
            this.consumerConfig = new TopicConsumerConfig<K, V>(topic, keyDeserializer, valueDeserializer, consumer);
            return this;
        }

        /**
         * Creates the consumer config from its parts, adapting a plain {@link Consumer} through
         * {@link ConsumerUtils#toTopicConsumer}.
         *
         * @param topic the topic name
         * @param keyDeserializer deserializer for record keys
         * @param valueDeserializer deserializer for record values
         * @param consumer callback invoked for each record
         * @return this topic config
         */
        public TopicConfig<K, V> withConsumerConfig(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Consumer<ConsumerRecord<K, V>> consumer) {
            this.consumerConfig = new TopicConsumerConfig<K, V>(topic, keyDeserializer, valueDeserializer, ConsumerUtils.toTopicConsumer(consumer));
            return this;
        }

        /**
         * Adds a {@link TopicConsumerLogger} to the listeners.
         *
         * @return this topic config
         */
        public TopicConfig<K, V> withLogging() {
            this.listeners.add(new TopicConsumerLogger());
            return this;
        }

        /**
         * Adds a {@link TopicConsumerMetrics} listener backed by the given registry.
         *
         * @param meterRegistry the registry handed to the metrics listener
         * @return this topic config
         */
        public TopicConfig<K, V> withMetrics(MeterRegistry meterRegistry) {
            this.listeners.add(new TopicConsumerMetrics(meterRegistry));
            return this;
        }

        /**
         * Enables store-on-failure by supplying the repository used by
         * {@link StoreOnFailureTopicConsumer}.
         *
         * @param consumerRepository the repository to use
         * @return this topic config
         */
        public TopicConfig<K, V> withStoreOnFailure(KafkaConsumerRepository consumerRepository) {
            this.consumerRepository = consumerRepository;
            return this;
        }

        /**
         * Adds a single listener.
         *
         * @param consumerListener the listener to add
         * @return this topic config
         */
        public TopicConfig<K, V> withListener(TopicConsumerListener<K, V> consumerListener) {
            this.listeners.add(consumerListener);
            return this;
        }

        /**
         * Adds several listeners.
         *
         * @param consumerListeners the listeners to add
         * @return this topic config
         */
        public TopicConfig<K, V> withListeners(List<TopicConsumerListener<K, V>> consumerListeners) {
            this.listeners.addAll(consumerListeners);
            return this;
        }

        /**
         * @return the live, mutable list of registered listeners
         */
        public List<TopicConsumerListener<K, V>> getListeners() {
            return this.listeners;
        }

        /**
         * @return the consumer config, or {@code null} if none has been set
         */
        public TopicConsumerConfig<K, V> getConsumerConfig() {
            return this.consumerConfig;
        }

        /**
         * @return the store-on-failure repository, or {@code null} if none has been set
         */
        public KafkaConsumerRepository getConsumerRepository() {
            return this.consumerRepository;
        }
    }
}

