/*
 * Decompiled with CFR 0.152.
 */
package org.qubership.atp.itf.lite.backend.configuration;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.UUIDDeserializer;
import org.apache.kafka.common.serialization.UUIDSerializer;
import org.qubership.atp.itf.lite.backend.exceptions.internal.ItfLiteKafkaListenerContainerFactoryException;
import org.qubership.atp.itf.lite.backend.model.api.kafka.GetAccessTokenFinish;
import org.qubership.atp.itf.lite.backend.model.api.kafka.ItfExportRequestEvent;
import org.qubership.atp.itf.lite.backend.model.api.kafka.ItfExportResponseEvent;
import org.qubership.atp.itf.lite.backend.model.api.kafka.ItfLiteExecutionFinishEvent;
import org.qubership.atp.itf.lite.backend.model.api.kafka.MiaExportRequestEvent;
import org.qubership.atp.itf.lite.backend.model.api.kafka.MiaExportResponseEvent;
import org.qubership.atp.itf.lite.backend.model.api.kafka.ProjectEvent;
import org.qubership.atp.itf.lite.backend.service.RequestExportService;
import org.qubership.atp.itf.lite.backend.service.SseEmitterService;
import org.qubership.atp.itf.lite.backend.service.kafka.KafkaExecutionFinishResponseService;
import org.qubership.atp.itf.lite.backend.service.kafka.KafkaExecutionFinishSendingService;
import org.qubership.atp.itf.lite.backend.service.kafka.KafkaExportEventExceptionResponseSendingService;
import org.qubership.atp.itf.lite.backend.service.kafka.KafkaExportEventResponseService;
import org.qubership.atp.itf.lite.backend.service.kafka.KafkaExportEventSendingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
@EnableKafka
public class KafkaConfiguration {
    private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
    public static final String CATALOG_PROJECT_EVENT_CONTAINER_FACTORY = "catalogProjectEventContainerFactory";
    @Value(value="${kafka.itflite.export.mia.start.topic}")
    public String kafkaItfLiteExportMiaStartTopic;
    @Value(value="${kafka.itflite.export.mia.finish.topic}")
    public String kafkaItfLiteExportMiaFinishTopic;
    @Value(value="${kafka.itflite.export.mia.replicas:3}")
    public int kafkaItfLiteExportMiaReplicas;
    @Value(value="${kafka.itflite.export.mia.partitions:3}")
    public int kafkaItfLiteExportMiaPartitions;
    @Value(value="${kafka.itflite.export.itf.start.topic}")
    public String kafkaItfLiteExportItfStartTopic;
    @Value(value="${kafka.itflite.export.itf.finish.topic}")
    public String kafkaItfLiteExportItfFinishTopic;
    @Value(value="${kafka.itflite.export.itf.replicas:3}")
    public int kafkaItfLiteExportItfReplicas;
    @Value(value="${kafka.itflite.export.itf.partitions:3}")
    public int kafkaItfLiteExportItfPartitions;
    @Value(value="${kafka.itflite.execution.finish.topic}")
    public String kafkaItfLiteExecutionFinishTopic;
    @Value(value="${kafka.itflite.execution.finish.replicas:3}")
    public int kafkaItfLiteExecutionFinishReplicas;
    @Value(value="${kafka.itflite.execution.finish.partitions:3}")
    public int kafkaItfLiteExecutionFinishPartitions;
    @Value(value="${kafka.itflite.getaccesstoken.finish.topic}")
    public String kafkaItfLiteGetAccessTokenFinishTopic;
    @Value(value="${kafka.itflite.getaccesstoken.finish.replicas:3}")
    public int kafkaItfLiteGetAccessTokenFinishReplicas;
    @Value(value="${kafka.itflite.getaccesstoken.finish.partitions:3}")
    public int kafkaItfLiteGetAccessTokenFinishPartitions;
    @Value(value="${spring.kafka.bootstrap-servers}")
    public String bootstrapServers;
    public static final String MIA_EXPORT_KAFKA_TEMPLATE_BEAN_NAME = "miaExportKafkaTemplate";
    public static final String MIA_FINISH_EXPORT_KAFKA_TEMPLATE_BEAN_NAME = "miaFinishExportKafkaTemplate";
    public static final String MIA_EXPORT_KAFKA_CONTAINER_FACTORY_BEAN_NAME = "miaExportContainerFactory";
    public static final String GET_ACCESS_TOKEN_KAFKA_TEMPLATE_BEAN_NAME = "getAccessTokenKafkaTemplate";
    public static final String GET_ACCESS_TOKEN_KAFKA_CONTAINER_FACTORY_BEAN_NAME = "getAccessTokenContainerFactory";
    public static final String ITF_EXPORT_KAFKA_TEMPLATE_BEAN_NAME = "itfExportKafkaTemplate";
    public static final String ITF_FINISH_EXPORT_KAFKA_TEMPLATE_BEAN_NAME = "itfFinishExportKafkaTemplate";
    public static final String ITF_EXPORT_KAFKA_CONTAINER_FACTORY_BEAN_NAME = "itfExportContainerFactory";
    public static final String ITF_LITE_EXECUTION_FINISH_TEMPLATE_BEAN_NAME = "itfLiteExecutionFinishKafkaTemplate";
    public static final String ITF_LITE_EXECUTION_FINISH_CONTAINER_FACTORY_BEAN_NAME = "itfLiteExecutionFinishContainerFactory";
    public static final String ENVIRONMENT_KAFKA_CONTAINER_FACTORY_BEAN_NAME = "environmentContainerFactory";

