/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.sender.kafka;

import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.sender.AbstractMessageSenderEndpoint;
import de.otto.synapse.endpoint.sender.kafka.KafkaEncoder;
import de.otto.synapse.message.Message;
import de.otto.synapse.message.TextMessage;
import de.otto.synapse.translator.MessageFormat;
import de.otto.synapse.translator.MessageTranslator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;

public class KafkaMessageSender
extends AbstractMessageSenderEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSender.class);
    public static final long UPDATE_PARTITION_DELAY = 10000L;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final AtomicReference<KafkaEncoder> encoder = new AtomicReference();

    public KafkaMessageSender(String channelName, MessageInterceptorRegistry interceptorRegistry, MessageTranslator<TextMessage> messageTranslator, KafkaTemplate<String, String> kafkaTemplate) {
        super(channelName, interceptorRegistry, messageTranslator);
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(initialDelay=10000L, fixedDelay=10000L)
    public void updatePartitions() {
        this.encoder.set(this.createEncoder());
    }

    protected CompletableFuture<Void> doSend(@Nonnull TextMessage message) {
        this.encoder.updateAndGet(x -> x != null ? x : this.createEncoder());
        ProducerRecord<String, String> record = this.encoder.get().apply((Message<String>)message);
        return CompletableFuture.allOf(this.kafkaTemplate.send(record).completable());
    }

    protected CompletableFuture<Void> doSendBatch(@Nonnull Stream<TextMessage> messageStream) {
        return CompletableFuture.allOf((CompletableFuture[])messageStream.map(this::doSend).toArray(CompletableFuture[]::new));
    }

    public MessageFormat getMessageFormat() {
        return MessageFormat.V1;
    }

    private KafkaEncoder createEncoder() {
        int numPartitions = this.kafkaTemplate.partitionsFor(this.getChannelName()).size();
        return new KafkaEncoder(this.getChannelName(), numPartitions);
    }
}

