/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.test.quarkus.kafka;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test helper that wraps a Vert.x {@link KafkaProducer} and {@link KafkaConsumer}
 * for {@code String} keys and values.
 *
 * <p>Subscription changes and shutdown block the calling thread (via
 * {@link #waitForCompletion(Future)}) for at most {@value #TIMEOUT} seconds each.
 * Publishing is fire-and-forget: the outcome is only logged.
 */
public class KafkaTestClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTestClient.class);

    /** Maximum time, in seconds, that {@link #waitForCompletion(Future)} blocks. */
    private static final int TIMEOUT = 10;

    private final KafkaProducer<String, String> producer;
    private final KafkaConsumer<String, String> consumer;

    /**
     * Vert.x instance created (and therefore owned) by this client, or {@code null}
     * when the producer and consumer were supplied by the caller.
     */
    private final Vertx vertx;

    /**
     * Creates a client with a default producer and consumer connected to the given brokers.
     *
     * @param hosts value used for the {@code bootstrap.servers} setting of both clients
     */
    public KafkaTestClient(String hosts) {
        // Must be assigned first: both factory methods below read this.vertx.
        this.vertx = Vertx.vertx();
        this.producer = this.createDefaultProducer(hosts);
        this.consumer = this.createDefaultConsumer(hosts);
    }

    /**
     * Creates a client around an externally built producer and consumer.
     * No Vert.x instance is created in this case.
     *
     * @param producer producer used by {@link #produce(String, String)}
     * @param consumer consumer used by the {@code consume} methods
     */
    public KafkaTestClient(KafkaProducer<String, String> producer, KafkaConsumer<String, String> consumer) {
        this.vertx = null;
        this.producer = producer;
        this.consumer = consumer;
    }

    /**
     * Builds the default consumer: auto-commit enabled, offset reset {@code earliest},
     * String deserializers and a fixed group id derived from this class name.
     */
    private KafkaConsumer<String, String> createDefaultConsumer(String hosts) {
        Map<String, String> consumerConfig = new HashMap<>();
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("bootstrap.servers", hosts);
        consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
        consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
        consumerConfig.put("group.id", KafkaTestClient.class.getName() + "Consumer");
        return KafkaConsumer.create(this.vertx, consumerConfig);
    }

    /**
     * Builds the default producer: {@code acks=1}, String serializers and a fixed
     * client id derived from this class name.
     */
    private KafkaProducer<String, String> createDefaultProducer(String hosts) {
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", hosts);
        producerConfig.put("acks", "1");
        producerConfig.put("client.id", KafkaTestClient.class.getName() + "Producer");
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return KafkaProducer.create(this.vertx, producerConfig);
    }

    /**
     * Replaces the current subscription with {@code topics} and forwards the value of
     * every received record to {@code callback}.
     *
     * <p>Blocks until the unsubscribe and the subscribe requests have each completed.
     *
     * @param topics   topics to subscribe to
     * @param callback receives the value of each consumed record
     * @throws IllegalStateException if either request does not complete within the timeout
     */
    public void consume(Set<String> topics, Consumer<String> callback) {
        this.waitForCompletion(this.consumer.unsubscribe());
        // Install the handler before subscribing so it is in place when records start arriving.
        this.consumer.handler(record -> callback.accept(record.value()));
        this.waitForCompletion(this.consumer.subscribe(topics));
    }

    /**
     * Single-topic convenience overload of {@link #consume(Set, Consumer)}.
     *
     * @param topic    topic to subscribe to
     * @param callback receives the value of each consumed record
     */
    public void consume(String topic, Consumer<String> callback) {
        this.consume(Collections.singleton(topic), callback);
    }

    /**
     * Sends {@code data} as a record value to {@code topic} and requests a flush.
     *
     * <p>This method does not wait for the send or the flush to finish; the send
     * outcome is reported through {@link #produceCallback(AsyncResult)}.
     *
     * @param data  record value
     * @param topic destination topic
     */
    public void produce(String data, String topic) {
        LOGGER.info("Publishing event with data {} for topic {}", data, topic);
        this.producer.send(KafkaProducerRecord.create(topic, data), this::produceCallback);
        this.producer.flush();
    }

    /**
     * Logs the outcome of a send: the failure cause at error level, or the record
     * metadata at info level.
     *
     * @param result asynchronous result of the send
     */
    public void produceCallback(AsyncResult<RecordMetadata> result) {
        if (result.failed()) {
            LOGGER.error("Event publishing failed", result.cause());
        } else {
            LOGGER.info("Event published {}", result.result());
        }
    }

    /**
     * Closes the producer, the consumer and, when this client created one, the Vert.x
     * instance, waiting for each close to complete.
     *
     * <p>Every resource is attempted even if closing an earlier one fails; in that
     * case the exception from the last failing close is the one that propagates.
     */
    public void shutdown() {
        try {
            this.waitForCompletion(this.producer.close());
        } finally {
            try {
                this.waitForCompletion(this.consumer.close());
            } finally {
                if (this.vertx != null) {
                    this.waitForCompletion(this.vertx.close());
                }
            }
        }
    }

    /**
     * Blocks the calling thread until {@code future} completes, for at most
     * {@value #TIMEOUT} seconds.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is restored
     * and the method returns without reporting whether the future completed.
     *
     * @param future the asynchronous operation to wait for
     * @throws IllegalStateException if the future does not complete within the timeout
     * @throws RuntimeException      the original failure, when the future failed with an unchecked exception
     * @throws KafkaException        wrapping the failure cause in every other failure case
     */
    public void waitForCompletion(Future<?> future) {
        try {
            future.toCompletionStage().toCompletableFuture().get(TIMEOUT, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (TimeoutException e) {
            throw new IllegalStateException(
                    "Operation did not complete within " + TIMEOUT + " seconds", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new KafkaException(cause);
        }
    }
}