    @Bean
    public KafkaAdmin admin() {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", this.bootstrapServers);
        configs.put("retries", 3);
        return new KafkaAdmin(configs);
    }

    @Bean(value={"itfLiteExecutionFinishContainerFactory"})
    public ConcurrentKafkaListenerContainerFactory<UUID, String> itfLiteExecutionFinishKafkaListenerContainerFactory() {
        log.debug("Start itf-lite execution finish kafka container factory: {}", (Object)ITF_LITE_EXECUTION_FINISH_CONTAINER_FACTORY_BEAN_NAME);
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(this.itfLiteExecutionFinishConsumerFactory());
        factory.setMessageConverter((MessageConverter)new StringJsonMessageConverter());
        return factory;
    }

    public ConsumerFactory<UUID, String> itfLiteExecutionFinishConsumerFactory() {
        log.debug("itf-lite execution finish consumer factory configuration.");
        Map<String, Object> props = this.consumerFactoryProperties(ItfLiteExecutionFinishEvent.class);
        return new DefaultKafkaConsumerFactory(props);
    }

    @Bean(name={"itfLiteExecutionFinishKafkaTemplate"})
    public KafkaTemplate<UUID, ItfLiteExecutionFinishEvent> itfLiteExecutionFinishKafkaTemplate() {
        log.debug("Create itf-lite execution finish kafkaTemplate bean.");
        this.createOrUpdateTopic(this.kafkaItfLiteExecutionFinishTopic, this.kafkaItfLiteExecutionFinishPartitions, this.kafkaItfLiteExecutionFinishReplicas);
        Map<String, Object> configProps = this.producerConfigProperties();
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configProps);
        return new KafkaTemplate((ProducerFactory)producerFactory);
    }

    @Bean
    public KafkaExecutionFinishSendingService kafkaExecutionFinishSendingService(KafkaTemplate<UUID, ItfLiteExecutionFinishEvent> itfLiteExecutionFinishKafkaTemplate) {
        return new KafkaExecutionFinishSendingService(this.kafkaItfLiteExecutionFinishTopic, itfLiteExecutionFinishKafkaTemplate);
    }

    @Bean
    public KafkaExecutionFinishResponseService kafkaExecutionFinishResponseService(SseEmitterService sseEmitterService) {
        return new KafkaExecutionFinishResponseService(sseEmitterService);
    }

    @Bean(name={"miaExportKafkaTemplate"})
    public KafkaTemplate<UUID, MiaExportRequestEvent> miaExportKafkaTemplate() {
        this.createOrUpdateTopic(this.kafkaItfLiteExportMiaStartTopic, this.kafkaItfLiteExportMiaPartitions, this.kafkaItfLiteExportMiaReplicas);
        Map<String, Object> configProps = this.producerConfigProperties();
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configProps);
        return new KafkaTemplate((ProducerFactory)producerFactory);
    }

    @Bean(name={"itfExportKafkaTemplate"})
    public KafkaTemplate<UUID, ItfExportRequestEvent> itfExportKafkaTemplate() {
        this.createOrUpdateTopic(this.kafkaItfLiteExportItfStartTopic, this.kafkaItfLiteExportItfPartitions, this.kafkaItfLiteExportItfReplicas);
        Map<String, Object> configProps = this.producerConfigProperties();
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configProps);
        return new KafkaTemplate((ProducerFactory)producerFactory);
    }

    @Bean(name={"miaFinishExportKafkaTemplate"})
    public KafkaTemplate<UUID, MiaExportResponseEvent> miaFinishExportKafkaTemplate() {
        this.createOrUpdateTopic(this.kafkaItfLiteExportMiaFinishTopic, this.kafkaItfLiteExportMiaPartitions, this.kafkaItfLiteExportMiaReplicas);
        Map<String, Object> configProps = this.producerConfigProperties();
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configProps);
        return new KafkaTemplate((ProducerFactory)producerFactory);
    }

    @Bean(name={"itfFinishExportKafkaTemplate"})
    public KafkaTemplate<UUID, ItfExportResponseEvent> itfFinishExportKafkaTemplate() {
        this.createOrUpdateTopic(this.kafkaItfLiteExportItfFinishTopic, this.kafkaItfLiteExportItfPartitions, this.kafkaItfLiteExportItfReplicas);
        Map<String, Object> configProps = this.producerConfigProperties();
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configProps);
        return new KafkaTemplate((ProducerFactory)producerFactory);
    }

    private Map<String, Object> producerConfigProperties() {
        HashMap<String, Object> configProps = new HashMap<String, Object>();
        configProps.put("bootstrap.servers", this.bootstrapServers);
        configProps.put("key.serializer", UUIDSerializer.class);
        configProps.put("value.serializer", JsonSerializer.class);
        return configProps;
    }

    @Bean
    public KafkaExportEventSendingService kafkaExportEventSendingService(KafkaTemplate<UUID, MiaExportRequestEvent> miaExportKafkaTemplate, KafkaTemplate<UUID, ItfExportRequestEvent> itfExportKafkaTemplate) {
        return new KafkaExportEventSendingService(this.kafkaItfLiteExportMiaStartTopic, miaExportKafkaTemplate, this.kafkaItfLiteExportItfStartTopic, itfExportKafkaTemplate);
    }

    @Bean
    public KafkaExportEventExceptionResponseSendingService kafkaExportEventExceptionResponseSendingService(KafkaTemplate<UUID, MiaExportResponseEvent> miaFinishExportKafkaTemplate, KafkaTemplate<UUID, ItfExportResponseEvent> itfFinishExportKafkaTemplate) {
        return new KafkaExportEventExceptionResponseSendingService(this.kafkaItfLiteExportMiaFinishTopic, miaFinishExportKafkaTemplate, this.kafkaItfLiteExportItfFinishTopic, itfFinishExportKafkaTemplate);
    }

    @Bean
    public KafkaExportEventResponseService kafkaExportEventResponseService(SseEmitterService sseEmitterService, RequestExportService requestExportService) {
        return new KafkaExportEventResponseService(sseEmitterService, requestExportService);
    }

    @Bean(value={"getAccessTokenContainerFactory"})
    public ConcurrentKafkaListenerContainerFactory<UUID, GetAccessTokenFinish> getAccessTokenKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(this.getAccessTokenConsumerFactory());
        factory.setMessageConverter((MessageConverter)new StringJsonMessageConverter());
        factory.setErrorHandler((e, consumerRecord) -> {
            log.error("Error during kafka event processing in {}, consumerRecord: {}", new Object[]{GET_ACCESS_TOKEN_KAFKA_CONTAINER_FACTORY_BEAN_NAME, consumerRecord, e});
            throw new ItfLiteKafkaListenerContainerFactoryException();
        });
        return factory;
    }

    public ConsumerFactory<UUID, GetAccessTokenFinish> getAccessTokenConsumerFactory() {
        Map<String, Object> props = this.consumerFactoryProperties(GetAccessTokenFinish.class);
        return new DefaultKafkaConsumerFactory(props);
    }

    @Bean(name={"getAccessTokenKafkaTemplate"})
    public KafkaTemplate<UUID, GetAccessTokenFinish> getAccessTokenFinishKafkaTemplate() {
        this.createOrUpdateTopic(this.kafkaItfLiteGetAccessTokenFinishTopic, this.kafkaItfLiteGetAccessTokenFinishPartitions, this.kafkaItfLiteGetAccessTokenFinishReplicas);
        Map<String, Object> configProps = this.producerConfigProperties();
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configProps);
        return new KafkaTemplate((ProducerFactory)producerFactory);
    }

    private void createOrUpdateTopic(String kafkaNotificationTopic, Integer kafkaNotificationPartitions, Integer kafkaNotificationReplicas) {
        log.info("Start createOrUpdateTopic: create or update topic [name={}, partitions={}, replicationFactor={}]", new Object[]{kafkaNotificationTopic, kafkaNotificationPartitions, kafkaNotificationReplicas});
        try (AdminClient client = AdminClient.create((Map)this.admin().getConfigurationProperties());){
            ListTopicsResult ltr = client.listTopics();
            Set existingTopics = (Set)ltr.names().get();
            if (existingTopics.contains(kafkaNotificationTopic)) {
                log.debug("createOrUpdateTopic: update topic [name={}, partitions={}]", (Object)kafkaNotificationTopic, (Object)kafkaNotificationPartitions);
                HashMap<String, NewPartitions> newPartitionSet = new HashMap<String, NewPartitions>();
                newPartitionSet.put(kafkaNotificationTopic, NewPartitions.increaseTo((int)kafkaNotificationPartitions));
                client.createPartitions(newPartitionSet).all().get();
            } else {
                log.debug("createOrUpdateTopic: create new topic [name={}, partitions={}]", (Object)kafkaNotificationTopic, (Object)kafkaNotificationPartitions);
                client.createTopics(Collections.singleton(this.topic(kafkaNotificationTopic, kafkaNotificationPartitions, kafkaNotificationReplicas)), new CreateTopicsOptions().timeoutMs(Integer.valueOf(10000))).all().get();
            }
        }
        catch (Exception ex) {
            log.error("Cannot create topic [name={}, partitions={}, replicas={}]", new Object[]{kafkaNotificationTopic, kafkaNotificationPartitions, kafkaNotificationReplicas});
        }
    }

    private NewTopic topic(String kafkaNotificationTopic, Integer kafkaNotificationPartitions, Integer kafkaNotificationReplicas) {
        return TopicBuilder.name((String)kafkaNotificationTopic).partitions(kafkaNotificationPartitions.intValue()).replicas(kafkaNotificationReplicas.intValue()).build();
    }

    private Map<String, Object> consumerFactoryProperties(Class<?> exportResponseEventClazz) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("key.deserializer", ErrorHandlingDeserializer.class);
        props.put("spring.deserializer.key.delegate.class", UUIDDeserializer.class);
        props.put("value.deserializer", ErrorHandlingDeserializer.class);
        props.put("spring.deserializer.value.delegate.class", StringDeserializer.class);
        props.put("spring.json.value.default.type", exportResponseEventClazz);
        props.put("enable.auto.commit", false);
        return props;
    }

    public ConsumerFactory<UUID, String> miaConsumerFactory() {
        Map<String, Object> props = this.consumerFactoryProperties(MiaExportResponseEvent.class);
        return new DefaultKafkaConsumerFactory(props);
    }

    @Bean(value={"catalogProjectEventContainerFactory"})
    public ConcurrentKafkaListenerContainerFactory<UUID, String> catalogProjectEventContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(this.catalogConsumerFactory());
        factory.setMessageConverter((MessageConverter)new StringJsonMessageConverter());
        factory.setErrorHandler((e, consumerRecord) -> {
            log.error("Error during kafka event processing in {}, consumerRecord: {}", new Object[]{CATALOG_PROJECT_EVENT_CONTAINER_FACTORY, consumerRecord, e});
            throw new ItfLiteKafkaListenerContainerFactoryException();
        });
        return factory;
    }

    public ConsumerFactory<UUID, String> catalogConsumerFactory() {
        Map<String, Object> props = this.consumerFactoryProperties(ProjectEvent.class);
        return new DefaultKafkaConsumerFactory(props);
    }

    @Bean(value={"miaExportContainerFactory"})
    public ConcurrentKafkaListenerContainerFactory<UUID, String> miaKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(this.miaConsumerFactory());
        factory.setMessageConverter((MessageConverter)new StringJsonMessageConverter());
        factory.setErrorHandler((e, consumerRecord) -> {
            log.error("Error during kafka event processing in {}, consumerRecord: {}", new Object[]{MIA_EXPORT_KAFKA_CONTAINER_FACTORY_BEAN_NAME, consumerRecord, e});
            throw new ItfLiteKafkaListenerContainerFactoryException();
        });
        return factory;
    }

    public ConsumerFactory<UUID, String> itfConsumerFactory() {
        Map<String, Object> props = this.consumerFactoryProperties(ItfExportResponseEvent.class);
        return new DefaultKafkaConsumerFactory(props);
    }

    @Bean(value={"itfExportContainerFactory"})
    public ConcurrentKafkaListenerContainerFactory<UUID, String> itfKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(this.itfConsumerFactory());
        factory.setMessageConverter((MessageConverter)new StringJsonMessageConverter());
        factory.setErrorHandler((e, consumerRecord) -> {
            log.error("Error during kafka event processing in {}, consumerRecord: {}", new Object[]{ITF_EXPORT_KAFKA_CONTAINER_FACTORY_BEAN_NAME, consumerRecord, e});
            throw new ItfLiteKafkaListenerContainerFactoryException();
        });
        return factory;
    }

    @Bean(value={"environmentContainerFactory"})
    public ConcurrentKafkaListenerContainerFactory<UUID, String> environmentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(this.environmentConsumerFactory());
        factory.setMessageConverter((MessageConverter)new StringJsonMessageConverter());
        factory.setErrorHandler((e, consumerRecord) -> {
            log.error("Error during kafka event processing in {}, consumerRecord: {}", new Object[]{ENVIRONMENT_KAFKA_CONTAINER_FACTORY_BEAN_NAME, consumerRecord, e});
            throw new ItfLiteKafkaListenerContainerFactoryException();
        });
        return factory;
    }

    public ConsumerFactory<UUID, String> environmentConsumerFactory() {
        Map<String, Object> props = this.consumerFactoryProperties(String.class);
        return new DefaultKafkaConsumerFactory(props);
    }
}

