/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.kafkaconnector.common.services.kafka;

import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.common.PluginPathHelper;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
import org.apache.camel.test.infra.kafka.services.KafkaService;
import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link KafkaService} implementation backed by an in-process {@link EmbeddedConnectCluster}.
 *
 * <p>The cluster object is built eagerly by the constructor, but it is only started by
 * {@link #initialize()} (also triggered by {@link #beforeAll(ExtensionContext)}) and stopped by
 * {@link #shutdown()} (also triggered by {@link #afterAll(ExtensionContext)}).
 *
 * <p>The started flag is a plain field and no synchronization is performed around it.
 */
public class EmbeddedKafkaService implements KafkaService {
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaService.class);

    /** Value, in milliseconds, written to the worker's {@code offset.flush.interval.ms} setting. */
    private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30L);

    /** Worker configuration key that selects the plugin discovery mode. */
    private static final String PLUGIN_DISCOVERY_KEY = "plugin.discovery";

    /** The embedded cluster; created once in the constructor and never replaced. */
    private final EmbeddedConnectCluster cluster;

    /** {@code true} between a successful {@link #initialize()} and the next {@link #shutdown()}. */
    private boolean started;

    /**
     * Creates the service and builds (but does not start) the embedded Kafka Connect cluster.
     */
    public EmbeddedKafkaService() {
        this.cluster = buildCluster();
    }

    /**
     * Assembles the broker and worker configuration and builds a single-worker, single-broker
     * embedded Kafka Connect cluster.
     *
     * @return the newly built, not yet started, cluster
     */
    private static EmbeddedConnectCluster buildCluster() {
        LOG.info("Creating the embedded Kafka connect instance");
        EmbeddedConnectCluster.Builder builder = new EmbeddedConnectCluster.Builder();

        Properties brokerProps = new Properties();
        brokerProps.put("auto.create.topics.enable", String.valueOf(true));

        // Every worker setting is stored as a String, so the map is typed accordingly.
        HashMap<String, String> workerProps = new HashMap<>();
        workerProps.put("offset.flush.interval.ms", String.valueOf(OFFSET_COMMIT_INTERVAL_MS));

        // The listener is bound to a port obtained from NetworkUtils at build time.
        String address = "http://localhost:" + NetworkUtils.getFreePort();
        LOG.info("Using the following address for  the listener configuration: {}", address);
        workerProps.put("listeners", address);

        LOG.info("Setting {} to: {}", PLUGIN_DISCOVERY_KEY, PluginDiscoveryMode.HYBRID_WARN);
        workerProps.put(PLUGIN_DISCOVERY_KEY, String.valueOf(PluginDiscoveryMode.HYBRID_WARN));

        String pluginPaths = PluginPathHelper.getInstance().pluginPaths();
        LOG.info("Adding the returned directories to the plugin path. This may take A VERY long time to complete");
        workerProps.put("plugin.path", pluginPaths);

        LOG.info("Building the embedded Kafka connect instance");
        EmbeddedConnectCluster built = builder
                .name("connect-cluster")
                .numWorkers(1)
                .numBrokers(1)
                .brokerProps(brokerProps)
                .workerProps(workerProps)
                .maskExitProcedures(true)
                .build();
        LOG.info("Built the embedded Kafka connect instance");
        return built;
    }

    /**
     * Returns the bootstrap servers of the embedded Kafka broker.
     *
     * @return the bootstrap servers string reported by the cluster, or {@code null} if the
     *         service has not been started (or has been shut down)
     */
    public String getBootstrapServers() {
        if (!this.started) {
            return null;
        }
        return this.cluster.kafka().bootstrapServers();
    }

    /**
     * Intentionally does nothing: this service registers no properties.
     */
    public void registerProperties() {
        // NO-OP
    }

    /**
     * Starts the embedded cluster if it is not already started, then calls
     * {@link #registerProperties()} and logs the bootstrap address. Calling this method again
     * while the service is started does nothing.
     */
    public void initialize() {
        if (this.started) {
            return;
        }
        this.cluster.start();
        this.started = true;
        this.registerProperties();
        LOG.info("Kafka bootstrap server running at address {}", this.getBootstrapServers());
    }

    /**
     * Stops the embedded cluster if it was started. The "stopping" message is logged even when
     * there is nothing to stop.
     */
    public void shutdown() {
        LOG.info("Stopping the embedded kafka cluster service");
        if (this.started) {
            this.cluster.stop();
            this.started = false;
        }
    }

    /**
     * JUnit callback; delegates to {@link #initialize()}.
     *
     * @param extensionContext the current extension context (unused)
     */
    public void beforeAll(ExtensionContext extensionContext) {
        this.initialize();
    }

    /**
     * JUnit callback; delegates to {@link #shutdown()}.
     *
     * @param context the current extension context (unused)
     */
    public void afterAll(ExtensionContext context) {
        this.shutdown();
    }

    /**
     * Returns the underlying embedded cluster. A warning is logged if the service has not been
     * started, but the cluster instance is returned regardless.
     *
     * @return the embedded Kafka Connect cluster
     */
    public EmbeddedConnectCluster getCluster() {
        if (!this.started) {
            LOG.warn("Returning a non-initialized cluster");
        }
        return this.cluster;
    }
}

