/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.kafkaconnector.common.test;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.FunctionalTestMessageProducer;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.test.TestMessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base support class for sink connector tests.
 *
 * <p>Every {@code runTest*} variant follows the same sequence: log the connector
 * properties, initialize the connector, start {@link #consumeMessages(CountDownLatch)}
 * on a background thread, run the producer on the calling thread and finally call
 * {@link #verifyMessages(CountDownLatch)}. Subclasses supply the consume and verify
 * steps.
 */
public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTestSupport.class);

    /**
     * Runs the sink test with a {@link StringMessageProducer} built from the Kafka
     * bootstrap servers, the given topic and the given count.
     *
     * @param connectorPropertyFactory the connector configuration to deploy
     * @param topic the topic handed to the producer
     * @param count the count handed to the producer (presumably the number of
     *              messages to send — confirm against {@code StringMessageProducer})
     * @throws Exception if initialization, production or verification fails
     */
    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
        String bootstrapServers = getKafkaService().getBootstrapServers();
        runTest(connectorPropertyFactory, new StringMessageProducer(bootstrapServers, topic, count));
    }

    /**
     * Runs the sink test after initializing the connector with the blocking
     * initialization call.
     *
     * @param connectorPropertyFactory the connector configuration to deploy
     * @param producer produces the test messages
     * @throws Exception if initialization, production or verification fails
     */
    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws Exception {
        connectorPropertyFactory.log();
        // NOTE(review): the literal 1 is presumably the expected task count — confirm.
        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);

        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService service = startConsumer(latch);
        try {
            producer.produceMessages();
            LOG.debug("Waiting for the messages to be processed");
        } finally {
            // Shut the pool down even when the producer throws, so it is never left open.
            service.shutdown();
        }

        LOG.debug("Waiting for the test to complete");
        verifyMessages(latch);
    }

    /**
     * Same as {@link #runTest(ConnectorPropertyFactory, TestMessageProducer)} but
     * driven by a {@link FunctionalTestMessageProducer}.
     *
     * @param connectorPropertyFactory the connector configuration to deploy
     * @param producer produces the test messages
     * @throws ExecutionException propagated from the calls made by this method
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
        connectorPropertyFactory.log();
        // NOTE(review): the literal 1 is presumably the expected task count — confirm.
        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);

        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService service = startConsumer(latch);
        try {
            producer.produceMessages();
            LOG.debug("Waiting for the messages to be processed");
        } finally {
            // Shut the pool down even when the producer throws, so it is never left open.
            service.shutdown();
        }

        LOG.debug("Waiting for the test to complete");
        verifyMessages(latch);
    }

    /**
     * Runs the sink test using {@code initializeConnector} instead of the blocking
     * initialization call; the rest of the sequence is identical to
     * {@link #runTest(ConnectorPropertyFactory, TestMessageProducer)}.
     *
     * @param connectorPropertyFactory the connector configuration to deploy
     * @param producer produces the test messages
     * @throws Exception if initialization, production or verification fails
     */
    protected void runTestNonBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws Exception {
        connectorPropertyFactory.log();
        getKafkaConnectService().initializeConnector(connectorPropertyFactory);

        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService service = startConsumer(latch);
        try {
            producer.produceMessages();
            LOG.debug("Waiting for the messages to be processed");
        } finally {
            // Shut the pool down even when the producer throws, so it is never left open.
            service.shutdown();
        }

        LOG.debug("Waiting for the test to complete");
        verifyMessages(latch);
    }

    /**
     * Pauses the calling thread for one second.
     *
     * @return {@code true} if the full pause elapsed; {@code false} if the thread was
     *         interrupted, in which case the interrupt flag is restored
     */
    protected boolean waitForData() {
        try {
            Thread.sleep(Duration.ofSeconds(1).toMillis());
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    /**
     * Consumes the messages delivered by the connector under test. Invoked on a
     * background thread by the {@code runTest*} methods.
     *
     * @param latch the latch that is later handed to {@link #verifyMessages(CountDownLatch)}
     */
    protected abstract void consumeMessages(CountDownLatch latch);

    /**
     * Verifies the outcome of the test. Invoked on the calling thread once the
     * producer has finished.
     *
     * @param latch the same latch that was handed to {@link #consumeMessages(CountDownLatch)}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected abstract void verifyMessages(CountDownLatch latch) throws InterruptedException;

    /**
     * Submits {@link #consumeMessages(CountDownLatch)} to a fresh cached thread pool.
     *
     * @param latch the latch passed through to the consumer
     * @return the executor running the consumer; the caller must shut it down
     */
    private ExecutorService startConsumer(CountDownLatch latch) {
        LOG.debug("Creating the consumer ...");
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(() -> consumeMessages(latch));
        return service;
    }
}

