/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.kafkaconnector.common.clients.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import org.apache.camel.kafkaconnector.common.clients.kafka.ConsumerPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.DefaultConsumerPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.DefaultProducerPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.ProducerPropertyFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public class KafkaClient<K, V> {
    private final ConsumerPropertyFactory consumerPropertyFactory;
    private final ProducerPropertyFactory producerPropertyFactory;
    private KafkaProducer<K, V> producer;
    private KafkaConsumer<K, V> consumer;

    public KafkaClient(String bootstrapServer) {
        this.consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer);
        this.producerPropertyFactory = new DefaultProducerPropertyFactory(bootstrapServer);
        this.producer = new KafkaProducer(this.producerPropertyFactory.getProperties());
        this.consumer = new KafkaConsumer(this.consumerPropertyFactory.getProperties());
    }

    public void consume(String topic, Predicate<ConsumerRecord<K, V>> predicate) {
        this.consumer.subscribe(Arrays.asList(topic));
        block0: while (true) {
            ConsumerRecord record;
            ConsumerRecords records = this.consumer.poll(Duration.ofMillis(10L));
            Iterator iterator = records.iterator();
            do {
                if (!iterator.hasNext()) continue block0;
            } while (predicate.test(record = (ConsumerRecord)iterator.next()));
            break;
        }
    }

    public void produce(String topic, V message) throws ExecutionException, InterruptedException {
        ProducerRecord record = new ProducerRecord(topic, message);
        Future future = this.producer.send(record);
        future.get();
    }

    public void produce(String topic, V message, Map<String, String> headers) throws ExecutionException, InterruptedException {
        ProducerRecord record = new ProducerRecord(topic, message);
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            record.headers().add((Header)new TestHeader(entry.getKey(), entry.getValue()));
        }
        Future future = this.producer.send(record);
        future.get();
    }

    public void deleteTopic(String topic) {
        Properties props = this.producerPropertyFactory.getProperties();
        AdminClient admClient = AdminClient.create((Properties)props);
        admClient.deleteTopics(Collections.singleton(topic));
    }

    private static class TestHeader
    implements Header {
        private final String key;
        private final String value;

        public TestHeader(String key, String value) {
            this.key = key;
            this.value = value;
        }

        public String key() {
            return this.key;
        }

        public byte[] value() {
            return this.value.getBytes();
        }
    }
}

