/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.processcontext.adapter.kafka.message.consumer;

import ch.admin.bit.jeap.messaging.avro.AvroMessage;
import ch.admin.bit.jeap.messaging.avro.AvroMessageKey;
import ch.admin.bit.jeap.messaging.kafka.properties.KafkaProperties;
import ch.admin.bit.jeap.messaging.kafka.spring.JeapKafkaBeanNames;
import ch.admin.bit.jeap.processcontext.adapter.kafka.message.consumer.KafkaMessageListener;
import ch.admin.bit.jeap.processcontext.adapter.kafka.message.filter.MessageFilterConfigurationException;
import ch.admin.bit.jeap.processcontext.adapter.kafka.message.filter.MessageFiltersConfiguration;
import ch.admin.bit.jeap.processcontext.domain.message.MessageReceiver;
import ch.admin.bit.jeap.processcontext.domain.port.MessageConsumerFactory;
import ch.admin.bit.jeap.processcontext.plugin.api.message.MessageFilter;
import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Creates and starts Kafka listener containers for individual messages and keeps track of
 * them so that they can all be stopped when this bean is destroyed.
 *
 * <p>Started containers are kept in a {@link CopyOnWriteArrayList}, so registering a new
 * container and iterating over the list in {@link #stop()} do not interfere with each other.
 */
@Component
public class KafkaMessageConsumerFactory implements MessageConsumerFactory {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaMessageConsumerFactory.class);

    private final String appName;
    private final KafkaProperties kafkaProperties;
    private final BeanFactory beanFactory;
    private final JeapKafkaBeanNames jeapKafkaBeanNames;
    /** Every container started by this factory, in start order. */
    private final List<ConcurrentMessageListenerContainer<?, ?>> containers = new CopyOnWriteArrayList<>();
    private final MessageFiltersConfiguration messageFiltersConfiguration;

    /**
     * @param appName value of {@code spring.application.name}; used as prefix of the consumer group id
     * @param kafkaProperties supplies the default cluster name
     * @param beanFactory used to look up the listener container factory bean of a cluster
     * @param messageFiltersConfiguration maps message names to message filter class names
     */
    public KafkaMessageConsumerFactory(@Value(value="${spring.application.name}") String appName, KafkaProperties kafkaProperties, BeanFactory beanFactory, MessageFiltersConfiguration messageFiltersConfiguration) {
        this.appName = appName;
        this.kafkaProperties = kafkaProperties;
        this.beanFactory = beanFactory;
        this.jeapKafkaBeanNames = new JeapKafkaBeanNames(kafkaProperties.getDefaultClusterName());
        this.messageFiltersConfiguration = messageFiltersConfiguration;
    }

    /**
     * Starts a listener container that hands messages from the given topic to the given receiver.
     *
     * @param topicName topic to consume from
     * @param messageName name of the message; also used to look up an optional message filter
     * @param clusterName cluster to consume from; when {@code null} or blank the default cluster
     *                    name from the Kafka properties is used
     * @param messageReceiver receiver passed to the created {@link KafkaMessageListener}
     * @throws MessageFilterConfigurationException if a configured filter class cannot be instantiated
     * @throws IllegalStateException if no listener container factory bean exists for the cluster
     */
    public void startConsumer(String topicName, String messageName, String clusterName, MessageReceiver messageReceiver) {
        String effectiveClusterName = StringUtils.hasText(clusterName)
                ? clusterName
                : this.kafkaProperties.getDefaultClusterName();
        log.info("Starting domain event listener for event '{}' on topic '{}' on cluster '{}'", messageName, topicName, effectiveClusterName);
        MessageFilter<AvroMessage> messageFilter = this.getMessageFilterInstanceForMessageName(messageName, this.messageFiltersConfiguration);
        KafkaMessageListener listener = new KafkaMessageListener(messageName, messageReceiver, messageFilter);
        this.startConsumer(topicName, messageName, effectiveClusterName, listener);
    }

    /**
     * Instantiates the message filter configured for the given message name.
     *
     * @return a new filter instance, or {@code null} when no filter class name is configured
     *         for the message name
     */
    private MessageFilter<AvroMessage> getMessageFilterInstanceForMessageName(String messageName, MessageFiltersConfiguration messageFiltersConfiguration) {
        String className = messageFiltersConfiguration.getFilters().get(messageName);
        if (!StringUtils.hasText(className)) {
            return null;
        }
        log.info("Found message filter for message type '{}': {}", messageName, className);
        return this.newMessageFilterInstance(className);
    }

    /**
     * Loads the named class and creates an instance through its no-argument constructor.
     *
     * @throws MessageFilterConfigurationException if the class cannot be loaded or instantiated,
     *         or if the created object is not a {@link MessageFilter}
     */
    private <T extends MessageFilter<?>> T newMessageFilterInstance(String className) {
        try {
            Class<?> messageFilterClass = Class.forName(className);
            Object instance = messageFilterClass.getDeclaredConstructor().newInstance();
            // Class.cast verifies at runtime that the instance is a MessageFilter; the filter's
            // type argument is erased and therefore cannot be checked here.
            @SuppressWarnings("unchecked")
            T messageFilter = (T) MessageFilter.class.cast(instance);
            return messageFilter;
        }
        catch (ClassCastException | ReflectiveOperationException e) {
            throw new MessageFilterConfigurationException(className, e);
        }
    }

    /**
     * Creates a container for the topic on the given cluster, assigns the consumer group id
     * {@code <appName>_<topicName>_<eventName>}, starts it and registers it for shutdown.
     *
     * @return the started container
     */
    private ConcurrentMessageListenerContainer<AvroMessageKey, AvroMessage> startConsumer(String topicName, String eventName, String clusterName, AcknowledgingMessageListener<AvroMessageKey, AvroMessage> messageListener) {
        ConcurrentMessageListenerContainer<AvroMessageKey, AvroMessage> container =
                this.getKafkaListenerContainerFactory(clusterName).createContainer(topicName);
        container.getContainerProperties().setGroupId(this.appName + "_" + topicName + "_" + eventName);
        container.setupMessageListener(messageListener);
        container.start();
        this.containers.add(container);
        return container;
    }

    /**
     * Looks up the listener container factory bean registered for the given cluster.
     *
     * @throws IllegalStateException if no such bean is defined; the original
     *         {@link NoSuchBeanDefinitionException} is attached as the cause
     */
    private ConcurrentKafkaListenerContainerFactory<AvroMessageKey, AvroMessage> getKafkaListenerContainerFactory(String clusterName) {
        String beanName = this.jeapKafkaBeanNames.getListenerContainerFactoryBeanName(clusterName);
        try {
            // The bean is looked up by name only, so its type arguments cannot be verified.
            @SuppressWarnings("unchecked")
            ConcurrentKafkaListenerContainerFactory<AvroMessageKey, AvroMessage> factory =
                    (ConcurrentKafkaListenerContainerFactory<AvroMessageKey, AvroMessage>) this.beanFactory.getBean(beanName);
            return factory;
        }
        catch (NoSuchBeanDefinitionException exception) {
            log.error("No kafkaListenerContainerFactory found for cluster with name '{}'", clusterName);
            // Keep the lookup failure as the cause so the missing bean name stays visible in the stack trace.
            throw new IllegalStateException("No kafkaListenerContainerFactory found for cluster with name " + clusterName, exception);
        }
    }

    /**
     * Stops every container started by this factory. Invoked on bean destruction.
     */
    @PreDestroy
    public void stop() {
        log.info("Stopping all domain event listener containers...");
        // NOTE(review): the 'true' argument is presumably the container's "wait for listener
        // termination" flag - confirm against the spring-kafka version in use.
        for (ConcurrentMessageListenerContainer<?, ?> container : this.containers) {
            container.stop(true);
        }
    }
}

