/*
 * Decompiled with CFR 0.152.
 */
package no.nav.common.kafka.consumer.util;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import no.nav.common.kafka.consumer.ConsumeStatus;
import no.nav.common.kafka.consumer.TopicConsumer;
import no.nav.common.kafka.consumer.feilhandtering.StoredConsumerRecord;
import no.nav.common.kafka.consumer.util.KafkaConsumerClientBuilder;
import no.nav.common.kafka.consumer.util.TopicConsumerConfig;
import no.nav.common.kafka.util.KafkaUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerUtils {
    public static final Serializer<byte[]> BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
    private static final Logger log = LoggerFactory.getLogger(ConsumerUtils.class);

    public static StoredConsumerRecord mapToStoredRecord(ConsumerRecord<byte[], byte[]> record) {
        return ConsumerUtils.mapToStoredRecord(record, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
    }

    public static <K, V> StoredConsumerRecord mapToStoredRecord(ConsumerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        byte[] key = keySerializer.serialize(record.topic(), record.key());
        byte[] value = valueSerializer.serialize(record.topic(), record.value());
        String headersJson = KafkaUtils.headersToJson(record.headers());
        return new StoredConsumerRecord(record.topic(), record.partition(), record.offset(), key, value, headersJson, record.timestamp());
    }

    public static ConsumerRecord<byte[], byte[]> mapFromStoredRecord(StoredConsumerRecord record) {
        Headers headers = KafkaUtils.jsonToHeaders(record.getHeadersJson());
        ConsumerRecord consumerRecord = new ConsumerRecord(record.getTopic(), record.getPartition(), record.getOffset(), record.getTimestamp(), TimestampType.CREATE_TIME, -1L, -1, -1, (Object)record.getKey(), (Object)record.getValue());
        headers.forEach(header -> consumerRecord.headers().add(header));
        return consumerRecord;
    }

    public static <K, V> ConsumerRecord<K, V> deserializeConsumerRecord(ConsumerRecord<byte[], byte[]> record, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        Object key = keyDeserializer.deserialize(record.topic(), (byte[])record.key());
        Object value = valueDeserializer.deserialize(record.topic(), (byte[])record.value());
        return new ConsumerRecord(record.topic(), record.partition(), record.offset(), record.timestamp(), record.timestampType(), -1L, -1, -1, key, value);
    }

    public static Map<String, TopicConsumer<byte[], byte[]>> createTopicConsumers(List<TopicConsumerConfig<?, ?>> topicConsumerConfigs) {
        HashMap<String, TopicConsumer<byte[], byte[]>> consumers = new HashMap<String, TopicConsumer<byte[], byte[]>>();
        topicConsumerConfigs.forEach(config -> consumers.put(config.getTopic(), ConsumerUtils.createTopicConsumer(config)));
        return consumers;
    }

    public static <K, V> TopicConsumer<byte[], byte[]> createTopicConsumer(TopicConsumerConfig<K, V> config) {
        return record -> {
            ConsumerRecord deserializedRecord = ConsumerUtils.deserializeConsumerRecord((ConsumerRecord<byte[], byte[]>)record, config.getKeyDeserializer(), config.getValueDeserializer());
            return config.getConsumer().consume(deserializedRecord);
        };
    }

    public static <K, V> TopicConsumer<K, V> toTopicConsumer(Consumer<ConsumerRecord<K, V>> consumer) {
        return record -> {
            consumer.accept(record);
            return ConsumeStatus.OK;
        };
    }

    public static List<TopicConsumerConfig<?, ?>> findConsumerConfigsWithStoreOnFailure(List<KafkaConsumerClientBuilder.TopicConfig<?, ?>> topicConfigs) {
        return topicConfigs.stream().filter(c -> c.getConsumerRepository() != null).map(KafkaConsumerClientBuilder.TopicConfig::getConsumerConfig).collect(Collectors.toList());
    }

    public static <K, V> TopicConsumer<K, V> aggregateConsumer(List<TopicConsumer<K, V>> consumers) {
        return record -> {
            ConsumeStatus aggregatedStatus = ConsumeStatus.OK;
            for (TopicConsumer consumer : consumers) {
                ConsumeStatus status = consumer.consume(record);
                if (status != ConsumeStatus.FAILED) continue;
                aggregatedStatus = ConsumeStatus.FAILED;
            }
            return aggregatedStatus;
        };
    }

    public static <K, V> ConsumeStatus consume(Consumer<ConsumerRecord<K, V>> consumer, ConsumerRecord<K, V> record) {
        consumer.accept(record);
        return ConsumeStatus.OK;
    }

    public static <K, V> ConsumeStatus safeConsume(TopicConsumer<K, V> topicConsumer, ConsumerRecord<K, V> consumerRecord) {
        try {
            ConsumeStatus status = topicConsumer.consume(consumerRecord);
            if (status == null) {
                log.warn("Consumer returned null instead of OK/FAILED, defaulting to FAILED. topic={} partition={} offset={}", new Object[]{consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset()});
                return ConsumeStatus.FAILED;
            }
            return status;
        }
        catch (Exception e) {
            String msg = String.format("Consumer failed to process record from topic=%s partition=%d offset=%d", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            log.error(msg, (Throwable)e);
            return ConsumeStatus.FAILED;
        }
    }
}

