/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.messaging.kafka.errorhandling;

import ch.admin.bit.jeap.messaging.avro.AvroMessage;
import ch.admin.bit.jeap.messaging.avro.AvroMessageKey;
import ch.admin.bit.jeap.messaging.avro.errorevent.MessageHandlerException;
import ch.admin.bit.jeap.messaging.avro.errorevent.MessageHandlerExceptionInformation;
import ch.admin.bit.jeap.messaging.avro.errorevent.MessageProcessingFailedEvent;
import ch.admin.bit.jeap.messaging.avro.errorevent.MessageProcessingFailedEventBuilder;
import ch.admin.bit.jeap.messaging.avro.errorevent.MessageProcessingFailedMessageKey;
import ch.admin.bit.jeap.messaging.kafka.errorhandling.ClusterNameHeaderInterceptor;
import ch.admin.bit.jeap.messaging.kafka.errorhandling.ErrorSerializedMessageHolder;
import ch.admin.bit.jeap.messaging.kafka.errorhandling.ErrorServiceFailedHandler;
import ch.admin.bit.jeap.messaging.kafka.errorhandling.StackTraceHasher;
import ch.admin.bit.jeap.messaging.kafka.properties.KafkaProperties;
import ch.admin.bit.jeap.messaging.kafka.spring.JeapKafkaBeanNames;
import ch.admin.bit.jeap.messaging.kafka.tracing.TracerBridge;
import ch.admin.bit.jeap.messaging.model.Message;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import net.logstash.logback.argument.StructuredArguments;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;

/**
 * {@link ConsumerRecordRecoverer} that reports a consumer record which could not be processed by
 * publishing a {@link MessageProcessingFailedEvent} to the error topic of the resolved cluster.
 *
 * <p>Sending is retried according to the configured {@link BackOff}. Once the back-off is
 * exhausted the last failure is handed to the {@link ErrorServiceFailedHandler}.
 */
