/*
 * Decompiled with CFR 0.152.
 */
package no.shhsoft.k3aembedded;

import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.MetaProperties;
import kafka.server.Server;
import kafka.tools.StorageTool;
import no.shhsoft.k3aembedded.FileUtils;
import no.shhsoft.k3aembedded.NetworkUtils;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.server.common.MetadataVersion;
import scala.Option;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters;

public final class K3aEmbedded {
    private static final int NODE_ID = 1;
    private Server server;
    private Path logDirectory;
    private final int brokerPort;
    private final int controllerPort;
    private final int[] additionalPorts;
    private final Map<String, Object> additionalConfiguration;
    private final AdditionalConfigurationProvider additionalConfigurationProvider;
    private final List<AdditionalListener> additionalListeners;

    private K3aEmbedded(int brokerPort, int controllerPort, int numAdditionalPorts, Map<String, Object> additionalConfiguration, AdditionalConfigurationProvider additionalConfigurationProvider, List<AdditionalListener> additionalListeners) {
        this.brokerPort = brokerPort > 0 ? brokerPort : NetworkUtils.getRandomAvailablePort();
        this.controllerPort = controllerPort > 0 ? controllerPort : NetworkUtils.getRandomAvailablePort();
        this.additionalConfiguration = additionalConfiguration;
        this.additionalConfigurationProvider = additionalConfigurationProvider;
        this.additionalListeners = additionalListeners;
        this.additionalPorts = new int[numAdditionalPorts];
        for (int q = 0; q < this.additionalPorts.length; ++q) {
            this.additionalPorts[q] = NetworkUtils.getRandomAvailablePort();
        }
    }

    public void start() {
        if (this.server != null) {
            throw new RuntimeException("Server already started");
        }
        this.logDirectory = this.createKafkaLogDirectory();
        HashMap<String, Object> map = this.getConfigMap();
        KafkaConfig config = new KafkaConfig(map);
        this.formatKafkaLogDirectory(config);
        this.server = new KafkaRaftServer(config, Time.SYSTEM);
        this.server.startup();
    }

    public void stop() {
        if (this.server == null) {
            return;
        }
        this.server.shutdown();
        this.server.awaitShutdown();
        this.server = null;
        FileUtils.deleteRecursively(this.logDirectory.toFile());
        this.logDirectory = null;
    }

    public int getBrokerPort() {
        return this.brokerPort;
    }

    public int getControllerPort() {
        return this.controllerPort;
    }

    public int getAdditionalPort(int index) {
        return this.additionalPorts[index];
    }

    public String getBootstrapServers() {
        return "localhost:" + this.getBrokerPort();
    }

    public String getBootstrapServersForAdditionalPort(int index) {
        return "localhost:" + this.additionalPorts[index];
    }

    private HashMap<String, Object> getConfigMap() {
        HashMap<String, Object> map = new HashMap<String, Object>();
        map.put("node.id", String.valueOf(1));
        map.put("process.roles", "broker, controller");
        map.put("controller.quorum.voters", "1@localhost:" + this.controllerPort);
        map.put("controller.listener.names", "CONTROLLER");
        map.put("inter.broker.listener.name", "BROKER");
        map.put("listeners", this.getListenersString());
        map.put("listener.security.protocol.map", this.getSecurityProtocolsString());
        map.put("log.dir", this.logDirectory.toString());
        map.put("offsets.topic.num.partitions", "1");
        map.put("offsets.topic.replication.factor", "1");
        map.put("group.initial.rebalance.delay.ms", "0");
        this.validateAndAddConfiguration(map, this.additionalConfiguration);
        if (this.additionalConfigurationProvider != null) {
            this.validateAndAddConfiguration(map, this.additionalConfigurationProvider.getAdditionalConfiguration());
        }
        return map;
    }

    private String getListenersString() {
        StringBuilder sb = new StringBuilder();
        sb.append("BROKER://:" + this.brokerPort + ", CONTROLLER://:" + this.controllerPort);
        for (AdditionalListener additionalListener : this.additionalListeners) {
            int port = additionalListener.port <= 0 ? this.additionalPorts[additionalListener.port] : additionalListener.port;
            sb.append(", " + additionalListener.name + "://:" + port);
        }
        return sb.toString();
    }

