/*
 * Decompiled with CFR 0.152.
 */
package one.tomorrow.transactionaloutbox.commons;

import com.google.protobuf.Message;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import one.tomorrow.transactionaloutbox.commons.KafkaProtobufSerializer;
import one.tomorrow.transactionaloutbox.commons.ProxiedKafkaContainer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public interface CommonKafkaTestSupport<T> {
    public static Consumer<String, byte[]> createConsumer(String bootstrapServers) {
        return CommonKafkaTestSupport.createConsumer(bootstrapServers, ByteArrayDeserializer.class);
    }

    public static Map<String, Object> consumerProps(String bootstrapServers, String group, boolean autoCommit) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", group);
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.commit.interval.ms", "10");
        props.put("session.timeout.ms", "60000");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static <T> Consumer<String, T> createConsumer(String bootstrapServers, Class<? extends Deserializer<T>> deserializerClass) {
        Map<String, Object> consumerProps = CommonKafkaTestSupport.consumerProps(bootstrapServers, "testGroup", true);
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", deserializerClass);
        DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(consumerProps);
        return cf.createConsumer("testConsumer-" + System.currentTimeMillis(), "someClientIdSuffix");
    }

    public static Map<String, Object> producerProps() {
        return CommonKafkaTestSupport.producerProps(ProxiedKafkaContainer.bootstrapServers);
    }

    public static Map<String, Object> producerProps(String bootstrapServers) {
        return new HashMap<String, Object>(Map.of("bootstrap.servers", bootstrapServers));
    }

    public static KafkaProducer<String, Message> createTopicAndProducer(String bootstrapServers, String ... topics) {
        CommonKafkaTestSupport.createTopic(bootstrapServers, topics);
        return CommonKafkaTestSupport.createProducer(bootstrapServers);
    }

    public static KafkaProducer<String, Message> createProducer(String bootstrapServers) {
        Map<String, Object> props = CommonKafkaTestSupport.producerProps(bootstrapServers);
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", KafkaProtobufSerializer.class);
        return new KafkaProducer(props);
    }

    public static void createTopic(String bootstrapServers, String ... topics) {
        Map<String, Object> props = CommonKafkaTestSupport.producerProps(bootstrapServers);
        try (AdminClient client = AdminClient.create(props);){
            List newTopics = Arrays.stream(topics).map(topic -> new NewTopic(topic, 1, 1)).collect(Collectors.toList());
            try {
                client.createTopics(newTopics).all().get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    default public ConsumerRecords<String, T> getAndCommitRecords() {
        return this.getAndCommitRecords(-1);
    }

    default public ConsumerRecords<String, T> getAndCommitRecords(int minRecords) {
        ConsumerRecords records = KafkaTestUtils.getRecords(this.consumer(), (Duration)Duration.ofSeconds(10L), (int)minRecords);
        this.consumer().commitSync();
        return records;
    }

    public Consumer<String, T> consumer();
}

