/*
 * Decompiled with CFR 0.152.
 */
package org.swisspush.gateleen.kafka;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sends batches of records through a {@link KafkaProducer} and, once a {@link MeterRegistry}
 * has been provided via {@link #setMeterRegistry(MeterRegistry)}, counts successful and failed
 * sends per topic.
 *
 * <p>The per-topic counter caches are plain {@link HashMap}s; every access to them and to the
 * registry reference happens while holding this instance's monitor.
 */
public class KafkaMessageSender {
    private static final Logger log = LoggerFactory.getLogger(KafkaMessageSender.class);

    /** Name of the counter incremented for every record that was sent successfully. */
    public static final String SUCCESS_SEND_MESSAGES_METRIC = "gateleen.kafka.send.success.messages";
    public static final String SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of successfully sent kafka messages";
    /** Name of the counter incremented for every record whose send failed. */
    public static final String FAIL_SEND_MESSAGES_METRIC = "gateleen.kafka.send.fail.messages";
    public static final String FAIL_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message sendings";
    /** Tag key carrying the topic name on both counters. */
    public static final String TOPIC = "topic";

    private MeterRegistry meterRegistry;
    private final Map<String, Counter> successSendCounterMap = new HashMap<>();
    private final Map<String, Counter> failSendCounterMap = new HashMap<>();

    /**
     * Sets the registry in which counters are registered from now on and drops all counters
     * cached so far, so that they get re-created against the new registry on next use.
     *
     * <p>Synchronized because the increment path reads the registry and mutates the same
     * caches under this instance's monitor.
     *
     * @param meterRegistry the registry to register counters in; while {@code null}, nothing is counted
     */
    public synchronized void setMeterRegistry(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.successSendCounterMap.clear();
        this.failSendCounterMap.clear();
    }

    /**
     * Sends every record of {@code messages} with the given producer.
     *
     * @param kafkaProducer the producer used for sending
     * @param messages      the records to send
     * @return a future that succeeds when {@link Future#all(List)} over the individual sends
     *         succeeds, and fails with that composite's cause otherwise
     */
    Future<Void> sendMessages(KafkaProducer<String, String> kafkaProducer, List<KafkaProducerRecord<String, String>> messages) {
        log.debug("Start processing {} messages for kafka", messages.size());
        List<Future<Void>> futures = messages.stream()
                .map(message -> sendMessage(kafkaProducer, message))
                .collect(Collectors.toList());
        // The composite already carries the outcome we want to expose; no intermediate Promise needed.
        return Future.all(futures)
                .<Void>mapEmpty()
                .onSuccess(ignored -> log.debug("Batch messages successfully sent to Kafka."));
    }

    /**
     * Sends a single record, logs the outcome and updates the matching per-topic counter.
     *
     * @param kafkaProducer the producer used for sending
     * @param message       the record to send
     * @return a future reflecting the outcome of the send
     */
    private Future<Void> sendMessage(KafkaProducer<String, String> kafkaProducer, KafkaProducerRecord<String, String> message) {
        // Explicit <Void> witness: inference cannot flow backwards through the chained onFailure call.
        return kafkaProducer.send(message).<Void>compose(metadata -> {
            log.debug("Message successfully sent to kafka topic '{}' on partition {} with offset {}. Timestamp: {}",
                    metadata.getTopic(), metadata.getPartition(), metadata.getOffset(), metadata.getTimestamp());
            incrementSuccessCount(metadata.getTopic());
            return Future.succeededFuture();
        }).onFailure(throwable -> {
            // SLF4J consumes a trailing Throwable as the exception to log, not as a format argument,
            // so the cause text has to be passed explicitly to fill the second placeholder.
            log.warn("Failed to send message with key '{}' to kafka. Cause: {}",
                    message.key(), throwable.getMessage(), throwable);
            incrementFailCount(message.topic());
        });
    }

    /** Increments the success counter tagged with {@code topic}. */
    private void incrementSuccessCount(String topic) {
        incrementCounter(successSendCounterMap, SUCCESS_SEND_MESSAGES_METRIC, SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION, topic);
    }

    /** Increments the failure counter tagged with {@code topic}. */
    private void incrementFailCount(String topic) {
        incrementCounter(failSendCounterMap, FAIL_SEND_MESSAGES_METRIC, FAIL_SEND_MESSAGES_METRIC_DESCRIPTION, topic);
    }

    /**
     * Increments the counter cached for {@code topic} in {@code counters}, registering and caching
     * it first if it does not exist yet. Does nothing when no counter is cached and no registry is set.
     *
     * @param counters    the per-topic cache to look the counter up in
     * @param metricName  the metric name used when a new counter has to be registered
     * @param description the description used when a new counter has to be registered
     * @param topic       the topic, used as cache key and as value of the {@link #TOPIC} tag
     */
    private synchronized void incrementCounter(Map<String, Counter> counters, String metricName, String description, String topic) {
        Counter counter = counters.get(topic);
        if (counter == null) {
            if (meterRegistry == null) {
                return;
            }
            counter = Counter.builder(metricName)
                    .description(description)
                    .tag(TOPIC, topic)
                    .register(meterRegistry);
            counters.put(topic, counter);
        }
        counter.increment();
    }
}