    private String getSecurityProtocolsString() {
        StringBuilder sb = new StringBuilder();
        sb.append("BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT");
        for (AdditionalListener additionalListener : this.additionalListeners) {
            sb.append(", " + additionalListener.name + ":" + additionalListener.securityProtocol);
        }
        return sb.toString();
    }

    private void validateAndAddConfiguration(HashMap<String, Object> map, Map<String, Object> additionalConfiguration) {
        if (additionalConfiguration == null) {
            return;
        }
        if (additionalConfiguration.containsKey("node.id") && !map.get("node.id").toString().equals(additionalConfiguration.get("node.id").toString())) {
            throw new RuntimeException("node.id cannot be overriden");
        }
        map.putAll(additionalConfiguration);
    }

    private Path createKafkaLogDirectory() {
        try {
            return Files.createTempDirectory("kafka", new FileAttribute[0]);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void formatKafkaLogDirectory(KafkaConfig kafkaConfig) {
        if (this.logDirectory == null) {
            throw new RuntimeException("No log directory. This should not happen.");
        }
        String clusterId = Uuid.randomUuid().toString();
        MetadataVersion metadataVersion = MetadataVersion.latest();
        MetaProperties metaProperties = StorageTool.buildMetadataProperties((String)clusterId, (KafkaConfig)kafkaConfig);
        BootstrapMetadata bootstrapMetadata = StorageTool.buildBootstrapMetadata((MetadataVersion)metadataVersion, (Option)Option.empty(), (String)"format command");
        Seq seq = CollectionConverters.ListHasAsScala(Collections.singletonList(this.logDirectory.toString())).asScala().toList().toSeq();
        StorageTool.formatCommand((PrintStream)System.out, (Seq)seq, (MetaProperties)metaProperties, (BootstrapMetadata)bootstrapMetadata, (MetadataVersion)metadataVersion, (boolean)false);
    }

    public static interface AdditionalConfigurationProvider {
        public Map<String, Object> getAdditionalConfiguration();
    }

    private static final class AdditionalListener {
        private final String name;
        private final String securityProtocol;
        private final int port;

        private AdditionalListener(String name, String securityProtocol, int port) {
            this.name = name;
            this.securityProtocol = securityProtocol;
            this.port = port;
        }
    }

    public static final class Builder {
        private int brokerPort = -1;
        private int controllerPort = -1;
        private int numAdditionalPorts = 0;
        private Map<String, Object> additionalConfiguration;
        private AdditionalConfigurationProvider additionalConfigurationProvider;
        private final List<AdditionalListener> additionalListeners = new ArrayList<AdditionalListener>();

        public K3aEmbedded build() {
            return new K3aEmbedded(this.brokerPort, this.controllerPort, this.numAdditionalPorts, this.additionalConfiguration, this.additionalConfigurationProvider, this.additionalListeners);
        }

        public Builder brokerPort(int brokerPort) {
            this.brokerPort = Builder.validatePort(brokerPort);
            return this;
        }

        public Builder controllerPort(int controllerPort) {
            this.controllerPort = Builder.validatePort(controllerPort);
            return this;
        }

        public Builder additionalPorts(int numAdditionalPorts) {
            this.numAdditionalPorts = numAdditionalPorts;
            return this;
        }

        public Builder additionalConfiguration(Map<String, Object> additionalConfiguration) {
            this.additionalConfiguration = additionalConfiguration;
            return this;
        }

        public Builder additionalListenerWithFixedPort(String name, String securityProtocol, int port) {
            this.additionalListeners.add(new AdditionalListener(name, securityProtocol, Builder.validatePort(port)));
            return this;
        }

        public Builder additionalListenerWithPortIndex(String name, String securityProtocol, int portIndex) {
            this.additionalListeners.add(new AdditionalListener(name, securityProtocol, -portIndex));
            return this;
        }

        public Builder additionalConfigurationProvider(AdditionalConfigurationProvider additionalConfigurationProvider) {
            this.additionalConfigurationProvider = additionalConfigurationProvider;
            return this;
        }

        private static int validatePort(int port) {
            if (port < 1 || port > 65535) {
                throw new RuntimeException("Ports must be in the range 1 to 65535");
            }
            return port;
        }
    }
}

