/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.errorhandling.infrastructure.kafka;

import ch.admin.bit.jeap.messaging.kafka.KafkaConfiguration;
import ch.admin.bit.jeap.messaging.kafka.auth.KafkaAuthProperties;
import ch.admin.bit.jeap.messaging.kafka.properties.KafkaProperties;
import ch.admin.bit.jeap.messaging.kafka.spring.JeapKafkaBeanNames;
import ch.admin.bit.jeap.messaging.kafka.spring.JeapKafkaPropertyFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

/**
 * Moves one batch of records from the dead letter topic to the error topic.
 *
 * <p>Each call to {@link #consumeAndProduce(int)} creates a short-lived byte-array consumer and
 * producer for the default Kafka cluster, performs a single poll on the dead letter topic and
 * re-publishes every polled record (key and value unchanged) to the error topic. Both clients are
 * closed before the method returns.
 */
@Component
public class KafkaDeadLetterBatchConsumerProducer {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaDeadLetterBatchConsumerProducer.class);

    /** Upper bound for the single poll performed per invocation. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(30L);

    private final String deadLetterTopicName;
    private final String errorTopicName;
    private final KafkaConfiguration kafkaConfiguration;
    private final BeanFactory beanFactory;
    private final JeapKafkaBeanNames beanNames;
    private final String defaultClusterName;

    /**
     * @param deadLetterTopicName topic to read from (property {@code jeap.errorhandling.deadLetterTopicName})
     * @param errorTopicName      topic to write to (property {@code jeap.errorhandling.topic})
     * @param kafkaConfiguration  source of the consumer/producer base configuration
     * @param environment         used to resolve the jEAP Kafka properties and thereby the default cluster name
     * @param beanFactory         used to look up the authentication properties bean of the cluster
     */
    public KafkaDeadLetterBatchConsumerProducer(@Value(value="${jeap.errorhandling.deadLetterTopicName}") String deadLetterTopicName, @Value(value="${jeap.errorhandling.topic}") String errorTopicName, KafkaConfiguration kafkaConfiguration, Environment environment, BeanFactory beanFactory) {
        this.deadLetterTopicName = deadLetterTopicName;
        this.errorTopicName = errorTopicName;
        this.kafkaConfiguration = kafkaConfiguration;
        this.beanFactory = beanFactory;
        KafkaProperties kafkaProperties = JeapKafkaPropertyFactory.createJeapKafkaProperties(environment);
        String clusterName = kafkaProperties.getDefaultClusterName();
        this.beanNames = new JeapKafkaBeanNames(clusterName);
        this.defaultClusterName = clusterName;
    }

    /**
     * Polls the dead letter topic once (waiting at most 30 seconds) and forwards every record
     * received to the error topic, preserving the record key (which may be {@code null}) and value.
     *
     * @param maxRecords value used for the consumer's {@code max.poll.records} setting, i.e. the
     *                   maximum number of records handled by this invocation
     * @throws IllegalStateException if anything goes wrong while creating the clients, polling or
     *                               sending; the original exception is attached as cause
     */
    public void consumeAndProduce(int maxRecords) {
        log.info("Start consuming and producing messages from topic {} to topic {} with maxRecords {}", this.deadLetterTopicName, this.errorTopicName, maxRecords);
        try (KafkaConsumer<byte[], byte[]> consumer = this.createConsumer(this.kafkaConfiguration, maxRecords);
             KafkaProducer<byte[], byte[]> producer = this.createProducer(this.kafkaConfiguration)) {
            consumer.subscribe(Collections.singletonList(this.deadLetterTopicName));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(POLL_TIMEOUT);
            int received = records.count();
            for (ConsumerRecord<byte[], byte[]> data : records) {
                log.debug("Received message from partition {} with offset {}", data.partition(), data.offset());
                // Always forward key and value together. ProducerRecord accepts a null key, so no
                // special-casing is needed; the previous branch dropped the key whenever one was present.
                ProducerRecord<byte[], byte[]> forwarded = new ProducerRecord<>(this.errorTopicName, data.key(), data.value());
                producer.send(forwarded, (metadata, exception) -> {
                    if (exception != null) {
                        // NOTE(review): failures are only logged here; whether the offsets of such
                        // records still get committed depends on the consumer configuration - confirm.
                        log.error("Failed to forward record from partition {} with offset {} to topic {}", data.partition(), data.offset(), this.errorTopicName, exception);
                    }
                });
            }
            // Wait for outstanding sends so the summary below is logged after the send callbacks ran.
            producer.flush();
            log.info("Consumed and produced {} records. Stopping consumer and producer.", received);
        }
        catch (Exception e) {
            log.error("Error occurred while consuming and producing messages", e);
            throw new IllegalStateException("Error occurred while consuming and producing messages", e);
        }
    }

    /**
     * Builds a byte-array consumer for the default cluster. Bootstrap servers and group id are
     * taken from the regular consumer configuration, authentication settings from
     * {@link #commonConfig(String)}; reading starts at the earliest offset when the group has no
     * committed position.
     */
    private KafkaConsumer<byte[], byte[]> createConsumer(KafkaConfiguration kafkaConfiguration, int maxRecords) {
        Map<String, ?> defaultConsumerConfig = kafkaConfiguration.consumerConfig(this.defaultClusterName);
        Map<String, Object> props = this.commonConfig(this.defaultClusterName);
        props.put("bootstrap.servers", defaultConsumerConfig.get("bootstrap.servers"));
        props.put("group.id", defaultConsumerConfig.get("group.id"));
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        props.put("max.poll.records", maxRecords);
        props.put("auto.offset.reset", "earliest");
        return new KafkaConsumer<>(props);
    }

    /**
     * Returns a mutable copy of the authentication properties registered for the given cluster.
     */
    private Map<String, Object> commonConfig(String clusterName) {
        KafkaAuthProperties kafkaAuthProperties = (KafkaAuthProperties)this.beanFactory.getBean(this.beanNames.getAuthPropertiesBeanName(clusterName));
        return new HashMap<>(kafkaAuthProperties.authenticationProperties(clusterName));
    }

    /**
     * Builds a byte-array producer from the default cluster's producer configuration, with the
     * serializers overridden and any configured interceptors removed.
     */
    private KafkaProducer<byte[], byte[]> createProducer(KafkaConfiguration kafkaConfiguration) {
        Map<String, Object> props = new HashMap<>(kafkaConfiguration.producerConfig(this.defaultClusterName));
        props.put("key.serializer", ByteArraySerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        props.remove("interceptor.classes");
        return new KafkaProducer<>(props);
    }
}

