/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.messaging.transactionaloutbox.messaging;

import ch.admin.bit.jeap.messaging.kafka.signature.SignatureService;
import ch.admin.bit.jeap.messaging.kafka.tracing.TracingKafkaTemplateFactory;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.DeferredMessage;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.DeferredMessageLogArgument;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.DeferredMessageSendException;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.DeferredMessageSender;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.OutboxMetrics;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.OutboxTracing;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.TransactionalOutboxConfiguration;
import io.micrometer.core.annotation.Timed;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

class KafkaDeferredMessageSender
implements DeferredMessageSender {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaDeferredMessageSender.class);
    private final KafkaTemplate<byte[], byte[]> kafkaTemplateImmediateSending;
    private final KafkaTemplate<byte[], byte[]> kafkaTemplateScheduledSending;
    private final TransactionalOutboxConfiguration config;
    private final OutboxTracing outboxTracing;
    private final Optional<TracingKafkaTemplateFactory> tracingKafkaTemplateFactory;
    private final Optional<OutboxMetrics> outboxMetrics;
    private final Optional<SignatureService> signatureService;
    private final String bootstrapServers;

    KafkaDeferredMessageSender(ProducerFactory<byte[], byte[]> producerFactory, TransactionalOutboxConfiguration config, OutboxTracing outboxTracing, Optional<TracingKafkaTemplateFactory> tracingKafkaTemplateFactory, Optional<OutboxMetrics> outboxMetrics, Optional<SignatureService> signatureService) {
        this.outboxTracing = outboxTracing;
        this.tracingKafkaTemplateFactory = tracingKafkaTemplateFactory;
        this.outboxMetrics = outboxMetrics;
        this.signatureService = signatureService;
        this.bootstrapServers = (String)producerFactory.getConfigurationProperties().get("bootstrap.servers");
        this.kafkaTemplateImmediateSending = this.createKafkaTemplate(producerFactory, Map.of("max.block.ms", config.getMessageSendImmediatelyMaxBlockTime().toMillis(), "request.timeout.ms", this.getIntMillis(config.getMessageSendImmediatelyTimeout()), "linger.ms", 0, "delivery.timeout.ms", this.getIntMillis(config.getMessageSendImmediatelyTimeout())));
        this.kafkaTemplateScheduledSending = this.createKafkaTemplate(producerFactory, Map.of("max.block.ms", config.getMessageSendScheduledMaxBlockTime().toMillis(), "request.timeout.ms", this.getIntMillis(config.getMessageSendScheduledTimeout()), "linger.ms", 0, "delivery.timeout.ms", this.getIntMillis(config.getMessageSendScheduledTimeout())));
        this.config = config;
    }

    private int getIntMillis(Duration duration) {
        return Long.valueOf(duration.toMillis()).intValue();
    }

    private KafkaTemplate<byte[], byte[]> createKafkaTemplate(ProducerFactory<byte[], byte[]> producerFactory, Map<String, Object> additionalConfig) {
        HashMap<String, Object> producerFactoryConfig = new HashMap<String, Object>(producerFactory.getConfigurationProperties());
        producerFactoryConfig.remove("interceptor.classes");
        producerFactoryConfig.putAll(additionalConfig);
        if (this.tracingKafkaTemplateFactory.isPresent()) {
            return this.tracingKafkaTemplateFactory.get().createKafkaTemplate(producerFactoryConfig, this.byteArraySerializer(), this.byteArraySerializer());
        }
        return new KafkaTemplate((ProducerFactory)new DefaultKafkaProducerFactory(producerFactoryConfig, this::byteArraySerializer, this::byteArraySerializer));
    }

    private Serializer<byte[]> byteArraySerializer() {
        return new ByteArraySerializer();
    }

    @Override
    @Timed(value="outbox_messages_transmit", extraTags={"delivery_type", "immediate"}, description="Outbox message transmits for immediate delivery.")
    public void sendAsImmediate(DeferredMessage deferredMessage) {
        this.send(deferredMessage, this.config.getMessageSendImmediatelyTimeout(), this.kafkaTemplateImmediateSending);
    }

    @Override
    @Timed(value="outbox_messages_transmit", extraTags={"delivery_type", "scheduled"}, description="Outbox message transmits for scheduled delivery.")
    public void sendAsScheduled(DeferredMessage deferredMessage) {
        this.send(deferredMessage, this.config.getMessageSendScheduledTimeout(), this.kafkaTemplateScheduledSending);
    }

    private void send(DeferredMessage deferredMessage, Duration sendTimeout, KafkaTemplate<byte[], byte[]> kafkaTemplate) {
        byte[] key = deferredMessage.getKey();
        byte[] message = deferredMessage.getMessage();
        String topic = deferredMessage.getTopic();
        long sendFutureTimeoutMillis = sendTimeout.toMillis() + 500L;
        DeferredMessageLogArgument deferredMessageLogArgument = DeferredMessageLogArgument.from(deferredMessage);
        this.outboxTracing.updateCurrentTraceContext(deferredMessage.getTraceContext());
        ProducerRecord producerRecord = new ProducerRecord(topic, (Object)key, (Object)message);
        this.injectSignatureHeadersIfNeeded((ProducerRecord<byte[], byte[]>)producerRecord, message, key);
        try {
            log.debug("Sending message {} to Kafka with a timeout of {} millis.", (Object)deferredMessageLogArgument, (Object)sendFutureTimeoutMillis);
            kafkaTemplate.send(producerRecord).get(sendFutureTimeoutMillis, TimeUnit.MILLISECONDS);
            this.outboxMetrics.ifPresent(metrics -> metrics.countMessagingSend(this.bootstrapServers, topic, deferredMessage.getMessageTypeName(), deferredMessage.getMessageTypeVersion()));
            log.debug("Successfully sent {}.", (Object)deferredMessageLogArgument);
        }
        catch (InterruptedException ie) {
            log.error("Failed sending {}.", (Object)deferredMessageLogArgument);
            Thread.currentThread().interrupt();
            this.convertException(deferredMessage, ie);
        }
        catch (Exception e) {
            log.error("Failed sending {}.", (Object)deferredMessageLogArgument);
            this.convertException(deferredMessage, e);
        }
    }

    private void injectSignatureHeadersIfNeeded(ProducerRecord<byte[], byte[]> producerRecord, byte[] message, byte[] key) {
        this.signatureService.ifPresent(service -> {
            Headers headers = producerRecord.headers();
            service.injectSignature(headers, message, false);
            if (key != null) {
                service.injectSignature(headers, key, true);
            }
        });
    }

    private void convertException(DeferredMessage deferredMessage, Exception e) {
        if (e instanceof KafkaException) {
            Throwable mostSpecificCause = ((KafkaException)e).getMostSpecificCause();
            if (mostSpecificCause instanceof InvalidTopicException) {
                throw DeferredMessageSendException.invalidTopicException(deferredMessage, e);
            }
            if (mostSpecificCause instanceof TopicAuthorizationException) {
                throw DeferredMessageSendException.topicAuthorizationException(deferredMessage, e);
            }
            if (mostSpecificCause instanceof RecordTooLargeException) {
                throw DeferredMessageSendException.messageTooLargeException(deferredMessage, e);
            }
        }
        throw DeferredMessageSendException.generalSendException(deferredMessage, e);
    }
}

