/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.kafkaconnector.common.services.kafkaconnect;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunner;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConnectRunnerService
implements KafkaConnectService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectRunnerService.class);
    private final KafkaConnectRunner kafkaConnectRunner;
    private final ExecutorService service = Executors.newCachedThreadPool();

    public KafkaConnectRunnerService(KafkaService kafkaService) {
        Objects.nonNull(kafkaService);
        LOG.debug("Connecting the Kafka Connect Runner to {}", (Object)kafkaService.getBootstrapServers());
        this.kafkaConnectRunner = new KafkaConnectRunner(kafkaService.getBootstrapServers());
    }

    private void checkInitializationState(KafkaConnectRunner.ConnectorInitState initState) {
        Objects.nonNull(initState);
        Throwable error = initState.getError();
        Map<String, String> configs = initState.getConfigs();
        String name = configs.get("name");
        if (error != null) {
            LOG.error("Failed to create the connector {}: {}", new Object[]{name, error.getMessage(), error});
            throw new RuntimeException(String.format("Failed to create the connector %s: %s", name, error.getMessage()), error);
        }
        if (!initState.isCreated()) {
            LOG.debug("Failed to create connector {}", (Object)name);
            throw new RuntimeException(String.format("Failed to create connector %s", name));
        }
        LOG.debug("Created and initialized the connector {}", (Object)name);
    }

    private void checkInitializationState(KafkaConnectRunner.ConnectorInitState initState, CountDownLatch latch) {
        try {
            this.checkInitializationState(initState);
        }
        finally {
            latch.countDown();
        }
    }

    @Override
    public void initializeConnector(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
        this.kafkaConnectRunner.initializeConnector(propertyFactory, this::checkInitializationState);
    }

    @Override
    public void initializeConnectorBlocking(ConnectorPropertyFactory propertyFactory, Integer expectedTaskNumber) throws ExecutionException, InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        this.kafkaConnectRunner.initializeConnector(propertyFactory, this::checkInitializationState, latch);
        if (!latch.await(30L, TimeUnit.SECONDS)) {
            Assert.fail((String)"The connector did not start within a reasonable time");
        }
    }

    @Override
    public void stop() {
        this.kafkaConnectRunner.stop();
        try {
            this.service.awaitTermination(5L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            LOG.warn("The test was interrupted while executing");
        }
    }

    @Override
    public void start() {
        CountDownLatch latch = new CountDownLatch(1);
        this.service.submit(() -> this.kafkaConnectRunner.run(latch));
        try {
            if (!latch.await(30L, TimeUnit.SECONDS)) {
                LOG.warn("The Kafka Connect Runner timed out while initializing");
                throw new RuntimeException("The Kafka Connect Runner timed out while initializing");
            }
        }
        catch (InterruptedException e) {
            LOG.error("The test was interrupted while executing");
        }
    }
}

