/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.messaging.sequentialinbox.inbox;

import ch.admin.bit.jeap.messaging.avro.AvroMessage;
import ch.admin.bit.jeap.messaging.avro.AvroMessageKey;
import ch.admin.bit.jeap.messaging.sequentialinbox.configuration.model.Sequence;
import ch.admin.bit.jeap.messaging.sequentialinbox.configuration.model.SequencedMessageType;
import ch.admin.bit.jeap.messaging.sequentialinbox.configuration.model.SequentialInboxConfiguration;
import ch.admin.bit.jeap.messaging.sequentialinbox.inbox.BufferedMessageService;
import ch.admin.bit.jeap.messaging.sequentialinbox.inbox.MessageHandlerService;
import ch.admin.bit.jeap.messaging.sequentialinbox.inbox.SequenceInstanceFactory;
import ch.admin.bit.jeap.messaging.sequentialinbox.inbox.SequencedMessageService;
import ch.admin.bit.jeap.messaging.sequentialinbox.inbox.Transactions;
import ch.admin.bit.jeap.messaging.sequentialinbox.persistence.SequenceInstance;
import ch.admin.bit.jeap.messaging.sequentialinbox.persistence.SequencedMessage;
import ch.admin.bit.jeap.messaging.sequentialinbox.persistence.SequencedMessageState;
import ch.admin.bit.jeap.messaging.sequentialinbox.spring.SequentialInboxMessageHandler;
import io.micrometer.core.annotation.Timed;
import java.util.Optional;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class SequentialInboxService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SequentialInboxService.class);
    private final SequenceInstanceFactory sequenceInstanceFactory;
    private final SequencedMessageService sequencedMessageService;
    private final SequentialInboxConfiguration inboxConfiguration;
    private final Transactions tx;
    private final MessageHandlerService messageHandlerService;
    private final BufferedMessageService bufferedMessageService;

    @Timed(value="jeap.sequentialinbox.handlemessage", histogram=true, percentiles={0.5, 0.95, 0.99})
    public void handleMessage(ConsumerRecord<AvroMessageKey, AvroMessage> consumerRecord, SequentialInboxMessageHandler messageHandler, Acknowledgment acknowledgment) {
        String messageTypeName;
        SequencedMessageType sequencedMessageType;
        AvroMessage avroMessage = (AvroMessage)consumerRecord.value();
        String contextId = this.getContextId(avroMessage, sequencedMessageType = this.inboxConfiguration.requireSequencedMessageTypeByName(messageTypeName = avroMessage.getType().getName()));
        if (contextId == null) {
            log.debug("Message {} is filtered out from sequencing, handling immediately", (Object)avroMessage);
            messageHandler.invoke((AvroMessageKey)consumerRecord.key(), avroMessage);
            acknowledgment.acknowledge();
            return;
        }
        Sequence sequence = this.inboxConfiguration.getSequenceByMessageTypeName(messageTypeName);
        log.info("Handling message {} ({}) in sequence {} with context ID {}", new Object[]{avroMessage.getType().getName(), avroMessage.getIdentity().getId(), sequence.getName(), contextId});
        long sequenceInstanceId = this.sequenceInstanceFactory.createOrGetSequenceInstance(sequence, contextId);
        this.tx.runInNewTransaction(() -> {
            SequenceInstance sequenceInstance = this.sequenceInstanceFactory.getExistingSequenceInstanceAndLockForUpdate(sequenceInstanceId);
            boolean sequenceComplete = this.tx.callInSuspendedTransaction(() -> this.handleMessage(consumerRecord, messageHandler, sequencedMessageType, sequenceInstance, sequence, contextId));
            if (sequenceComplete) {
                sequenceInstance.close();
            }
        });
        acknowledgment.acknowledge();
    }

    private String getContextId(AvroMessage avroMessage, SequencedMessageType sequencedMessageType) {
        if (!sequencedMessageType.shouldSequenceMessage(avroMessage)) {
            return null;
        }
        return sequencedMessageType.extractContextId(avroMessage);
    }

    private boolean handleMessage(ConsumerRecord<AvroMessageKey, AvroMessage> consumerRecord, SequentialInboxMessageHandler messageHandler, SequencedMessageType sequencedMessageType, SequenceInstance sequenceInstance, Sequence sequence, String contextId) {
        AvroMessage avroMessage = (AvroMessage)consumerRecord.value();
        String messageTypeName = avroMessage.getType().getName();
        Optional<SequencedMessage> existingSequencedMessage = this.sequencedMessageService.findByMessageTypeAndIdempotenceId(messageTypeName, avroMessage.getIdentity().getIdempotenceId());
        if (!SequentialInboxService.isAlreadyProcessedOrWaiting(existingSequencedMessage)) {
            if (!this.sequencedMessageService.isReleaseConditionSatisfied(sequencedMessageType, sequenceInstance)) {
                this.bufferMessage(consumerRecord, avroMessage, sequence, contextId, existingSequencedMessage, sequenceInstance);
                return false;
            }
            this.invokeMessageHandler(consumerRecord, messageHandler, existingSequencedMessage, sequenceInstance);
        } else {
            log.info("Message {} (id={}) has already been processed with idempotence ID {}, skipping listener invocation", new Object[]{messageTypeName, avroMessage.getIdentity().getId(), avroMessage.getIdentity().getIdempotenceId()});
        }
        return this.bufferedMessageService.processBufferedMessages(sequenceInstance, sequence);
    }

    private void invokeMessageHandler(ConsumerRecord<AvroMessageKey, AvroMessage> consumerRecord, SequentialInboxMessageHandler messageHandler, Optional<SequencedMessage> existingSequencedMessage, SequenceInstance sequenceInstance) {
        AvroMessage avroMessage = (AvroMessage)consumerRecord.value();
        String messageTypeName = avroMessage.getType().getName();
        try {
            log.debug("Invoking message handler for message {} (id={})", (Object)messageTypeName, (Object)avroMessage.getIdentity().getId());
            this.messageHandlerService.invokeMessageHandler((AvroMessageKey)consumerRecord.key(), avroMessage, messageHandler);
            this.sequencedMessageService.storeSequencedMessage(existingSequencedMessage, sequenceInstance, SequencedMessageState.PROCESSED, consumerRecord);
        }
        catch (Exception ex) {
            log.error("Error processing message {} (id={}), marking as failed", (Object)messageTypeName, (Object)avroMessage.getIdentity().getId());
            this.sequencedMessageService.storeSequencedMessage(existingSequencedMessage, sequenceInstance, SequencedMessageState.FAILED, consumerRecord);
            throw ex;
        }
    }

    private void bufferMessage(ConsumerRecord<AvroMessageKey, AvroMessage> consumerRecord, AvroMessage avroMessage, Sequence sequence, String contextId, Optional<SequencedMessage> existingSequencedMessage, SequenceInstance sequenceInstance) {
        log.info("Buffering message {} in sequence {} with context ID {}", new Object[]{avroMessage.getType().getName(), sequence.getName(), contextId});
        this.sequencedMessageService.storeSequencedMessage(existingSequencedMessage, sequenceInstance, SequencedMessageState.WAITING, consumerRecord);
    }

    private static boolean isAlreadyProcessedOrWaiting(Optional<SequencedMessage> existingSequencedMessage) {
        return existingSequencedMessage.isPresent() && SequencedMessageState.waitingOrProcessed(existingSequencedMessage.get().getState());
    }

    @Generated
    public SequentialInboxService(SequenceInstanceFactory sequenceInstanceFactory, SequencedMessageService sequencedMessageService, SequentialInboxConfiguration inboxConfiguration, Transactions tx, MessageHandlerService messageHandlerService, BufferedMessageService bufferedMessageService) {
        this.sequenceInstanceFactory = sequenceInstanceFactory;
        this.sequencedMessageService = sequencedMessageService;
        this.inboxConfiguration = inboxConfiguration;
        this.tx = tx;
        this.messageHandlerService = messageHandlerService;
        this.bufferedMessageService = bufferedMessageService;
    }
}