public class ErrorServiceSender implements ConsumerRecordRecoverer {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ErrorServiceSender.class);

    private final BeanFactory beanFactory;
    private final KafkaProperties kafkaProperties;
    private final ErrorServiceFailedHandler errorServiceFailedHandler;
    private final BackOff backOff;
    private final TracerBridge tracerBridge;
    private final JeapKafkaBeanNames jeapKafkaBeanNames;
    private final StackTraceHasher stackTraceHasher;

    /**
     * Creates the sender.
     *
     * @param beanFactory               used to look up the {@link KafkaTemplate} bean of the target cluster
     * @param kafkaProperties           source of system/service name, cluster names, error topic name and stack trace settings
     * @param errorServiceFailedHandler invoked with the last failure when the event could not be sent and no retry is left
     * @param backOff                   retry policy; a fresh execution is started for every failed record
     * @param tracerBridge              tracing bridge; may be {@code null}, in which case all tracing steps are skipped
     * @param stackTraceHasher          computes the stack trace hash when hashing is enabled in the properties
     */
    public ErrorServiceSender(BeanFactory beanFactory, KafkaProperties kafkaProperties, ErrorServiceFailedHandler errorServiceFailedHandler, BackOff backOff, TracerBridge tracerBridge, StackTraceHasher stackTraceHasher) {
        this.beanFactory = beanFactory;
        this.kafkaProperties = kafkaProperties;
        this.errorServiceFailedHandler = errorServiceFailedHandler;
        this.backOff = backOff;
        this.tracerBridge = tracerBridge;
        this.jeapKafkaBeanNames = new JeapKafkaBeanNames(kafkaProperties.getDefaultClusterName());
        this.stackTraceHasher = stackTraceHasher;
    }

    /**
     * Builds the exception information for a record whose key or value could not be deserialized.
     *
     * @param errorSerializedMessageHolder holder supplying the underlying deserialization cause
     * @param errorMessage                 message of the wrapping exception that is reported as cause
     * @return exception information with error code {@code DESERIALIZATION_FAILED} and permanent temporality
     */
    private static MessageHandlerExceptionInformation createSerializationExceptionInformation(ErrorSerializedMessageHolder errorSerializedMessageHolder, String errorMessage) {
        Exception cause = new Exception(errorMessage, errorSerializedMessageHolder.getCause());
        return MessageHandlerException.builder()
                .errorCode(MessageHandlerExceptionInformation.StandardErrorCodes.DESERIALIZATION_FAILED.name())
                .temporality(MessageHandlerExceptionInformation.Temporality.PERMANENT)
                .cause(cause)
                .build();
    }

    /**
     * Best-effort lookup of the id of a consumed value.
     *
     * @param value the record value, possibly {@code null} or not a {@link Message}
     * @return the message id, or {@code null} if the value is not a {@link Message}, has no identity,
     *         has no id, or its accessors fail
     */
    private static String findMessageId(Object value) {
        if (!(value instanceof Message)) {
            return null;
        }
        try {
            Message message = (Message) value;
            return message.getIdentity() != null ? message.getIdentity().getId() : null;
        } catch (RuntimeException ex) {
            // Deliberately tolerant: this runs while an error is already being handled, so a broken
            // message accessor must not prevent the error event from being reported.
            return null;
        }
    }

    /**
     * Recovers the given record by sending a {@link MessageProcessingFailedEvent} for it.
     *
     * @param consumerRecord the record that could not be processed
     * @param exception      the failure reported for the record
     * @throws RuntimeException if the thread is interrupted while sending or while waiting for a retry
     */
    @Override
    public void accept(ConsumerRecord<?, ?> consumerRecord, Exception exception) {
        this.restoreOriginalTracingHeaders(consumerRecord);
        try (TracerBridge.TracerBridgeElement ignored = this.getSpan(consumerRecord)) {
            this.createAndSendMessageProcessingFailedEvent(consumerRecord, exception);
        }
    }

    /** Asks the tracer bridge, if one is configured, to restore the original trace context of the record. */
    private void restoreOriginalTracingHeaders(ConsumerRecord<?, ?> record) {
        if (this.tracerBridge != null) {
            this.tracerBridge.restoreOriginalTraceContext(record);
        }
    }

    /**
     * Returns the span element for the record from the tracer bridge, or a no-op element when no
     * tracer bridge is configured, so callers can always use try-with-resources.
     */
    private TracerBridge.TracerBridgeElement getSpan(ConsumerRecord<?, ?> record) {
        if (this.tracerBridge == null) {
            return () -> {};
        }
        return this.tracerBridge.getSpan(record);
    }

    /** Creates the failure event and its key for the record and sends both to the resolved cluster. */
    private void createAndSendMessageProcessingFailedEvent(ConsumerRecord<?, ?> consumerRecord, Exception exception) {
        MessageProcessingFailedEvent event = this.createMessageProcessingFailedEvent(consumerRecord, exception);
        BackOffExecution backOffExecution = this.backOff.start();
        // The idempotence id of the event is the key fallback when the original message has no usable id.
        MessageProcessingFailedMessageKey key = this.createMessageProcessingFailedMessageKey(consumerRecord, event.getIdentity().getIdempotenceId());
        String clusterName = this.getErrorHandlingTopicClusterName(consumerRecord);
        this.sendMessageProcessingFailedEventWithRetry(key, event, backOffExecution, clusterName);
    }

    /**
     * Resolves the cluster the error event is sent to: the default producer cluster override if one
     * is configured, otherwise the cluster name found for the record, otherwise the default cluster.
     */
    private String getErrorHandlingTopicClusterName(ConsumerRecord<?, ?> consumerRecord) {
        if (this.kafkaProperties.hasDefaultProducerClusterOverride()) {
            return this.kafkaProperties.getDefaultProducerClusterOverride();
        }
        String clusterName = ClusterNameHeaderInterceptor.getClusterName(consumerRecord);
        return clusterName != null ? clusterName : this.kafkaProperties.getDefaultClusterName();
    }

    /**
     * Builds the {@link MessageProcessingFailedEvent} describing the failure and logs a warning
     * about the record being forwarded to the error service.
     */
    private MessageProcessingFailedEvent createMessageProcessingFailedEvent(ConsumerRecord<?, ?> consumerRecord, Exception exception) {
        MessageHandlerExceptionInformation exceptionInformation = this.getExceptionInformation(consumerRecord, exception);
        MessageProcessingFailedEvent event = MessageProcessingFailedEventBuilder.create()
                .systemName(this.kafkaProperties.getSystemName())
                .serviceName(this.kafkaProperties.getServiceName())
                // NOTE(review): header names handed to the builder together with the record; their exact
                // treatment is defined by MessageProcessingFailedEventBuilder - confirm there.
                .originalMessage(consumerRecord, new String[]{"jeap_encrypted_value", "jeap-cert", "jeap-sign", "jeap-sign-key"})
                .eventHandleException(exceptionInformation)
                .stackTraceMaxLength(this.kafkaProperties.getErrorEventStackTraceMaxLength())
                .stackTraceHash(this.getStackTraceHash(exception))
                .build();
        this.logError(consumerRecord.value(), exception);
        return event;
    }

    /** Returns the stack trace hash of the exception, or {@code null} when hashing is disabled in the properties. */
    private String getStackTraceHash(Exception exception) {
        if (!this.kafkaProperties.isErrorStackTraceHashEnabled()) {
            return null;
        }
        return this.stackTraceHasher.hash(exception);
    }

    /**
     * Creates the key of the error event: the id of the original message when it can be determined,
     * otherwise the given fallback value.
     *
     * @param originalMessage  the record that failed
     * @param fallbackKeyValue key value used when the record value yields no message id
     */
    private MessageProcessingFailedMessageKey createMessageProcessingFailedMessageKey(ConsumerRecord<?, ?> originalMessage, String fallbackKeyValue) {
        String messageId = findMessageId(originalMessage.value());
        String key = messageId != null ? messageId : fallbackKeyValue;
        return MessageProcessingFailedMessageKey.newBuilder().setKey(key).build();
    }

    /**
     * Sends the event and blocks until the send completed, retrying on failure as long as the
     * back-off execution provides another interval. When the back-off is exhausted the last
     * failure is passed to the {@link ErrorServiceFailedHandler} and the method returns normally.
     *
     * @throws RuntimeException if the thread is interrupted while sending or sleeping; the interrupt
     *                          flag is restored and the {@link InterruptedException} is kept as cause
     */
    private void sendMessageProcessingFailedEventWithRetry(MessageProcessingFailedMessageKey key, MessageProcessingFailedEvent event, BackOffExecution backOffExecution, String clusterName) {
        int attempt = 1;
        while (true) {
            try {
                // The template lookup is inside the try block so that lookup failures are retried as well.
                KafkaTemplate<AvroMessageKey, AvroMessage> kafkaTemplate = this.getKafkaTemplate(clusterName);
                CompletableFuture<?> pendingSend = kafkaTemplate.send(this.kafkaProperties.getErrorTopicName(clusterName), key, event);
                pendingSend.get();
                log.debug("Successfully sent error event using cluster {}", clusterName);
                return;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Error sending interrupted", e);
            } catch (Exception e) {
                long nextBackOff = backOffExecution.nextBackOff();
                if (nextBackOff == BackOffExecution.STOP) {
                    log.error("Could not send error message to error handling service ({}) and do not retry any more", attempt, e);
                    this.errorServiceFailedHandler.handle(e);
                    return;
                }
                log.error("Could not send error message to error handling service ({}), retry", attempt, e);
                this.sleepBeforeRetry(nextBackOff);
                ++attempt;
            }
        }
    }

    /**
     * Sleeps for the given back-off interval.
     *
     * @throws RuntimeException if interrupted; the interrupt flag is restored first
     */
    private void sleepBeforeRetry(long backOffMillis) {
        try {
            Thread.sleep(backOffMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error sending interrupted", e);
        }
    }

    /** Looks up the {@link KafkaTemplate} bean registered under the bean name derived for the given cluster. */
    private KafkaTemplate<AvroMessageKey, AvroMessage> getKafkaTemplate(String clusterName) {
        String kafkaTemplateBeanName = this.jeapKafkaBeanNames.getKafkaTemplateBeanName(clusterName);
        // The generic type arguments are erased at runtime, so this cast cannot be checked.
        @SuppressWarnings("unchecked")
        KafkaTemplate<AvroMessageKey, AvroMessage> kafkaTemplate = (KafkaTemplate<AvroMessageKey, AvroMessage>) this.beanFactory.getBean(kafkaTemplateBeanName);
        return kafkaTemplate;
    }

    /**
     * Logs a warning that the record is forwarded to the error service. For {@link Message} values
     * the message id is attached as structured argument; it is {@code null} if it cannot be determined.
     */
    private void logError(Object value, Exception e) {
        if (value instanceof Message) {
            String messageId = findMessageId(value);
            log.warn("{} cannot be consumed and will be sent to error service", StructuredArguments.keyValue("messageId", messageId), e);
        } else {
            log.warn("Got a non deserializable message that cannot be consumed and will be sent to error service", e);
        }
    }

    /**
     * Derives the exception information reported in the error event.
     *
     * <ul>
     *   <li>Anything other than a {@link ListenerExecutionFailedException}: unknown exception.</li>
     *   <li>Key or value is an {@link ErrorSerializedMessageHolder}: deserialization failure (key checked first).</li>
     *   <li>Cause is a {@link MessageConversionException}: wrong event type, permanent.</li>
     *   <li>Otherwise the cause is validated by {@link #convertToValidExceptionInformation(Throwable)}.</li>
     * </ul>
     */
    private MessageHandlerExceptionInformation getExceptionInformation(ConsumerRecord<?, ?> consumerRecord, Exception exception) {
        if (!(exception instanceof ListenerExecutionFailedException)) {
            return MessageHandlerException.builder()
                    .description(null)
                    .errorCode(MessageHandlerExceptionInformation.StandardErrorCodes.UNKNOWN_EXCEPTION.name())
                    .temporality(MessageHandlerExceptionInformation.Temporality.UNKNOWN)
                    .cause(exception)
                    .build();
        }
        if (consumerRecord.key() instanceof ErrorSerializedMessageHolder) {
            ErrorSerializedMessageHolder keyHolder = (ErrorSerializedMessageHolder) consumerRecord.key();
            return ErrorServiceSender.createSerializationExceptionInformation(keyHolder, "Could not deserialize key");
        }
        if (consumerRecord.value() instanceof ErrorSerializedMessageHolder) {
            ErrorSerializedMessageHolder valueHolder = (ErrorSerializedMessageHolder) consumerRecord.value();
            return ErrorServiceSender.createSerializationExceptionInformation(valueHolder, "Could not deserialize value");
        }
        Throwable cause = exception.getCause();
        if (cause instanceof MessageConversionException) {
            return MessageHandlerException.builder()
                    .errorCode(MessageHandlerExceptionInformation.StandardErrorCodes.WRONG_EVENT_TYPE.name())
                    .temporality(MessageHandlerExceptionInformation.Temporality.PERMANENT)
                    .cause(cause)
                    .build();
        }
        return this.convertToValidExceptionInformation(cause);
    }

    /**
     * Ensures the reported exception information is complete.
     *
     * <p>A cause that does not implement {@link MessageHandlerExceptionInformation} is reported as
     * unknown exception. A cause that implements it with error code, message and temporality all
     * set is returned unchanged. Otherwise an error is logged and a replacement is built that keeps
     * the description and any present values, substituting {@code INVALID_EXCEPTION} for a missing
     * error code and {@code UNKNOWN} for a missing temporality.
     */
    private MessageHandlerExceptionInformation convertToValidExceptionInformation(Throwable cause) {
        if (!(cause instanceof MessageHandlerExceptionInformation)) {
            return MessageHandlerException.builder()
                    .errorCode(MessageHandlerExceptionInformation.StandardErrorCodes.UNKNOWN_EXCEPTION.name())
                    .temporality(MessageHandlerExceptionInformation.Temporality.UNKNOWN)
                    .cause(cause)
                    .build();
        }
        MessageHandlerExceptionInformation inf = (MessageHandlerExceptionInformation) cause;
        if (inf.getErrorCode() != null && inf.getMessage() != null && inf.getTemporality() != null) {
            log.debug("errorCode {}, message {}, temporality {} set, returning exception to builder", inf.getErrorCode(), inf.getMessage(), inf.getTemporality());
            return inf;
        }
        log.error("Got an invalid exception information. Please fix your code!");
        log.error("errorCode {}, message {} or temporality {} are missing in the exception", inf.getErrorCode(), inf.getMessage(), inf.getTemporality());
        return MessageHandlerException.builder()
                .description(inf.getDescription())
                .errorCode(inf.getErrorCode() != null ? inf.getErrorCode() : MessageHandlerExceptionInformation.StandardErrorCodes.INVALID_EXCEPTION.name())
                .temporality(inf.getTemporality() != null ? inf.getTemporality() : MessageHandlerExceptionInformation.Temporality.UNKNOWN)
                .cause(cause)
                .build();
    }
}

