/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.processarchive.kafka;

import ch.admin.bit.jeap.domainevent.avro.AvroDomainEvent;
import ch.admin.bit.jeap.messaging.avro.AvroMessage;
import ch.admin.bit.jeap.messaging.avro.AvroMessageKey;
import ch.admin.bit.jeap.messaging.avro.AvroMessageType;
import ch.admin.bit.jeap.messaging.contract.v2.Contract;
import ch.admin.bit.jeap.messaging.kafka.contract.ContractsProvider;
import ch.admin.bit.jeap.messaging.kafka.contract.ContractsValidator;
import ch.admin.bit.jeap.messaging.kafka.properties.KafkaProperties;
import ch.admin.bit.jeap.messaging.kafka.spring.JeapKafkaBeanNames;
import ch.admin.bit.jeap.messaging.model.MessageType;
import ch.admin.bit.jeap.processarchive.domain.event.DomainEventReceiver;
import ch.admin.bit.jeap.processarchive.kafka.KafkaMessageListener;
import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class KafkaDomainEventConsumerFactory {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaDomainEventConsumerFactory.class);
    private final DomainEventReceiver domainEventReceiver;
    private final ContractsValidator contractsValidator;
    private final List<ConcurrentMessageListenerContainer<?, ?>> containers = new CopyOnWriteArrayList();
    private final ContractsProvider contractsProvider;
    private final KafkaProperties kafkaProperties;
    private final BeanFactory beanFactory;
    private final JeapKafkaBeanNames jeapKafkaBeanNames;

    public KafkaDomainEventConsumerFactory(DomainEventReceiver domainEventReceiver, ContractsValidator contractsValidator, ContractsProvider contractsProvider, KafkaProperties kafkaProperties, BeanFactory beanFactory) {
        this.domainEventReceiver = domainEventReceiver;
        this.contractsValidator = contractsValidator;
        this.contractsProvider = contractsProvider;
        this.kafkaProperties = kafkaProperties;
        this.beanFactory = beanFactory;
        this.jeapKafkaBeanNames = new JeapKafkaBeanNames(kafkaProperties.getDefaultClusterName());
    }

    void startConsumer(String topicName, Set<String> eventNames, String clusterName) {
        if (!StringUtils.hasText((String)clusterName)) {
            clusterName = this.kafkaProperties.getDefaultClusterName();
        }
        log.info("Starting domain event listener for event(s) '{}' on topic '{}' on cluster '{}'", new Object[]{eventNames, topicName, clusterName});
        eventNames.forEach(eventName -> this.ensureConsumerContract(topicName, (String)eventName));
        KafkaMessageListener listener = new KafkaMessageListener(eventNames, this.domainEventReceiver);
        this.startConsumer(topicName, clusterName, listener);
    }

    private void ensureConsumerContract(String topicName, String eventName) {
        List<String> eventVersions = this.contractsProvider.getContracts().stream().filter(contract -> contract.getMessageTypeName().equals(eventName) && contract.getRole().equalsIgnoreCase("consumer")).map(Contract::getMessageTypeVersion).toList();
        eventVersions.forEach(version -> this.ensureConsumerContract(eventName, (String)version, topicName));
    }

    private void ensureConsumerContract(String eventName, String eventVersion, String topicName) {
        AvroMessageType type = new AvroMessageType();
        type.setName(eventName);
        type.setVersion(eventVersion);
        this.contractsValidator.ensureConsumerContract((MessageType)type, topicName);
    }

    private void startConsumer(String topicName, String clusterName, AcknowledgingMessageListener<AvroMessageKey, AvroDomainEvent> messageListener) {
        ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer)this.getKafkaListenerContainerFactory(clusterName).createContainer(new String[]{topicName});
        container.setupMessageListener(messageListener);
        container.start();
        this.containers.add(container);
    }

    private ConcurrentKafkaListenerContainerFactory<AvroMessageKey, AvroMessage> getKafkaListenerContainerFactory(String clusterName) {
        try {
            return (ConcurrentKafkaListenerContainerFactory)this.beanFactory.getBean(this.jeapKafkaBeanNames.getListenerContainerFactoryBeanName(clusterName));
        }
        catch (NoSuchBeanDefinitionException exception) {
            log.error("No kafkaListenerContainerFactory found for cluster with name '{}'", (Object)clusterName);
            throw new IllegalStateException("No kafkaListenerContainerFactory found for cluster with name " + clusterName);
        }
    }

    @PreDestroy
    void stop() {
        log.info("Stopping all domain event listener containers...");
        this.containers.forEach(concurrentMessageListenerContainer -> concurrentMessageListenerContainer.stop(true));
    }

    @Generated
    public List<ConcurrentMessageListenerContainer<?, ?>> getContainers() {
        return this.containers;
    }
}

