/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.kafka;

import ch.squaredesk.nova.comm.kafka.OutgoingMessageMetaData;
import ch.squaredesk.nova.comm.sending.MessageMarshaller;
import ch.squaredesk.nova.comm.sending.MessageSenderImplBase;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.Completable;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaMessageSender<InternalMessageType>
extends MessageSenderImplBase<String, InternalMessageType, String, OutgoingMessageMetaData> {
    private final Producer<String, String> producer;

    protected KafkaMessageSender(String identifier, Properties producerProperties, MessageMarshaller<InternalMessageType, String> messageMarshaller, Metrics metrics) {
        super(identifier, messageMarshaller, metrics);
        this.producer = new KafkaProducer(producerProperties);
    }

    public Completable doSend(InternalMessageType message, OutgoingMessageMetaData sendingInfo) {
        String messageAsText;
        Objects.requireNonNull(message, "message must not be null");
        try {
            messageAsText = (String)this.messageMarshaller.marshal(message);
        }
        catch (Exception e) {
            return Completable.error((Throwable)e);
        }
        ProducerRecord record = new ProducerRecord((String)sendingInfo.destination, (Object)messageAsText);
        return Completable.fromFuture((Future)this.producer.send(record));
    }
}

