/*
 * Decompiled with CFR 0.152.
 */
package org.swisspush.gateleen.kafka;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sends batches of {@link KafkaProducerRecord}s through a Vert.x {@link KafkaProducer}
 * and reports the combined outcome as a single {@link Future}.
 */
public class KafkaMessageSender {
    private static final Logger log = LoggerFactory.getLogger(KafkaMessageSender.class);

    /**
     * Sends every record in {@code messages} using the given producer.
     *
     * @param kafkaProducer the producer used to send each record
     * @param messages      the records to send; must not be {@code null}
     * @return a future that succeeds once all records have been sent, or fails with the
     *         cause reported by the composite future when a send fails
     */
    Future<Void> sendMessages(KafkaProducer<String, String> kafkaProducer, List<KafkaProducerRecord<String, String>> messages) {
        log.debug("Start processing {} messages for kafka", messages.size());

        // CompositeFuture.all(List) is declared against the raw Future type, so a
        // List<Future<Void>> cannot be passed directly; keep the raw type scoped to this local.
        @SuppressWarnings("rawtypes")
        List<Future> futures = messages.stream()
                .map(message -> sendMessage(kafkaProducer, message))
                .collect(Collectors.toList());

        // The mapped composite future already carries the success/failure outcome,
        // so it is returned directly instead of being mirrored into a separate Promise.
        return CompositeFuture.all(futures)
                .<Void>mapEmpty()
                .onSuccess(ignored -> log.debug("Batch messages successfully sent to Kafka."));
    }

    /**
     * Sends a single record and logs the outcome.
     *
     * @param kafkaProducer the producer used to send the record
     * @param message       the record to send
     * @return a future that succeeds (with no value) when the record was sent, or fails
     *         with the cause reported by the producer
     */
    private Future<Void> sendMessage(KafkaProducer<String, String> kafkaProducer, KafkaProducerRecord<String, String> message) {
        return kafkaProducer.send(message)
                // The explicit <Void> witness is required: compose(...) is the receiver of
                // onFailure(...), so it has no target type and would otherwise infer Future<Object>.
                .<Void>compose(metadata -> {
                    log.debug("Message successfully sent to kafka topic '{}' on partition {} with offset {}. Timestamp: {}",
                            metadata.getTopic(), metadata.getPartition(), metadata.getOffset(), metadata.getTimestamp());
                    return Future.succeededFuture();
                })
                // Fill the "Cause" placeholder with the message text and pass the throwable as the
                // trailing argument, so the placeholder is rendered and the stack trace is still logged.
                .onFailure(throwable -> log.warn("Failed to send message with key '{}' to kafka. Cause: {}",
                        message.key(), throwable.getMessage(), throwable));
    }
}

