/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.test.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTestClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTestClient.class);
    private final KafkaProducer<String, String> producer;
    private final KafkaConsumer<String, String> consumer;
    private final Object shutdownLock = new Object();
    private boolean shutdown = false;

    public KafkaTestClient(String hosts) {
        this(KafkaTestClient.createDefaultProducer(hosts), KafkaTestClient.createDefaultConsumer(hosts));
    }

    public KafkaTestClient(KafkaProducer<String, String> producer, KafkaConsumer<String, String> consumer) {
        this.producer = producer;
        this.consumer = consumer;
    }

    public void consume(Collection<String> topics, Consumer<String> callback) {
        this.consumer.subscribe(topics);
        CompletableFuture.runAsync(() -> {
            while (!this.shutdown) {
                Object object = this.shutdownLock;
                synchronized (object) {
                    ConsumerRecords records = this.consumer.poll(Duration.ofMillis(500L));
                    StreamSupport.stream(records.spliterator(), true).map(ConsumerRecord::value).forEach(callback::accept);
                    this.consumer.commitSync();
                }
            }
        });
    }

    public void consume(String topic, Consumer<String> callback) {
        this.consume(Collections.singletonList(topic), callback);
    }

    public void produce(String data, String topic) {
        this.producer.send(new ProducerRecord(topic, (Object)data), (m, ex) -> Optional.ofNullable(ex).ifPresent(e -> LOGGER.error("Error publishing message {}", (Object)m, (Object)ex)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        this.shutdown = true;
        Object object = this.shutdownLock;
        synchronized (object) {
            this.consumer.close();
        }
        this.producer.close();
    }

    private static KafkaConsumer<String, String> createDefaultConsumer(String hosts) {
        Properties consumerConfig = new Properties();
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("bootstrap.servers", hosts);
        consumerConfig.put("key.deserializer", IntegerDeserializer.class.getName());
        consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
        consumerConfig.put("group.id", KafkaTestClient.class.getName() + "Consumer");
        return new KafkaConsumer(consumerConfig);
    }

    private static KafkaProducer<String, String> createDefaultProducer(String hosts) {
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", hosts);
        producerConfig.put("client.id", KafkaTestClient.class.getName() + "Producer");
        producerConfig.put("key.serializer", IntegerSerializer.class.getName());
        producerConfig.put("value.serializer", StringSerializer.class.getName());
        return new KafkaProducer(producerConfig);
    }
}

