package org.reactivecommons.async.kafka.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.commons.DLQDiscardNotifier;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier;
import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.ext.DefaultCustomReporter;
import org.reactivecommons.async.kafka.KafkaDomainEventBus;
import org.reactivecommons.async.kafka.communications.ReactiveMessageListener;
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
import org.reactivecommons.async.kafka.communications.topology.KafkaCustomizations;
import org.reactivecommons.async.kafka.communications.topology.TopologyCreator;
import org.reactivecommons.async.kafka.config.props.RCAsyncPropsKafka;
import org.reactivecommons.async.kafka.config.props.RCKafkaProps;
import org.reactivecommons.async.kafka.converters.json.KafkaJacksonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

@EnableConfigurationProperties({RCAsyncPropsKafka.class})
@Configuration
/* loaded from: input_file:org/reactivecommons/async/kafka/config/RCKafkaConfig.class */
public class RCKafkaConfig {
    @ConditionalOnMissingBean({DomainEventBus.class})
    @Bean
    public DomainEventBus kafkaDomainEventBus(ReactiveMessageSender reactiveMessageSender) {
        return new KafkaDomainEventBus(reactiveMessageSender);
    }

    @ConditionalOnMissingBean({ReactiveMessageSender.class})
    @Bean
    public ReactiveMessageSender kafkaReactiveMessageSender(KafkaSender<String, byte[]> kafkaSender, MessageConverter messageConverter, TopologyCreator topologyCreator) {
        return new ReactiveMessageSender(kafkaSender, messageConverter, topologyCreator);
    }

    @ConditionalOnMissingBean({KafkaSender.class})
    @Bean
    public KafkaSender<String, byte[]> kafkaSender(RCAsyncPropsKafka rCAsyncPropsKafka, @Value("${spring.application.name}") String str) {
        RCKafkaProps kafkaProps = rCAsyncPropsKafka.getKafkaProps();
        kafkaProps.put("client.id", str);
        kafkaProps.put("key.serializer", StringSerializer.class);
        kafkaProps.put("value.serializer", ByteArraySerializer.class);
        return KafkaSender.create(SenderOptions.create(kafkaProps));
    }

    @ConditionalOnMissingBean({ReactiveMessageListener.class})
    @Bean
    public ReactiveMessageListener kafkaReactiveMessageListener(ReceiverOptions<String, byte[]> receiverOptions) {
        return new ReactiveMessageListener(receiverOptions);
    }

    @ConditionalOnMissingBean({ReceiverOptions.class})
    @Bean
    public ReceiverOptions<String, byte[]> kafkaReceiverOptions(RCAsyncPropsKafka rCAsyncPropsKafka) {
        RCKafkaProps kafkaProps = rCAsyncPropsKafka.getKafkaProps();
        kafkaProps.put("key.deserializer", StringDeserializer.class);
        kafkaProps.put("value.deserializer", ByteArrayDeserializer.class);
        return ReceiverOptions.create(kafkaProps);
    }

    @ConditionalOnMissingBean({TopologyCreator.class})
    @Bean
    public TopologyCreator kafkaTopologyCreator(RCAsyncPropsKafka rCAsyncPropsKafka, KafkaCustomizations kafkaCustomizations) {
        return new TopologyCreator(AdminClient.create(rCAsyncPropsKafka.getKafkaProps()), kafkaCustomizations);
    }

    @ConditionalOnMissingBean({KafkaCustomizations.class})
    @Bean
    public KafkaCustomizations defaultKafkaCustomizations() {
        return new KafkaCustomizations();
    }

    @ConditionalOnMissingBean({MessageConverter.class})
    @Bean
    public MessageConverter kafkaJacksonMessageConverter(ObjectMapperSupplier objectMapperSupplier) {
        return new KafkaJacksonMessageConverter((ObjectMapper) objectMapperSupplier.get());
    }

    @ConditionalOnMissingBean({DiscardNotifier.class})
    @Bean
    public DiscardNotifier kafkaDiscardNotifier(DomainEventBus domainEventBus, MessageConverter messageConverter) {
        return new DLQDiscardNotifier(domainEventBus, messageConverter);
    }

    @ConditionalOnMissingBean({ObjectMapperSupplier.class})
    @Bean
    public ObjectMapperSupplier defaultObjectMapperSupplier() {
        return new DefaultObjectMapperSupplier();
    }

    @ConditionalOnMissingBean({CustomReporter.class})
    @Bean
    public CustomReporter defaultKafkaCustomReporter() {
        return new DefaultCustomReporter();
    }

    public static RCKafkaProps readPropsFromDotEnv(Path path) throws IOException {
        String[] split = Files.readString(path).split("\n");
        RCKafkaProps rCKafkaProps = new RCKafkaProps();
        for (String str : split) {
            if (!str.startsWith("#")) {
                String[] split2 = str.split("=", 2);
                rCKafkaProps.put(split2[0], split2[1]);
            }
        }
        return rCKafkaProps;
    }

    public static String jassConfig(String str, String str2) {
        return String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", str, str2);
    }
}
