/*
 * Decompiled with CFR 0.152.
 */
package pl.netroute.hussar.service.kafka;

import java.util.List;
import java.util.Set;
import lombok.NonNull;
import org.apache.kafka.clients.admin.AdminClient;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import pl.netroute.hussar.core.api.Endpoint;
import pl.netroute.hussar.core.api.configuration.ConfigurationRegistry;
import pl.netroute.hussar.core.api.service.Service;
import pl.netroute.hussar.core.api.service.ServiceStartupContext;
import pl.netroute.hussar.core.helper.EndpointHelper;
import pl.netroute.hussar.core.service.BaseDockerService;
import pl.netroute.hussar.core.service.BaseDockerServiceConfig;
import pl.netroute.hussar.core.service.registerer.EndpointRegisterer;
import pl.netroute.hussar.service.kafka.KafkaAdminClientFactory;
import pl.netroute.hussar.service.kafka.KafkaDockerServiceConfig;
import pl.netroute.hussar.service.kafka.KafkaKraftModeConfigurer;
import pl.netroute.hussar.service.kafka.KafkaListener;
import pl.netroute.hussar.service.kafka.KafkaListenerConfigurer;
import pl.netroute.hussar.service.kafka.KafkaTopicAutoCreationConfigurer;
import pl.netroute.hussar.service.kafka.KafkaTopicConfigurer;
import pl.netroute.hussar.service.kafka.api.KafkaTopic;

public class KafkaDockerService
extends BaseDockerService<KafkaDockerServiceConfig> {
    private final KafkaListenerConfigurer listenerConfigurer;
    private final KafkaTopicConfigurer topicConfigurer;
    private final KafkaTopicAutoCreationConfigurer topicAutoCreationConfigurer;
    private final KafkaKraftModeConfigurer kraftModeConfigurer;

    KafkaDockerService(@NonNull KafkaContainer container, @NonNull KafkaDockerServiceConfig config, @NonNull ConfigurationRegistry configurationRegistry, @NonNull EndpointRegisterer endpointRegisterer, @NonNull KafkaListenerConfigurer listenerConfigurer, @NonNull KafkaTopicConfigurer topicConfigurer, @NonNull KafkaTopicAutoCreationConfigurer topicAutoCreationConfigurer, @NonNull KafkaKraftModeConfigurer kraftModeConfigurer) {
        super((GenericContainer)container, (BaseDockerServiceConfig)config, configurationRegistry, endpointRegisterer);
        if (container == null) {
            throw new NullPointerException("container is marked non-null but is null");
        }
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        if (configurationRegistry == null) {
            throw new NullPointerException("configurationRegistry is marked non-null but is null");
        }
        if (endpointRegisterer == null) {
            throw new NullPointerException("endpointRegisterer is marked non-null but is null");
        }
        if (listenerConfigurer == null) {
            throw new NullPointerException("listenerConfigurer is marked non-null but is null");
        }
        if (topicConfigurer == null) {
            throw new NullPointerException("topicConfigurer is marked non-null but is null");
        }
        if (topicAutoCreationConfigurer == null) {
            throw new NullPointerException("topicAutoCreationConfigurer is marked non-null but is null");
        }
        if (kraftModeConfigurer == null) {
            throw new NullPointerException("kraftModeConfigurer is marked non-null but is null");
        }
        this.listenerConfigurer = listenerConfigurer;
        this.topicConfigurer = topicConfigurer;
        this.topicAutoCreationConfigurer = topicAutoCreationConfigurer;
        this.kraftModeConfigurer = kraftModeConfigurer;
    }

    public List<Endpoint> getEndpoints() {
        KafkaListener externalListener = KafkaListenerConfigurer.EXTERNAL_LISTENER;
        String host = this.container.getHost();
        Integer port = this.container.getMappedPort(externalListener.port());
        Endpoint endpoint = Endpoint.schemeLess((String)host, (int)port);
        return List.of(endpoint);
    }

    protected void configureContainer(GenericContainer<?> container) {
        super.configureContainer(container);
        KafkaContainer kafkaContainer = (KafkaContainer)container;
        this.listenerConfigurer.configure(kafkaContainer);
        this.topicAutoCreationConfigurer.configure(((KafkaDockerServiceConfig)this.config).isTopicAutoCreation(), kafkaContainer);
        this.configureKraftMode(kafkaContainer);
        kafkaContainer.withExposedPorts(new Integer[]{KafkaListenerConfigurer.EXTERNAL_LISTENER.port()});
    }

    protected void doAfterServiceStartup(ServiceStartupContext context) {
        super.doAfterServiceStartup(context);
        Endpoint endpoint = EndpointHelper.getAnyEndpointOrFail((Service)this);
        AdminClient adminClient = KafkaAdminClientFactory.create(endpoint);
        this.configureTopics(adminClient);
    }

    private void configureKraftMode(KafkaContainer container) {
        if (((KafkaDockerServiceConfig)this.config).isKraftMode()) {
            this.kraftModeConfigurer.configure(container);
        }
    }

    private void configureTopics(AdminClient adminClient) {
        Set<KafkaTopic> topics = ((KafkaDockerServiceConfig)this.config).getTopics();
        topics.forEach(topic -> this.topicConfigurer.configure(adminClient, (KafkaTopic)topic));
    }
}

