/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.kafkaconnector.common.services.kafka;

import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.common.PluginPathHelper;
import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link KafkaService} implementation backed by an in-process
 * {@link EmbeddedConnectCluster} (one broker, one Connect worker).
 *
 * <p>The cluster is built eagerly in the constructor and started on
 * {@link #initialize()}. {@link #shutdown()} stops and discards the cluster;
 * a later {@link #initialize()} builds a fresh one, so a single instance can be
 * taken through several start/stop cycles.
 *
 * <p>This class performs no synchronization of its own.
 */
public class EmbeddedKafkaService
implements KafkaService {
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaService.class);

    /** Value used for the worker's {@code offset.flush.interval.ms} setting (30 seconds). */
    private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30L);

    /** The embedded cluster; {@code null} between {@link #shutdown()} and the next {@link #initialize()}. */
    private EmbeddedConnectCluster cluster;

    /** {@code true} only while {@link #cluster} has been started and not yet stopped. */
    private boolean started;

    /**
     * Creates the service and builds (but does not start) the embedded cluster.
     */
    public EmbeddedKafkaService() {
        this.buildCluster();
    }

    /**
     * Builds a new, not-yet-started embedded Connect cluster and stores it in
     * {@link #cluster}.
     *
     * <p>The broker is configured with {@code auto.create.topics.enable=true}; the
     * worker gets the offset flush interval and the plugin path reported by
     * {@link PluginPathHelper}.
     */
    private void buildCluster() {
        LOG.info("Creating the embedded Kafka connect instance");
        EmbeddedConnectCluster.Builder builder = new EmbeddedConnectCluster.Builder();

        Properties brokerProps = new Properties();
        brokerProps.put("auto.create.topics.enable", String.valueOf(true));

        HashMap<String, String> workerProps = new HashMap<>();
        workerProps.put("offset.flush.interval.ms", String.valueOf(OFFSET_COMMIT_INTERVAL_MS));

        String pluginPaths = PluginPathHelper.getInstance().pluginPaths();
        LOG.info("Adding the returned directories to the plugin path. This may take A VERY long time to complete");
        workerProps.put("plugin.path", pluginPaths);

        LOG.info("Building the embedded Kafka connect instance");
        this.cluster = builder
                .name("connect-cluster")
                .numWorkers(1)
                .numBrokers(1)
                .brokerProps(brokerProps)
                .workerProps(workerProps)
                .maskExitProcedures(true)
                .build();
        LOG.info("Built the embedded Kafka connect instance");
    }

    /**
     * Returns the bootstrap servers of the embedded Kafka broker.
     *
     * @return the bootstrap server address, or {@code null} if the cluster is not
     *         currently started
     */
    @Override
    public String getBootstrapServers() {
        if (this.started) {
            return this.cluster.kafka().bootstrapServers();
        }
        return null;
    }

    /**
     * Starts the embedded cluster if it is not already running.
     *
     * <p>If the cluster was discarded by a previous {@link #shutdown()}, a new one
     * is built first. Calling this method while already started is a no-op.
     */
    @Override
    public void initialize() {
        if (this.started) {
            return;
        }
        // shutdown() drops the cluster reference, so rebuild it here; without this a
        // second initialize() after shutdown() would fail with a NullPointerException.
        if (this.cluster == null) {
            this.buildCluster();
        }
        this.cluster.start();
        this.started = true;
        LOG.info("Kafka bootstrap server running at address {}", this.getBootstrapServers());
    }

    /**
     * Stops the embedded cluster if it is running and discards it.
     *
     * <p>Calling this method while not started only logs a message.
     */
    @Override
    public void shutdown() {
        LOG.info("Stopping the embedded kafka cluster service");
        if (!this.started) {
            return;
        }
        try {
            this.cluster.stop();
        } finally {
            // Reset the state even when stop() throws, so the service is never left
            // marked as started while holding a partially stopped cluster.
            this.cluster = null;
            this.started = false;
        }
    }

    /**
     * Lifecycle hook that starts the cluster; delegates to {@link #initialize()}.
     *
     * @param extensionContext the extension context supplied by the caller (unused)
     * @throws Exception declared by the overridden method; propagates any failure
     *                   from {@link #initialize()}
     */
    @Override
    public void beforeTestExecution(ExtensionContext extensionContext) throws Exception {
        this.initialize();
    }

    /**
     * Lifecycle hook that stops the cluster; delegates to {@link #shutdown()}.
     *
     * @param context the extension context supplied by the caller (unused)
     * @throws Exception declared by the overridden method; propagates any failure
     *                   from {@link #shutdown()}
     */
    @Override
    public void afterTestExecution(ExtensionContext context) throws Exception {
        this.shutdown();
    }

    /**
     * Returns the underlying embedded cluster, logging a warning when it has not
     * been started.
     *
     * @return the cluster instance; it is not started if {@link #initialize()} has
     *         not been called, and is {@code null} after {@link #shutdown()} until
     *         the next {@link #initialize()}
     */
    public EmbeddedConnectCluster getCluster() {
        if (!this.started) {
            LOG.warn("Returning a non-initialized cluster");
        }
        return this.cluster;
    }
}

