package org.reactivecommons.async.kafka;

import io.micrometer.core.instrument.MeterRegistry;
import java.beans.ConstructorProperties;
import lombok.Generated;
import org.apache.kafka.clients.admin.AdminClient;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
import org.reactivecommons.async.kafka.communications.topology.KafkaCustomizations;
import org.reactivecommons.async.kafka.communications.topology.TopologyCreator;
import org.reactivecommons.async.kafka.config.props.AsyncKafkaProps;
import org.reactivecommons.async.kafka.converters.json.KafkaJacksonMessageConverter;
import org.reactivecommons.async.kafka.health.KafkaReactiveHealthIndicator;
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.reactivecommons.async.starter.broker.BrokerProviderFactory;
import org.reactivecommons.async.starter.broker.DiscardProvider;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.stereotype.Service;

@Service("kafka")
/* loaded from: input_file:org/reactivecommons/async/kafka/KafkaBrokerProviderFactory.class */
public class KafkaBrokerProviderFactory implements BrokerProviderFactory<AsyncKafkaProps> {
    private final ReactiveReplyRouter router;
    private final KafkaJacksonMessageConverter converter;
    private final MeterRegistry meterRegistry;
    private final CustomReporter errorReporter;
    private final KafkaCustomizations customizations;
    private final SslBundles sslBundles;

    public String getBrokerType() {
        return "kafka";
    }

    public DiscardProvider getDiscardProvider(AsyncKafkaProps asyncKafkaProps) {
        return new KafkaDiscardProvider(asyncKafkaProps, this.converter, this.customizations, this.sslBundles);
    }

    public BrokerProvider<AsyncKafkaProps> getProvider(String str, AsyncKafkaProps asyncKafkaProps, DiscardProvider discardProvider) {
        TopologyCreator createTopologyCreator = KafkaSetupUtils.createTopologyCreator(asyncKafkaProps, this.customizations, this.sslBundles);
        ReactiveMessageSender createMessageSender = KafkaSetupUtils.createMessageSender(asyncKafkaProps, this.converter, createTopologyCreator, this.sslBundles);
        return new KafkaBrokerProvider(str, asyncKafkaProps, this.router, this.converter, this.meterRegistry, this.errorReporter, new KafkaReactiveHealthIndicator(str, AdminClient.create(asyncKafkaProps.m2getConnectionProperties().buildAdminProperties(this.sslBundles))), KafkaSetupUtils.createMessageListener(asyncKafkaProps, this.sslBundles), createMessageSender, asyncKafkaProps.isUseDiscardNotifierPerDomain() ? KafkaSetupUtils.createDiscardNotifier(createMessageSender, this.converter) : (DiscardNotifier) discardProvider.get(), createTopologyCreator, this.customizations, this.sslBundles);
    }

    @Generated
    @ConstructorProperties({"router", "converter", "meterRegistry", "errorReporter", "customizations", "sslBundles"})
    public KafkaBrokerProviderFactory(ReactiveReplyRouter reactiveReplyRouter, KafkaJacksonMessageConverter kafkaJacksonMessageConverter, MeterRegistry meterRegistry, CustomReporter customReporter, KafkaCustomizations kafkaCustomizations, SslBundles sslBundles) {
        this.router = reactiveReplyRouter;
        this.converter = kafkaJacksonMessageConverter;
        this.meterRegistry = meterRegistry;
        this.errorReporter = customReporter;
        this.customizations = kafkaCustomizations;
        this.sslBundles = sslBundles;
    }
}
