/*
 * Decompiled with CFR 0.152.
 */
package no.nav.common.kafka.consumer.util;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import no.nav.common.kafka.consumer.ConsumeStatus;
import no.nav.common.kafka.consumer.TopicConsumer;
import no.nav.common.kafka.consumer.feilhandtering.StoredConsumerRecord;
import no.nav.common.kafka.consumer.feilhandtering.StoredRecordConsumer;
import no.nav.common.kafka.consumer.util.JsonTopicConsumer;
import no.nav.common.kafka.util.KafkaUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for converting between Kafka {@link ConsumerRecord}s and
 * {@link StoredConsumerRecord}s, and for adapting, combining and safely invoking
 * {@link TopicConsumer}s.
 */
public class ConsumerUtils {
    private static final Logger log = LoggerFactory.getLogger(ConsumerUtils.class);

    /**
     * Converts a consumer record into a {@link StoredConsumerRecord}, serializing the key and
     * value to bytes and the headers to JSON.
     *
     * @param record          the record to convert
     * @param keySerializer   serializer applied to {@code record.key()}
     * @param valueSerializer serializer applied to {@code record.value()}
     * @param <K>             key type of the record
     * @param <V>             value type of the record
     * @return a stored record carrying the topic, partition, offset, serialized key/value,
     *         headers JSON and timestamp of {@code record}
     */
    public static <K, V> StoredConsumerRecord mapToStoredRecord(ConsumerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        byte[] key = keySerializer.serialize(record.topic(), record.key());
        byte[] value = valueSerializer.serialize(record.topic(), record.value());
        String headersJson = KafkaUtils.headersToJson(record.headers());
        return new StoredConsumerRecord(record.topic(), record.partition(), record.offset(), key, value, headersJson, record.timestamp());
    }

