/*
 * Decompiled with CFR 0.152.
 */
package no.nav.common.kafka.producer.util;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import no.nav.common.kafka.producer.KafkaProducerClient;
import no.nav.common.kafka.producer.KafkaProducerClientImpl;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerClientWithMetrics<K, V>
implements KafkaProducerClient<K, V> {
    public static final String KAFKA_PRODUCER_STATUS_COUNTER = "kafka_producer_status";
    public static final String KAFKA_PRODUCER_CURRENT_OFFSET_GAUGE = "kafka_producer_current_offset";
    private final KafkaProducerClient<K, V> client;
    private final MeterRegistry meterRegistry;
    private final Map<String, Counter> statusCounterMap = new HashMap<String, Counter>();
    private final Map<String, Gauge> currentOffsetGaugeMap = new HashMap<String, Gauge>();
    private final Map<String, Long> currentOffsetMap = new HashMap<String, Long>();

    public KafkaProducerClientWithMetrics(Properties properties, MeterRegistry meterRegistry) {
        this.client = new KafkaProducerClientImpl(properties);
        this.meterRegistry = meterRegistry;
    }

    public KafkaProducerClientWithMetrics(KafkaProducerClient<K, V> client, MeterRegistry meterRegistry) {
        this.client = client;
        this.meterRegistry = meterRegistry;
    }

    @Override
    public void close() {
        this.client.close();
    }

    @Override
    public RecordMetadata sendSync(ProducerRecord<K, V> record) {
        try {
            RecordMetadata recordMetadata = this.client.sendSync(record);
            this.updateLatestOffset(recordMetadata);
            this.incrementRecordCount(record, false);
            return recordMetadata;
        }
        catch (Exception e) {
            this.incrementRecordCount(record, true);
            throw e;
        }
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, null);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        Callback metricCallback = (metadata, exception) -> {
            boolean failed = exception != null;
            this.incrementRecordCount(record, failed);
            if (metadata != null) {
                this.updateLatestOffset(metadata);
            }
            if (callback != null) {
                callback.onCompletion(metadata, exception);
            }
        };
        return this.client.send(record, metricCallback);
    }

    @Override
    public Producer<K, V> getProducer() {
        return this.client.getProducer();
    }

    private void updateLatestOffset(RecordMetadata metadata) {
        String key = metadata.topic() + "-" + metadata.partition();
        long currentOffset = metadata.hasOffset() ? metadata.offset() : 0L;
        this.currentOffsetMap.put(key, currentOffset);
        this.currentOffsetGaugeMap.computeIfAbsent(key, k -> Gauge.builder((String)KAFKA_PRODUCER_CURRENT_OFFSET_GAUGE, () -> {
            Long offset = this.currentOffsetMap.get(key);
            return offset != null ? offset : 0L;
        }).tag("topic", metadata.topic()).tag("partition", String.valueOf(metadata.partition())).register(this.meterRegistry));
    }

    private void incrementRecordCount(ProducerRecord<K, V> record, boolean failed) {
        String key = record.topic() + "-" + failed;
        this.statusCounterMap.computeIfAbsent(key, k -> Counter.builder((String)KAFKA_PRODUCER_STATUS_COUNTER).tag("topic", record.topic()).tag("status", failed ? "failed" : "ok").register(this.meterRegistry)).increment();
    }
}

