package io.floodplain.hapi.cdc.publish.kafka;

import io.floodplain.hapi.cdc.publish.Message;
import io.floodplain.hapi.cdc.publish.MessagePublisher;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import kotlin.Metadata;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.internal.Intrinsics;
import org.apache.kafka.clients.consumer.Consumer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.env.Environment;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

/* compiled from: KafkaPublisher.kt */
@ConditionalOnProperty(value = {"floodplain.kafka.enabled"}, havingValue = "true", matchIfMissing = false)
@Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��`\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0010\u0012\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0006\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010!\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0002\b\u0004\n\u0002\u0010\t\n\u0002\b\u0005\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0017\u0018��2\u00020\u0001B-\u0012\u0012\u0010\u0002\u001a\u000e\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u00050\u0003\u0012\u0012\u0010\u0006\u001a\u000e\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u00050\u0007¢\u0006\u0002\u0010\bJ\u0010\u0010\u001d\u001a\u00020\u001e2\u0006\u0010\u001f\u001a\u00020\u0004H\u0002J\u0018\u0010 \u001a\u00020\u001e2\u0006\u0010\u001f\u001a\u00020\u00042\u0006\u0010!\u001a\u00020\u0004H\u0016J\b\u0010\"\u001a\u00020#H\u0016J\b\u0010$\u001a\u00020\u001eH\u0017J \u0010%\u001a\u00020\u001e2\u0006\u0010\u001f\u001a\u00020\u00042\u0006\u0010!\u001a\u00020\u00042\u0006\u0010&\u001a\u00020\u0005H\u0016J\u0016\u0010'\u001a\u00020\u001e2\f\u0010(\u001a\b\u0012\u0004\u0012\u00020*0)H\u0016J\b\u0010+\u001a\u00020#H\u0016R0\u0010\t\u001a\u001e\u0012\f\u0012\n \u000b*\u0004\u0018\u00010\u00040\u0004\u0012\f\u0012\n \u000b*\u0004\u0018\u00010\u00050\u00050\nX\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\f\u0010\rR \u0010\u0006\u001a\u000e\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u00050\u0007X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u000e\u0010\u000fR\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u0012\u0010\u0012\u001a\u0004\u0018\u00010\u00138\u0002X\u0083\u0004¢\u0006\u0002\n��R\u001a\u0010\u0014\u001a\b\u0012\u0004\u0012\u00020\u00040\u0015X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0016\u0010\u0017R 
\u0010\u0002\u001a\u000e\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u00050\u0003X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0018\u0010\u0019R\u000e\u0010\u001a\u001a\u00020\u001bX\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u001c\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��¨\u0006,"}, d2 = {"Lio/floodplain/hapi/cdc/publish/kafka/KafkaPublisher;", "Lio/floodplain/hapi/cdc/publish/MessagePublisher;", "kafkaTemplate", "Lorg/springframework/kafka/core/KafkaTemplate;", "", "", "consumerFactory", "Lorg/springframework/kafka/core/ConsumerFactory;", "(Lorg/springframework/kafka/core/KafkaTemplate;Lorg/springframework/kafka/core/ConsumerFactory;)V", "consumer", "Lorg/apache/kafka/clients/consumer/Consumer;", "kotlin.jvm.PlatformType", "getConsumer", "()Lorg/apache/kafka/clients/consumer/Consumer;", "getConsumerFactory", "()Lorg/springframework/kafka/core/ConsumerFactory;", "deleteCounter", "Ljava/util/concurrent/atomic/AtomicLong;", "env", "Lorg/springframework/core/env/Environment;", "existingTopics", "", "getExistingTopics", "()Ljava/util/List;", "getKafkaTemplate", "()Lorg/springframework/kafka/core/KafkaTemplate;", "logger", "Lorg/slf4j/Logger;", "updateCounter", "createIfMissing", "", "topic", "delete", "key", "deleteCount", "", "initialize", "publish", "payload", "publishAll", "messages", "", "Lio/floodplain/hapi/cdc/publish/Message;", "updateCount", "hapi-change-capture"})
@Service
/* loaded from: input_file:io/floodplain/hapi/cdc/publish/kafka/KafkaPublisher.class */
public class KafkaPublisher implements MessagePublisher {

    @NotNull
    private final KafkaTemplate<String, byte[]> kafkaTemplate;

    @NotNull
    private final ConsumerFactory<String, byte[]> consumerFactory;

    @NotNull
    private Logger logger;

    @NotNull
    private final AtomicLong updateCounter;

    @NotNull
    private final AtomicLong deleteCounter;

    @Autowired
    @Nullable
    private final Environment env;

    @NotNull
    private final Consumer<String, byte[]> consumer;

    @NotNull
    private final List<String> existingTopics;

    public KafkaPublisher(@NotNull KafkaTemplate<String, byte[]> kafkaTemplate, @NotNull ConsumerFactory<String, byte[]> consumerFactory) {
        Intrinsics.checkNotNullParameter(kafkaTemplate, "kafkaTemplate");
        Intrinsics.checkNotNullParameter(consumerFactory, "consumerFactory");
        this.kafkaTemplate = kafkaTemplate;
        this.consumerFactory = consumerFactory;
        Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
        Intrinsics.checkNotNullExpressionValue(logger, "getLogger(KafkaPublisher::class.java)");
        this.logger = logger;
        this.updateCounter = new AtomicLong(0L);
        this.deleteCounter = new AtomicLong(0L);
        Consumer<String, byte[]> createConsumer = getConsumerFactory().createConsumer();
        Intrinsics.checkNotNullExpressionValue(createConsumer, "consumerFactory.createConsumer()");
        this.consumer = createConsumer;
        this.existingTopics = new ArrayList();
    }

    @NotNull
    public KafkaTemplate<String, byte[]> getKafkaTemplate() {
        return this.kafkaTemplate;
    }

    @NotNull
    public ConsumerFactory<String, byte[]> getConsumerFactory() {
        return this.consumerFactory;
    }

    @NotNull
    public Consumer<String, byte[]> getConsumer() {
        return this.consumer;
    }

    @NotNull
    public List<String> getExistingTopics() {
        return this.existingTopics;
    }

    @PostConstruct
    public void initialize() {
        getExistingTopics().addAll(getConsumer().listTopics().keySet());
    }

    private final void createIfMissing(String str) {
        if (getExistingTopics().contains(str)) {
            return;
        }
        this.logger.info("Topic " + str + " is missing, attempting to create");
        TopicBuilder.name(str).compact().partitions(1).build();
        this.logger.info("Topic " + str + " created successfully");
        getExistingTopics().add(str);
    }

    @Override // io.floodplain.hapi.cdc.publish.MessagePublisher
    public long updateCount() {
        return this.updateCounter.get();
    }

    @Override // io.floodplain.hapi.cdc.publish.MessagePublisher
    public long deleteCount() {
        return this.deleteCounter.incrementAndGet();
    }

    @Override // io.floodplain.hapi.cdc.publish.MessagePublisher
    public void publish(@NotNull String str, @NotNull String str2, @NotNull byte[] bArr) {
        Intrinsics.checkNotNullParameter(str, "topic");
        Intrinsics.checkNotNullParameter(str2, "key");
        Intrinsics.checkNotNullParameter(bArr, "payload");
        createIfMissing(str);
        this.updateCounter.incrementAndGet();
        this.logger.debug("Publishing to topic " + str + " and key: " + str2 + " size: " + bArr.length);
        getKafkaTemplate().send(str, str2, bArr).completable().get();
        this.logger.debug("done");
    }

    @Override // io.floodplain.hapi.cdc.publish.MessagePublisher
    public void publishAll(@NotNull List<Message> list) {
        Intrinsics.checkNotNullParameter(list, "messages");
        this.updateCounter.addAndGet(list.size());
        List<Message> list2 = list;
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(list2, 10));
        for (Message message : list2) {
            createIfMissing(message.getTopic());
            arrayList.add(getKafkaTemplate().send(message.getTopic(), message.getKey(), message.getBody()));
        }
        ListenableFuture listenableFuture = (ListenableFuture) CollectionsKt.lastOrNull(arrayList);
        if (listenableFuture != null) {
        }
        this.logger.debug("successfully sent #" + list.size() + " messages");
    }

    @Override // io.floodplain.hapi.cdc.publish.MessagePublisher
    public void delete(@NotNull String str, @NotNull String str2) {
        Intrinsics.checkNotNullParameter(str, "topic");
        Intrinsics.checkNotNullParameter(str2, "key");
        createIfMissing(str);
        this.deleteCounter.incrementAndGet();
        this.logger.debug("Deleting from topic " + str + " and key: " + str2 + " " + getKafkaTemplate());
        getKafkaTemplate().send(str, str2, (Object) null).completable().get();
        this.logger.debug("done");
    }
}
