/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.messaging.transactionaloutbox.outbox;

import ch.admin.bit.jeap.messaging.avro.MessageVersionAccessor;
import ch.admin.bit.jeap.messaging.kafka.contract.ContractsValidator;
import ch.admin.bit.jeap.messaging.kafka.interceptor.Callbacks;
import ch.admin.bit.jeap.messaging.kafka.interceptor.JeapKafkaMessageCallback;
import ch.admin.bit.jeap.messaging.model.Message;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.AfterCommitMessageSender;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.DeferredMessage;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.DeferredMessageLogArgument;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.DeferredMessageRepository;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.FailedMessage;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.FailedMessageRepository;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.MessageSerializer;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.OutboxMetrics;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.OutboxTracing;
import ch.admin.bit.jeap.messaging.transactionaloutbox.outbox.TransactionalOutboxException;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import lombok.Generated;
import net.logstash.logback.argument.StructuredArguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

public class TransactionalOutbox {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TransactionalOutbox.class);
    private final String clusterName;
    private final MessageSerializer serializer;
    private final DeferredMessageRepository deferredMessageRepository;
    private final FailedMessageRepository failedMessageRepository;
    private final AfterCommitMessageSender afterCommitMessageSender;
    private final ContractsValidator contractsValidator;
    private final Optional<OutboxMetrics> outboxMetrics;
    private final OutboxTracing outboxTracing;
    private final List<JeapKafkaMessageCallback> callbacks;

    @Transactional(propagation=Propagation.MANDATORY)
    public void sendMessage(Message message, String topic) {
        this.sendMessage(message, null, topic);
    }

    @Transactional(propagation=Propagation.MANDATORY)
    public void sendMessageScheduled(Message message, String topic) {
        this.sendMessageScheduled(message, null, topic);
    }

    @Transactional(propagation=Propagation.MANDATORY)
    public void sendMessage(Message message, Object key, String topic) {
        this.sendMessage(message, key, topic, true);
    }

    @Transactional(propagation=Propagation.MANDATORY)
    public void sendMessageScheduled(Message message, Object key, String topic) {
        this.sendMessage(message, key, topic, false);
    }

    public int countFailedMessages(boolean resend) {
        return this.failedMessageRepository.countFailedMessages(resend);
    }

    public int countFailedMessages(ZonedDateTime failedStartingFrom, ZonedDateTime failedBefore, boolean resend) {
        return this.failedMessageRepository.countFailedMessages(failedStartingFrom, failedBefore, resend);
    }

    public List<FailedMessage> findFailedMessages(ZonedDateTime failedStartingFrom, ZonedDateTime failedBefore, boolean resend, int maxNumMessagesToFind) {
        return this.failedMessageRepository.findFailedMessages(failedStartingFrom, failedBefore, resend, maxNumMessagesToFind);
    }

    public List<FailedMessage> findFailedMessages(long afterId, ZonedDateTime failedBefore, boolean resend, int maxNumMessagesToFind) {
        return this.failedMessageRepository.findFailedMessages(afterId, failedBefore, resend, maxNumMessagesToFind);
    }

    public void resendMessageScheduled(long id) {
        this.deferredMessageRepository.markForResend(id, true);
    }

    private void sendMessage(Message message, Object key, String topic, boolean sendImmediately) {
        this.ensurePublisherContract(message, topic);
        byte[] serializedMessage = this.serializer.serializeMessage(message, topic);
        byte[] serializedKey = Optional.ofNullable(key).map(nonNullKey -> this.serializer.serializeKey(nonNullKey, topic)).orElse(null);
        DeferredMessage newDeferredMessage = DeferredMessage.builder().message(serializedMessage).key(serializedKey).clusterName(this.clusterName).topic(topic).messageId(message.getIdentity().getId()).messageIdempotenceId(message.getIdentity().getIdempotenceId()).messageTypeName(message.getType().getName()).messageTypeVersion(MessageVersionAccessor.getGeneratedVersion((Class)message.getClass())).sendImmediately(sendImmediately).traceContext(this.outboxTracing.retrieveCurrentTraceContext()).build();
        DeferredMessage persistedDeferredMessage = this.deferredMessageRepository.save(newDeferredMessage);
        log.debug("Persisted {}.", (Object)DeferredMessageLogArgument.from(persistedDeferredMessage));
        if (sendImmediately) {
            this.afterCommitMessageSender.sendImmediatelyAfterTransactionCommit(persistedDeferredMessage);
        }
        this.outboxMetrics.ifPresent(metrics -> metrics.countTransactionalSend(sendImmediately));
        this.invokeOnSendCallbacks(message, topic);
    }

    private void invokeOnSendCallbacks(Message msg, String topic) {
        this.callbacks.forEach(callback -> Callbacks.invokeCallback((Message)msg, (String)topic, (arg_0, arg_1) -> ((JeapKafkaMessageCallback)callback).onSend(arg_0, arg_1)));
    }

    private void ensurePublisherContract(Message message, String topic) {
        try {
            this.contractsValidator.ensurePublisherContract(message.getType(), topic);
        }
        catch (Exception e) {
            log.error("Contract validation for message with {}, {} and {} failed.", new Object[]{StructuredArguments.kv((String)"messageId", (Object)message.getIdentity().getId()), StructuredArguments.kv((String)"messageIdempotenceId", (Object)message.getIdentity().getIdempotenceId()), StructuredArguments.kv((String)"messageType", (Object)message.getType().getName()), e});
            throw TransactionalOutboxException.contractValidationFailed(message, e);
        }
    }

    @Generated
    public TransactionalOutbox(String clusterName, MessageSerializer serializer, DeferredMessageRepository deferredMessageRepository, FailedMessageRepository failedMessageRepository, AfterCommitMessageSender afterCommitMessageSender, ContractsValidator contractsValidator, Optional<OutboxMetrics> outboxMetrics, OutboxTracing outboxTracing, List<JeapKafkaMessageCallback> callbacks) {
        this.clusterName = clusterName;
        this.serializer = serializer;
        this.deferredMessageRepository = deferredMessageRepository;
        this.failedMessageRepository = failedMessageRepository;
        this.afterCommitMessageSender = afterCommitMessageSender;
        this.contractsValidator = contractsValidator;
        this.outboxMetrics = outboxMetrics;
        this.outboxTracing = outboxTracing;
        this.callbacks = callbacks;
    }
}

