/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.errorhandling.infrastructure.kafka;

import brave.kafka.clients.KafkaTracing;
import ch.admin.bit.jeap.errorhandling.infrastructure.kafka.ResendClusterProvider;
import ch.admin.bit.jeap.errorhandling.infrastructure.persistence.Error;
import ch.admin.bit.jeap.errorhandling.infrastructure.persistence.MessageHeader;
import ch.admin.bit.jeap.messaging.kafka.KafkaConfiguration;
import ch.admin.bit.jeap.messaging.kafka.properties.KafkaProperties;
import ch.admin.bit.jeap.messaging.kafka.tracing.TraceContext;
import ch.admin.bit.jeap.messaging.kafka.tracing.TraceContextUpdater;
import ch.admin.bit.jeap.messaging.kafka.tracing.TracingKafkaProducerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;

@Component
public class KafkaFailedEventResender {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaFailedEventResender.class);
    private final ResendClusterProvider resendClusterProvider;
    private final TraceContextUpdater traceContextUpdater;
    private final Map<String, KafkaTemplate<Object, Object>> kafkaTemplateByClusterName;
    @Value(value="${jeap.errorhandling.timeout-seconds:60}")
    private int timeoutSeconds;

    public KafkaFailedEventResender(ResendClusterProvider resendClusterProvider, KafkaProperties kafkaProperties, KafkaConfiguration kafkaConfiguration, KafkaTracing kafkaTracing, TraceContextUpdater traceContextUpdater) {
        this.resendClusterProvider = resendClusterProvider;
        this.traceContextUpdater = traceContextUpdater;
        this.kafkaTemplateByClusterName = kafkaProperties.clusterNames().stream().collect(Collectors.toMap(clusterName -> clusterName, clusterName -> KafkaFailedEventResender.createKafkaTemplate(kafkaConfiguration, kafkaTracing, clusterName)));
    }

    private static KafkaTemplate<Object, Object> createKafkaTemplate(KafkaConfiguration kafkaConfiguration, KafkaTracing kafkaTracing, String clusterName) {
        return new KafkaTemplate((ProducerFactory)new TracingKafkaProducerFactory(kafkaTracing.messagingTracing(), KafkaFailedEventResender.adaptKafkaConfiguration(clusterName, kafkaConfiguration)));
    }

    public void resend(Error error) {
        byte[] message = error.getCausingEventMessage().getPayload();
        byte[] key = error.getCausingEventMessage().getKey();
        String topic = error.getCausingEventMessage().getTopic();
        String clusterName = this.resendClusterProvider.getResendClusterNameFor(error.getCausingEvent());
        log.info("Resending event {} for error {} to topic '{}' on cluster '{}'.", new Object[]{error.getCausingEventMetadata().getId(), error.getId(), topic, clusterName});
        if (error.getOriginalTraceContext() != null) {
            log.debug("Original traceId found on the error to resend. Overriding the current tracing context with the original traceId {}", (Object)error.getOriginalTraceContext());
            this.traceContextUpdater.setTraceContext(new TraceContext(error.getOriginalTraceContext().getTraceIdHigh(), error.getOriginalTraceContext().getTraceId(), error.getOriginalTraceContext().getSpanId(), error.getOriginalTraceContext().getParentSpanId(), error.getOriginalTraceContext().getTraceIdString()));
        }
        ProducerRecord producerRecord = new ProducerRecord(topic, (Object)key, (Object)message);
        KafkaFailedEventResender.addHeadersFromCausingEvent(error, (ProducerRecord<Object, Object>)producerRecord);
        CompletableFuture sendResult = this.kafkaTemplateByClusterName.get(clusterName).send(producerRecord);
        try {
            sendResult.get(this.timeoutSeconds, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            throw new RuntimeException("Cannot resend event on Kafka", e);
        }
        log.info("Resent event {} for error {} to topic '{}' on cluster '{}'.", new Object[]{error.getCausingEventMetadata().getId(), error.getId(), topic, clusterName});
    }

    private static void addHeadersFromCausingEvent(Error error, ProducerRecord<Object, Object> producerRecord) {
        List<MessageHeader> headers = error.getCausingEvent().getHeaders();
        if (headers != null) {
            for (MessageHeader header : headers) {
                producerRecord.headers().add(header.getHeaderName(), header.getHeaderValue());
            }
        }
    }

    private static Map<String, Object> adaptKafkaConfiguration(String clusterName, KafkaConfiguration kafkaConfiguration) {
        HashMap<String, Object> props = new HashMap<String, Object>(kafkaConfiguration.producerConfig(clusterName));
        props.put("key.serializer", ByteArraySerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        props.remove("interceptor.classes");
        return props;
    }
}

