/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.messaging.sequentialinbox.inbox;

import ch.admin.bit.jeap.messaging.kafka.errorhandling.ErrorServiceSender;
import ch.admin.bit.jeap.messaging.sequentialinbox.configuration.model.Sequence;
import ch.admin.bit.jeap.messaging.sequentialinbox.configuration.model.SequencedMessageType;
import ch.admin.bit.jeap.messaging.sequentialinbox.configuration.model.SequentialInboxConfiguration;
import ch.admin.bit.jeap.messaging.sequentialinbox.inbox.BufferedMessageTracing;
import ch.admin.bit.jeap.messaging.sequentialinbox.inbox.DeserializedMessage;
import ch.admin.bit.jeap.messaging.sequentialinbox.inbox.FailedConsumerRecord;
import ch.admin.bit.jeap.messaging.sequentialinbox.inbox.MessageHandlerService;
import ch.admin.bit.jeap.messaging.sequentialinbox.inbox.SequentialInboxDeserializer;
import ch.admin.bit.jeap.messaging.sequentialinbox.jpa.MessageRepository;
import ch.admin.bit.jeap.messaging.sequentialinbox.metrics.SequentialInboxMetricsCollector;
import ch.admin.bit.jeap.messaging.sequentialinbox.persistence.BufferedMessage;
import ch.admin.bit.jeap.messaging.sequentialinbox.persistence.SequenceInstance;
import ch.admin.bit.jeap.messaging.sequentialinbox.persistence.SequencedMessage;
import ch.admin.bit.jeap.messaging.sequentialinbox.persistence.SequencedMessageState;
import ch.admin.bit.jeap.messaging.sequentialinbox.spring.SequentialInboxException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Processes the waiting messages of a sequence instance.
 *
 * <p>Waiting messages whose release condition is satisfied by the set of already processed
 * message types are deserialized and passed to the {@link MessageHandlerService}. A message
 * that fails to deserialize or whose handler throws is forwarded to the
 * {@link ErrorServiceSender} and marked {@link SequencedMessageState#FAILED}.
 */
@Component
class BufferedMessageService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BufferedMessageService.class);
    private final ErrorServiceSender errorServiceSender;
    private final SequentialInboxDeserializer inboxDeserializer;
    private final MessageHandlerService messageHandlerService;
    private final MessageRepository messageRepository;
    private final SequentialInboxConfiguration sequentialInboxConfiguration;
    private final BufferedMessageTracing bufferedMessageTracing;
    private final SequentialInboxMetricsCollector metricsCollector;

    /**
     * Repeatedly picks the first waiting message whose release condition is satisfied and
     * handles it, until no waiting message is ready any more.
     *
     * <p>Each successfully handled message adds its type to the set of processed types, which
     * may in turn release further waiting messages in the next iteration. A message that was
     * attempted (successfully or not) is removed from the local waiting list and is not retried
     * within this call.
     *
     * @param sequenceInstance the sequence instance whose waiting and processed messages are loaded
     * @param sequence the sequence definition used to decide completeness
     * @return the result of {@code sequence.isComplete(...)} for the processed message types
     */
    boolean processBufferedMessages(SequenceInstance sequenceInstance, Sequence sequence) {
        List<SequencedMessage> waitingAndProcessedMessages =
                this.messageRepository.getWaitingAndProcessedMessagesInNewTransaction(sequenceInstance);
        List<SequencedMessage> waitingMessages =
                BufferedMessageService.waitingMessagesInModifiableList(waitingAndProcessedMessages);
        Set<String> processedMessageTypes =
                BufferedMessageService.processedMessageTypes(waitingAndProcessedMessages);

        while (true) {
            Optional<SequencedMessage> nextWaitingMessageReadyToBeProcessed = waitingMessages.stream()
                    .filter(candidate -> this.sequencedMessageType(candidate).isReleaseConditionSatisfied(processedMessageTypes))
                    .findFirst();
            if (nextWaitingMessageReadyToBeProcessed.isEmpty()) {
                log.debug("No waiting message ready to be processed in sequence {}", sequenceInstance);
                break;
            }
            log.debug("Next waiting message ready to be processed: {}", nextWaitingMessageReadyToBeProcessed);
            SequencedMessage readyMessage = nextWaitingMessageReadyToBeProcessed.get();
            // Remove before handling so that a failing message is not picked again in this run.
            waitingMessages.remove(readyMessage);
            if (this.handleBufferedMessage(readyMessage)) {
                processedMessageTypes.add(readyMessage.getMessageType());
            }
        }
        return sequence.isComplete(processedMessageTypes);
    }

    /** Looks up the configured message type for the given message by its qualified type name. */
    private SequencedMessageType sequencedMessageType(SequencedMessage sequencedMessage) {
        return this.sequentialInboxConfiguration.requireSequencedMessageTypeByQualifiedName(sequencedMessage.getMessageType());
    }

    /**
     * Deserializes and handles a single buffered message with the message's trace context applied.
     *
     * @param sequencedMessage the message to handle
     * @return {@code true} if the handler ran and the message was marked
     *         {@link SequencedMessageState#PROCESSED}; {@code false} if deserialization failed
     *         or an exception was thrown while handling
     */
    private boolean handleBufferedMessage(SequencedMessage sequencedMessage) {
        // The restorer is closed exactly once by try-with-resources on every exit path;
        // it must not be closed manually in addition.
        try (BufferedMessageTracing.TraceContextRestorer ignored =
                     this.bufferedMessageTracing.updateCurrentTraceContext(sequencedMessage.getTraceContext())) {
            Optional<DeserializedMessage> deserializedMessage = this.getDeserializedMessage(sequencedMessage);
            if (deserializedMessage.isEmpty()) {
                log.debug("Deserialization failed for message {}", sequencedMessage);
                return false;
            }
            DeserializedMessage message = deserializedMessage.get();
            try {
                log.debug("Processing buffered message {}", sequencedMessage);
                this.recordWaitingMessageCompletedTimer(sequencedMessage);
                this.messageHandlerService.handle(message);
                this.messageRepository.setMessageStateInNewTransaction(sequencedMessage, SequencedMessageState.PROCESSED);
                log.debug("Processed buffered message {}", sequencedMessage);
            }
            catch (Exception ex) {
                FailedConsumerRecord failedConsumerRecord =
                        FailedConsumerRecord.of(sequencedMessage, message.key(), message.message());
                this.sendMessageToErrorHandlerAndMarkFailed(sequencedMessage, ex, failedConsumerRecord);
                return false;
            }
        }
        return true;
    }

    /**
     * Reports the time elapsed since the message was created to the metrics collector,
     * but only for messages currently in state {@link SequencedMessageState#WAITING}.
     */
    private void recordWaitingMessageCompletedTimer(SequencedMessage sequencedMessage) {
        if (sequencedMessage.getState() == SequencedMessageState.WAITING) {
            Duration waitDuration = Duration.between(sequencedMessage.getCreatedAt(), ZonedDateTime.now());
            this.metricsCollector.onWaitingMessageCompleted(sequencedMessage.getMessageType(), waitDuration);
        }
    }

    /**
     * Loads the buffered payload for the message and deserializes it.
     *
     * @return the deserialized message, or an empty optional if the deserializer threw or
     *         reported a failed deserialization; in both failure cases the message has been
     *         sent to the error handler and marked {@link SequencedMessageState#FAILED}
     */
    private Optional<DeserializedMessage> getDeserializedMessage(SequencedMessage sequencedMessage) {
        BufferedMessage bufferedMessage = this.messageRepository.getBufferedMessageInNewTransaction(sequencedMessage);
        DeserializedMessage deserializedMessage;
        try {
            deserializedMessage = this.inboxDeserializer.deserialize(sequencedMessage, bufferedMessage);
        }
        catch (Exception ex) {
            FailedConsumerRecord failedConsumerRecord = FailedConsumerRecord.of(sequencedMessage, bufferedMessage);
            this.sendMessageToErrorHandlerAndMarkFailed(sequencedMessage, ex, failedConsumerRecord);
            return Optional.empty();
        }
        if (deserializedMessage.deserializationFailed()) {
            this.sendMessageToErrorHandlerAndMarkFailed(
                    sequencedMessage,
                    SequentialInboxException.deserializationFailed(sequencedMessage),
                    FailedConsumerRecord.of(sequencedMessage, deserializedMessage));
            return Optional.empty();
        }
        return Optional.of(deserializedMessage);
    }

    /** Forwards the failed record to the error service sender, then marks the message as FAILED. */
    private void sendMessageToErrorHandlerAndMarkFailed(SequencedMessage sequencedMessage, Exception ex, FailedConsumerRecord record) {
        this.errorServiceSender.accept((ConsumerRecord)record, ex);
        this.messageRepository.setMessageStateInNewTransaction(sequencedMessage, SequencedMessageState.FAILED);
    }

    /** Returns a new modifiable list containing only the messages in state WAITING, in input order. */
    private static List<SequencedMessage> waitingMessagesInModifiableList(List<SequencedMessage> waitingAndProcessedMessages) {
        return waitingAndProcessedMessages.stream()
                .filter(sequencedMessage -> sequencedMessage.getState() == SequencedMessageState.WAITING)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Returns a new modifiable set with the message types of all messages in state PROCESSED.
     *
     * <p>An explicit {@link HashSet} is used because the caller adds to the result, and
     * {@code Collectors.toSet()} gives no guarantee that the returned set is mutable.
     */
    private static Set<String> processedMessageTypes(List<SequencedMessage> waitingAndProcessedMessages) {
        return waitingAndProcessedMessages.stream()
                .filter(sequencedMessage -> sequencedMessage.getState() == SequencedMessageState.PROCESSED)
                .map(SequencedMessage::getMessageType)
                .collect(Collectors.toCollection(HashSet::new));
    }

    @Generated
    public BufferedMessageService(ErrorServiceSender errorServiceSender, SequentialInboxDeserializer inboxDeserializer, MessageHandlerService messageHandlerService, MessageRepository messageRepository, SequentialInboxConfiguration sequentialInboxConfiguration, BufferedMessageTracing bufferedMessageTracing, SequentialInboxMetricsCollector metricsCollector) {
        this.errorServiceSender = errorServiceSender;
        this.inboxDeserializer = inboxDeserializer;
        this.messageHandlerService = messageHandlerService;
        this.messageRepository = messageRepository;
        this.sequentialInboxConfiguration = sequentialInboxConfiguration;
        this.bufferedMessageTracing = bufferedMessageTracing;
        this.metricsCollector = metricsCollector;
    }
}

