/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.messaging.sequentialinbox.inbox;

import ch.admin.bit.jeap.messaging.avro.AvroMessage;
import ch.admin.bit.jeap.messaging.avro.AvroMessageKey;
import ch.admin.bit.jeap.messaging.kafka.errorhandling.ClusterNameHeaderInterceptor;
import ch.admin.bit.jeap.messaging.kafka.properties.KafkaProperties;
import ch.admin.bit.jeap.messaging.sequentialinbox.configuration.model.SequencedMessageType;
import ch.admin.bit.jeap.messaging.sequentialinbox.jpa.MessageRepository;
import ch.admin.bit.jeap.messaging.sequentialinbox.jpa.SequenceInstanceRepository;
import ch.admin.bit.jeap.messaging.sequentialinbox.kafka.TraceContextFactory;
import ch.admin.bit.jeap.messaging.sequentialinbox.persistence.BufferedMessage;
import ch.admin.bit.jeap.messaging.sequentialinbox.persistence.SequenceInstance;
import ch.admin.bit.jeap.messaging.sequentialinbox.persistence.SequencedMessage;
import ch.admin.bit.jeap.messaging.sequentialinbox.persistence.SequencedMessageState;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Component
class SequencedMessageService {
    private final TraceContextFactory traceContextFactory;
    private final MessageRepository messageRepository;
    private final KafkaProperties kafkaProperties;
    private final SequenceInstanceRepository sequenceInstanceRepository;

    @Transactional(propagation=Propagation.REQUIRES_NEW, isolation=Isolation.READ_COMMITTED)
    public void storeSequencedMessage(Optional<SequencedMessage> existingSequencedMessage, SequenceInstance sequenceInstance, SequencedMessageState state, ConsumerRecord<AvroMessageKey, AvroMessage> consumerRecord) {
        if (existingSequencedMessage.isPresent()) {
            this.messageRepository.setMessageStateInCurrentTransaction(existingSequencedMessage.get(), state);
            return;
        }
        BufferedMessage bufferedMessage = null;
        if (state == SequencedMessageState.WAITING) {
            bufferedMessage = BufferedMessage.builder().sequenceInstanceId(sequenceInstance.getId()).key(consumerRecord.key() == null ? null : ((AvroMessageKey)consumerRecord.key()).getSerializedMessage()).value(((AvroMessage)consumerRecord.value()).getSerializedMessage()).build();
        }
        String clusterName = this.getClusterName(consumerRecord);
        SequencedMessage sequencedMessage = SequencedMessage.builder().messageType(((AvroMessage)consumerRecord.value()).getType().getName()).sequenceInstanceId(sequenceInstance.getId()).clusterName(clusterName).topic(consumerRecord.topic()).sequencedMessageId(UUID.fromString(((AvroMessage)consumerRecord.value()).getIdentity().getId())).idempotenceId(((AvroMessage)consumerRecord.value()).getIdentity().getIdempotenceId()).traceContext(this.traceContextFactory.currentTraceContext()).state(state).build();
        this.messageRepository.saveMessage(bufferedMessage, sequencedMessage);
    }

    private String getClusterName(ConsumerRecord<AvroMessageKey, AvroMessage> consumerRecord) {
        String clusterName = ClusterNameHeaderInterceptor.getClusterName(consumerRecord);
        return clusterName == null ? this.kafkaProperties.getDefaultClusterName() : clusterName;
    }

    Optional<SequencedMessage> findByMessageTypeAndIdempotenceId(String messageType, String idempotenceId) {
        return this.messageRepository.findByMessageTypeAndIdempotenceIdInNewTransaction(messageType, idempotenceId);
    }

    boolean isReleaseConditionSatisfied(SequencedMessageType sequencedMessageType, SequenceInstance sequenceInstance) {
        Set<String> emptyProcessedMessageSet = Set.of();
        if (sequencedMessageType.isReleaseConditionSatisfied(emptyProcessedMessageSet)) {
            return true;
        }
        Set<String> processedMessageTypes = this.messageRepository.getProcessedMessageTypesInSequenceInNewTransaction(sequenceInstance);
        return sequencedMessageType.isReleaseConditionSatisfied(processedMessageTypes);
    }

    @Generated
    public SequencedMessageService(TraceContextFactory traceContextFactory, MessageRepository messageRepository, KafkaProperties kafkaProperties, SequenceInstanceRepository sequenceInstanceRepository) {
        this.traceContextFactory = traceContextFactory;
        this.messageRepository = messageRepository;
        this.kafkaProperties = kafkaProperties;
        this.sequenceInstanceRepository = sequenceInstanceRepository;
    }
}

