/*
 * Decompiled with CFR 0.152.
 */
package pl.netroute.hussar.service.kafka.api;

import java.util.List;
import java.util.Set;
import lombok.NonNull;
import org.apache.kafka.clients.admin.AdminClient;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import pl.netroute.hussar.core.api.Endpoint;
import pl.netroute.hussar.core.configuration.api.ConfigurationRegistry;
import pl.netroute.hussar.core.docker.api.DockerNetwork;
import pl.netroute.hussar.core.helper.EndpointHelper;
import pl.netroute.hussar.core.network.api.NetworkConfigurer;
import pl.netroute.hussar.core.service.ServiceStartupContext;
import pl.netroute.hussar.core.service.api.BaseDockerService;
import pl.netroute.hussar.core.service.api.BaseDockerServiceConfig;
import pl.netroute.hussar.core.service.api.Service;
import pl.netroute.hussar.core.service.registerer.EndpointRegisterer;
import pl.netroute.hussar.service.kafka.api.KafkaAdminClientFactory;
import pl.netroute.hussar.service.kafka.api.KafkaDockerServiceConfig;
import pl.netroute.hussar.service.kafka.api.KafkaTopic;
import pl.netroute.hussar.service.kafka.api.KafkaTopicAutoCreationConfigurer;
import pl.netroute.hussar.service.kafka.api.KafkaTopicConfigurer;

public class KafkaDockerService
extends BaseDockerService<KafkaDockerServiceConfig> {
    private static final int KAFKA_LISTENING_PORT = 9092;
    @NonNull
    private final KafkaTopicConfigurer topicConfigurer;
    @NonNull
    private final KafkaTopicAutoCreationConfigurer topicAutoCreationConfigurer;

    KafkaDockerService(@NonNull ConfluentKafkaContainer container, @NonNull DockerNetwork dockerNetwork, @NonNull KafkaDockerServiceConfig config, @NonNull ConfigurationRegistry configurationRegistry, @NonNull EndpointRegisterer endpointRegisterer, @NonNull NetworkConfigurer networkConfigurer, @NonNull KafkaTopicConfigurer topicConfigurer, @NonNull KafkaTopicAutoCreationConfigurer topicAutoCreationConfigurer) {
        super((GenericContainer)container, dockerNetwork, (BaseDockerServiceConfig)config, configurationRegistry, endpointRegisterer, networkConfigurer);
        if (container == null) {
            throw new NullPointerException("container is marked non-null but is null");
        }
        if (dockerNetwork == null) {
            throw new NullPointerException("dockerNetwork is marked non-null but is null");
        }
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        if (configurationRegistry == null) {
            throw new NullPointerException("configurationRegistry is marked non-null but is null");
        }
        if (endpointRegisterer == null) {
            throw new NullPointerException("endpointRegisterer is marked non-null but is null");
        }
        if (networkConfigurer == null) {
            throw new NullPointerException("networkConfigurer is marked non-null but is null");
        }
        if (topicConfigurer == null) {
            throw new NullPointerException("topicConfigurer is marked non-null but is null");
        }
        if (topicAutoCreationConfigurer == null) {
            throw new NullPointerException("topicAutoCreationConfigurer is marked non-null but is null");
        }
        this.topicConfigurer = topicConfigurer;
        this.topicAutoCreationConfigurer = topicAutoCreationConfigurer;
    }

    protected List<Integer> getInternalPorts() {
        return List.of(Integer.valueOf(9092));
    }

    protected void configureContainer(GenericContainer<?> container) {
        super.configureContainer(container);
        ConfluentKafkaContainer kafkaContainer = (ConfluentKafkaContainer)container;
        this.topicAutoCreationConfigurer.configure(((KafkaDockerServiceConfig)this.config).isTopicAutoCreation(), kafkaContainer);
    }

    protected void doAfterServiceStartup(ServiceStartupContext context) {
        super.doAfterServiceStartup(context);
        Endpoint endpoint = EndpointHelper.getAnyEndpointOrFail((Service)this);
        try (AdminClient adminClient = KafkaAdminClientFactory.create(endpoint);){
            this.configureTopics(adminClient);
        }
    }

    private void configureTopics(AdminClient adminClient) {
        Set<KafkaTopic> topics = ((KafkaDockerServiceConfig)this.config).getTopics();
        topics.forEach(topic -> this.topicConfigurer.configure(adminClient, (KafkaTopic)topic));
    }
}

