/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.kafkaconnector.common.services.kafkaconnect;

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
import org.apache.camel.test.infra.kafka.services.KafkaService;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConnectEmbedded
implements KafkaConnectService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectEmbedded.class);
    private final EmbeddedConnectCluster cluster;
    private String connectorName;

    public KafkaConnectEmbedded(KafkaService kafkaService) {
        if (!(kafkaService instanceof EmbeddedKafkaService)) {
            throw new RuntimeException("Invalid Kafka service type: " + (kafkaService == null ? "null" : kafkaService.getClass()));
        }
        this.cluster = ((EmbeddedKafkaService)kafkaService).getCluster();
    }

    private void convertProperty(Map<String, String> map, Object key, Object value) {
        map.put(String.valueOf(key), String.valueOf(value));
    }

    @Override
    public void initializeConnector(ConnectorPropertyFactory propertyFactory) {
        LOG.trace("Adding the new connector");
        HashMap configuredProperties = new HashMap();
        propertyFactory.getProperties().forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(k, v) -> this.convertProperty(configuredProperties, k, v)));
        this.connectorName = (String)configuredProperties.get("name");
        LOG.info("Initializing connector {}", (Object)this.connectorName);
        this.cluster.configureConnector(this.connectorName, configuredProperties);
        LOG.trace("Added the new connector");
    }

    @Override
    public void initializeConnectorBlocking(ConnectorPropertyFactory propertyFactory, Integer expectedTaskNumber) throws InterruptedException {
        this.initializeConnector(propertyFactory);
        TestUtils.waitForCondition(() -> {
            ConnectorStateInfo connectorStateInfo = null;
            do {
                connectorStateInfo = this.cluster.connectorStatus(this.connectorName);
                Thread.sleep(20L);
            } while (connectorStateInfo == null);
            return connectorStateInfo.tasks().size() >= expectedTaskNumber && connectorStateInfo.connector().state().equals(AbstractStatus.State.RUNNING.toString()) && connectorStateInfo.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
        }, (long)30000L, (String)("The connector " + this.connectorName + " did not start within a reasonable time"));
    }

    @Override
    public void stop() {
        if (this.connectorName != null) {
            try {
                LOG.info("Removing topics used during the test");
                Admin client = this.cluster.kafka().createAdminClient();
                client.deleteTopics(this.cluster.connectorTopics(this.connectorName).topics());
                LOG.info("Removing connector {}", (Object)this.connectorName);
                this.cluster.deleteConnector(this.connectorName);
            }
            finally {
                this.connectorName = null;
            }
        }
    }

    @Override
    public void start() {
    }
}