    /**
     * Converts a consumer record whose key and value are already raw bytes into a
     * {@link StoredConsumerRecord}. The key and value are passed through unchanged.
     *
     * @param record the record to convert
     * @return a stored record carrying the topic, partition, offset, key, value, headers JSON
     *         and timestamp of {@code record}
     */
    public static StoredConsumerRecord mapToStoredRecord(ConsumerRecord<byte[], byte[]> record) {
        String headersJson = KafkaUtils.headersToJson(record.headers());
        return new StoredConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), record.value(), headersJson, record.timestamp());
    }

    /**
     * Rebuilds a {@link ConsumerRecord} from a {@link StoredConsumerRecord} by deserializing the
     * stored key and value and restoring the headers from their JSON form.
     *
     * <p>The timestamp type of the rebuilt record is always {@link TimestampType#CREATE_TIME}.
     * The three {@code -1} arguments fill the checksum and serialized key/value size slots of
     * the Kafka {@code ConsumerRecord} constructor, since those values are not stored.
     *
     * @param record            the stored record to convert
     * @param keyDeserializer   deserializer applied to the stored key bytes
     * @param valueDeserializer deserializer applied to the stored value bytes
     * @param <K>               key type of the resulting record
     * @param <V>               value type of the resulting record
     * @return the rebuilt consumer record
     */
    public static <K, V> ConsumerRecord<K, V> mapFromStoredRecord(StoredConsumerRecord record, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        K key = keyDeserializer.deserialize(record.getTopic(), record.getKey());
        V value = valueDeserializer.deserialize(record.getTopic(), record.getValue());
        Headers headers = KafkaUtils.jsonToHeaders(record.getHeadersJson());
        ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(record.getTopic(), record.getPartition(), record.getOffset(), record.getTimestamp(), TimestampType.CREATE_TIME, -1L, -1, -1, key, value);
        // This constructor takes no headers, so they are copied onto the new record one by one.
        headers.forEach(header -> consumerRecord.headers().add(header));
        return consumerRecord;
    }

    /**
     * Wraps every consumer in {@code consumerMap} with
     * {@link #toStoredRecordConsumer(TopicConsumer, Deserializer, Deserializer)}, keeping the
     * same map keys.
     *
     * @param consumerMap       consumers keyed by topic
     * @param keyDeserializer   deserializer used for stored keys
     * @param valueDeserializer deserializer used for stored values
     * @param <K>               key type expected by the consumers
     * @param <V>               value type expected by the consumers
     * @return a new mutable map from the same keys to the wrapped consumers
     */
    public static <K, V> Map<String, StoredRecordConsumer> toStoredRecordConsumerMap(Map<String, TopicConsumer<K, V>> consumerMap, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        Map<String, StoredRecordConsumer> storedRecordConsumerMap = new HashMap<>();
        consumerMap.forEach((topic, topicConsumer) -> storedRecordConsumerMap.put(topic, toStoredRecordConsumer(topicConsumer, keyDeserializer, valueDeserializer)));
        return storedRecordConsumerMap;
    }

    /**
     * Adapts a {@link TopicConsumer} into a {@link StoredRecordConsumer} that first converts the
     * stored record with {@link #mapFromStoredRecord} and then delegates to {@code topicConsumer}.
     *
     * @param topicConsumer     the consumer to delegate to
     * @param keyDeserializer   deserializer used for stored keys
     * @param valueDeserializer deserializer used for stored values
     * @param <K>               key type expected by the consumer
     * @param <V>               value type expected by the consumer
     * @return the adapting consumer
     */
    public static <K, V> StoredRecordConsumer toStoredRecordConsumer(TopicConsumer<K, V> topicConsumer, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        return storedRecord -> topicConsumer.consume(mapFromStoredRecord(storedRecord, keyDeserializer, valueDeserializer));
    }

    /**
     * Combines several consumers into one that passes each record to all of them in list order.
     *
     * <p>Every consumer is invoked even when an earlier one reports {@link ConsumeStatus#FAILED}.
     * Exceptions thrown by a consumer are not caught here.
     *
     * @param consumers the consumers to invoke for each record
     * @param <K>       key type of the records
     * @param <V>       value type of the records
     * @return a consumer that returns {@link ConsumeStatus#FAILED} if any delegate returned
     *         {@code FAILED}, otherwise {@link ConsumeStatus#OK}
     */
    public static <K, V> TopicConsumer<K, V> aggregateConsumer(List<TopicConsumer<K, V>> consumers) {
        return record -> {
            ConsumeStatus aggregatedStatus = ConsumeStatus.OK;
            for (TopicConsumer<K, V> consumer : consumers) {
                ConsumeStatus status = consumer.consume(record);
                // Deliberately no early exit: the remaining consumers still get the record.
                if (status == ConsumeStatus.FAILED) {
                    aggregatedStatus = ConsumeStatus.FAILED;
                }
            }
            return aggregatedStatus;
        };
    }

    /**
     * Passes {@code record} to {@code consumer} and reports {@link ConsumeStatus#OK}.
     * Exceptions thrown by {@code consumer} propagate to the caller.
     *
     * @param consumer the consumer to invoke
     * @param record   the record to hand to the consumer
     * @param <K>      key type of the record
     * @param <V>      value type of the record
     * @return always {@link ConsumeStatus#OK} when {@code consumer} returns normally
     */
    public static <K, V> ConsumeStatus consume(Consumer<ConsumerRecord<K, V>> consumer, ConsumerRecord<K, V> record) {
        consumer.accept(record);
        return ConsumeStatus.OK;
    }

    /**
     * Invokes {@code topicConsumer} and converts both a {@code null} result and any thrown
     * {@link Exception} into {@link ConsumeStatus#FAILED}, logging the topic, partition and
     * offset of the record in either case.
     *
     * @param topicConsumer  the consumer to invoke
     * @param consumerRecord the record to consume
     * @param <K>            key type of the record
     * @param <V>            value type of the record
     * @return the status returned by the consumer, or {@link ConsumeStatus#FAILED} if it
     *         returned {@code null} or threw an exception
     */
    public static <K, V> ConsumeStatus safeConsume(TopicConsumer<K, V> topicConsumer, ConsumerRecord<K, V> consumerRecord) {
        try {
            ConsumeStatus status = topicConsumer.consume(consumerRecord);
            if (status == null) {
                log.warn("Consumer returned null instead of OK/FAILED, defaulting to FAILED. topic={} partition={} offset={}", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
                return ConsumeStatus.FAILED;
            }
            return status;
        } catch (Exception e) {
            String msg = String.format("Consumer failed to process record from topic=%s partition=%d offset=%d", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            log.error(msg, e);
            return ConsumeStatus.FAILED;
        }
    }

    /**
     * Creates a {@link JsonTopicConsumer} for {@code dataClass} whose status is whatever
     * {@code consumer} returns for the data object; the record itself is not passed on.
     *
     * @param dataClass the class handed to the {@link JsonTopicConsumer}
     * @param consumer  function invoked with the data object, returning the consume status
     * @param <K>       key type of the consumed records
     * @param <V>       value type of the consumed records
     * @param <D>       type of the data object
     * @return the configured consumer
     */
    public static <K, V, D> JsonTopicConsumer<K, V, D> jsonConsumer(Class<D> dataClass, Function<D, ConsumeStatus> consumer) {
        return new JsonTopicConsumer<>(dataClass, (record, data) -> consumer.apply(data));
    }

    /**
     * Creates a {@link JsonTopicConsumer} for {@code dataClass} that passes the data object to
     * {@code consumer} and reports {@link ConsumeStatus#OK} when it returns normally.
     *
     * @param dataClass the class handed to the {@link JsonTopicConsumer}
     * @param consumer  callback invoked with the data object
     * @param <K>       key type of the consumed records
     * @param <V>       value type of the consumed records
     * @param <D>       type of the data object
     * @return the configured consumer
     */
    public static <K, V, D> JsonTopicConsumer<K, V, D> jsonConsumer(Class<D> dataClass, Consumer<D> consumer) {
        return new JsonTopicConsumer<>(dataClass, (record, data) -> {
            consumer.accept(data);
            return ConsumeStatus.OK;
        });
    }

    /**
     * Creates a {@link JsonTopicConsumer} for {@code dataClass} that passes both the record and
     * the data object to {@code consumer} and reports {@link ConsumeStatus#OK} when it returns
     * normally.
     *
     * @param dataClass the class handed to the {@link JsonTopicConsumer}
     * @param consumer  callback invoked with the record and the data object
     * @param <K>       key type of the consumed records
     * @param <V>       value type of the consumed records
     * @param <D>       type of the data object
     * @return the configured consumer
     */
    public static <K, V, D> JsonTopicConsumer<K, V, D> jsonConsumer(Class<D> dataClass, BiConsumer<ConsumerRecord<K, V>, D> consumer) {
        return new JsonTopicConsumer<>(dataClass, (record, data) -> {
            consumer.accept(record, data);
            return ConsumeStatus.OK;
        });
    }
}

