/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.test.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueEntryId;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.CustomerId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderEvent;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.ProductId;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.components.foundation.types.CorrelationId;
import java.time.Clock;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public abstract class DurableQueuesIT<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>> {
    /** Unit-of-work factory for the current test; assigned in {@link #setup()} from {@link #createUnitOfWorkFactory()}. */
    protected UOW_FACTORY unitOfWorkFactory;
    /** Queues under test; created and started in {@link #setup()} and stopped in {@link #cleanup()}. */
    protected DURABLE_QUEUES durableQueues;

    /**
     * Builds a fresh fixture before every test: creates the unit-of-work factory, resets the queue
     * storage with it, and only then creates and starts the queues under test.
     */
    @BeforeEach
    void setup() {
        unitOfWorkFactory = createUnitOfWorkFactory();
        resetQueueStorage(unitOfWorkFactory);

        durableQueues = createDurableQueues(unitOfWorkFactory);
        durableQueues.start();
    }

    /**
     * Stops the queues under test after every test. Does nothing when {@link #setup()} never got as
     * far as assigning {@link #durableQueues}.
     */
    @AfterEach
    void cleanup() {
        if (durableQueues == null) {
            return;
        }
        durableQueues.stop();
    }

    /**
     * Creates the queues implementation under test. {@link #setup()} calls this once per test, after
     * {@link #resetQueueStorage} has run, and starts the returned instance itself.
     *
     * @param var1 the unit-of-work factory that {@link #createUnitOfWorkFactory()} produced for the current test
     * @return the (not yet started) queues instance to test
     */
    protected abstract DURABLE_QUEUES createDurableQueues(UOW_FACTORY var1);

    /**
     * Creates the unit-of-work factory for the current test. It is the first thing {@link #setup()} calls,
     * and the result is handed to both {@link #resetQueueStorage} and {@link #createDurableQueues}.
     *
     * @return the factory to store in {@link #unitOfWorkFactory}
     */
    protected abstract UOW_FACTORY createUnitOfWorkFactory();

    /**
     * Hook invoked by {@link #setup()} before the queues under test are created.
     * NOTE(review): presumably implementations clear whatever the previous test left in the backing
     * store - confirm against the concrete subclasses.
     *
     * @param var1 the unit-of-work factory that {@link #createUnitOfWorkFactory()} produced for the current test
     */
    protected abstract void resetQueueStorage(UOW_FACTORY var1);

    /**
     * Performs a queue interaction that produces a result. When the queues under test report
     * {@link TransactionalMode#FullyTransactional} the supplier is invoked through
     * {@code unitOfWorkFactory.withUnitOfWork(...)}; for every other mode it is invoked directly.
     *
     * @param supplier the queue interaction to perform
     * @param <R>      the result type of the interaction
     * @return the supplier's result (in the transactional case: the value handed back by {@code withUnitOfWork})
     */
    protected <R> R withDurableQueue(Supplier<R> supplier) {
        boolean fullyTransactional = durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional;
        if (!fullyTransactional) {
            return supplier.get();
        }
        return (R) unitOfWorkFactory.withUnitOfWork(unitOfWork -> supplier.get());
    }

    /**
     * Performs a queue interaction that produces no result. When the queues under test report
     * {@link TransactionalMode#FullyTransactional} the action is run through
     * {@code unitOfWorkFactory.usingUnitOfWork(...)}; for every other mode it is run directly.
     *
     * @param action the queue interaction to perform
     */
    protected void usingDurableQueue(Runnable action) {
        boolean fullyTransactional = durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional;
        if (!fullyTransactional) {
            action.run();
            return;
        }
        unitOfWorkFactory.usingUnitOfWork(unitOfWork -> action.run());
    }

    /**
     * Queues three messages and verifies the total count, the ascending query order, lookup by id and
     * the initial delivery bookkeeping of every entry, and finally that purging empties the queue.
     */
    @Test
    void test_simple_enqueueing_and_afterwards_querying_queued_messages() {
        QueueName queueName = QueueName.of("TestQueue");

        Message message1 = Message.of(new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 1234L),
                                      MessageMetaData.of("correlation_id", CorrelationId.random(),
                                                         "trace_id", UUID.randomUUID().toString()));
        QueueEntryId idMsg1 = withDurableQueue(() -> durableQueues.queueMessage(queueName, message1));

        Message message2 = Message.of(new OrderEvent.ProductAddedToOrder(OrderId.random(), ProductId.random(), 2),
                                      MessageMetaData.of("correlation_id", CorrelationId.random(),
                                                         "trace_id", UUID.randomUUID().toString()));
        QueueEntryId idMsg2 = withDurableQueue(() -> durableQueues.queueMessage(queueName, message2));

        Message message3 = Message.of(new OrderEvent.OrderAccepted(OrderId.random()));
        QueueEntryId idMsg3 = withDurableQueue(() -> durableQueues.queueMessage(queueName, message3));

        Assertions.assertThat(durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(3L);
        List<QueuedMessage> queuedMessages = durableQueues.getQueuedMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
        Assertions.assertThat(queuedMessages.size()).isEqualTo(3);

        // Ascending sort order must reproduce the order in which the messages were queued
        assertNewlyQueuedEntry(queuedMessages.get(0), idMsg1, queueName, message1);
        assertNewlyQueuedEntry(queuedMessages.get(1), idMsg2, queueName, message2);
        assertNewlyQueuedEntry(queuedMessages.get(2), idMsg3, queueName, message3);

        int numberOfDeletedMessages = durableQueues.purgeQueue(queueName);
        Assertions.assertThat(numberOfDeletedMessages).isEqualTo(3);
        Assertions.assertThat(durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
    }

    /**
     * Asserts that {@code actual} is the entry the queues return for {@code expectedId} and that it
     * still has the state of a message that was queued but never delivered.
     *
     * @param actual            the entry taken from the queried list
     * @param expectedId        the id returned when the message was queued
     * @param expectedQueueName the queue the message was added to
     * @param expectedMessage   the message that was queued
     */
    private void assertNewlyQueuedEntry(QueuedMessage actual,
                                        QueueEntryId expectedId,
                                        QueueName expectedQueueName,
                                        Message expectedMessage) {
        Assertions.assertThat(durableQueues.getQueuedMessage(expectedId).get()).isEqualTo(actual);
        Assertions.assertThat(actual.getMessage()).usingRecursiveComparison().isEqualTo(expectedMessage);
        // The CharSequence casts pin the AssertJ overload used for the id and queue-name value types
        Assertions.assertThat((CharSequence) actual.getId()).isEqualTo(expectedId);
        Assertions.assertThat((CharSequence) actual.getQueueName()).isEqualTo(expectedQueueName);
        Assertions.assertThat(actual.getAddedTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat(actual.getNextDeliveryTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat(actual.isDeadLetterMessage()).isFalse();
        Assertions.assertThat(actual.getLastDeliveryError()).isNull();
        Assertions.assertThat(actual.getRedeliveryAttempts()).isEqualTo(0);
        Assertions.assertThat(actual.getTotalDeliveryAttempts()).isEqualTo(0);
    }

    /**
     * Queues three messages on an empty queue and verifies that a single consumer receives them in
     * the order in which they were queued.
     */
    @Test
    void verify_queued_messages_are_dequeued_in_order() {
        QueueName queueName = QueueName.of("TestQueue");
        durableQueues.purgeQueue(queueName);

        Message message1 = Message.of(new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 1234L),
                                      MessageMetaData.of("correlation_id", CorrelationId.random(),
                                                         "trace_id", UUID.randomUUID().toString()));
        withDurableQueue(() -> durableQueues.queueMessage(queueName, message1));

        Message message2 = Message.of(new OrderEvent.ProductAddedToOrder(OrderId.random(), ProductId.random(), 2),
                                      MessageMetaData.of("correlation_id", CorrelationId.random(),
                                                         "trace_id", UUID.randomUUID().toString()));
        withDurableQueue(() -> durableQueues.queueMessage(queueName, message2));

        Message message3 = Message.of(new OrderEvent.OrderAccepted(OrderId.random()));
        withDurableQueue(() -> durableQueues.queueMessage(queueName, message3));

        Assertions.assertThat(durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(3L);

        RecordingQueuedMessageHandler handler = new RecordingQueuedMessageHandler();
        DurableQueueConsumer consumer = durableQueues.consumeFromQueue(
                ConsumeFromQueue.builder()
                                .setQueueName(queueName)
                                .setRedeliveryPolicy(RedeliveryPolicy.fixedBackoff()
                                                                     .setRedeliveryDelay(Duration.ofMillis(200L))
                                                                     .setMaximumNumberOfRedeliveries(5)
                                                                     .build())
                                .setParallelConsumers(1)
                                .setQueueMessageHandler(handler)
                                .build());
        try {
            Awaitility.waitAtMost(Duration.ofSeconds(2L))
                      .untilAsserted(() -> Assertions.assertThat(handler.messages.size()).isEqualTo(3));
            Assertions.assertThat(handler.messages.get(0)).usingRecursiveComparison().isEqualTo(message1);
            Assertions.assertThat(handler.messages.get(1)).usingRecursiveComparison().isEqualTo(message2);
            Assertions.assertThat(handler.messages.get(2)).usingRecursiveComparison().isEqualTo(message3);
        } finally {
            // Cancel even when an assertion fails so the consumer does not keep running
            consumer.cancel();
        }
    }

    /**
     * Queues a message directly as a dead letter message and verifies that it is not counted as a
     * queued message, that it is listed among the dead letter messages, and that a consumer on the
     * queue receives nothing for roughly two seconds.
     */
    @Test
    void verify_a_message_queues_as_a_dead_letter_message_is_marked_as_such_and_will_not_be_delivered_to_the_consumer() {
        QueueName queueName = QueueName.of("TestQueue");
        Message message1 = Message.of(new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 1234L),
                                      MessageMetaData.of("correlation_id", CorrelationId.random(),
                                                         "trace_id", UUID.randomUUID().toString()));
        usingDurableQueue(() -> durableQueues.queueMessageAsDeadLetterMessage(queueName, message1, new RuntimeException("On purpose")));

        Assertions.assertThat(durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        List<QueuedMessage> deadLetterMessages = durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
        Assertions.assertThat(deadLetterMessages.size()).isEqualTo(1);

        RecordingQueuedMessageHandler handler = new RecordingQueuedMessageHandler();
        DurableQueueConsumer consumer = durableQueues.consumeFromQueue(queueName,
                                                                       RedeliveryPolicy.fixedBackoff(Duration.ofMillis(200L), 5),
                                                                       1,
                                                                       handler);
        try {
            // The condition has to hold for the whole 'during' window. The upper bound only matters
            // when a message IS delivered (the condition can then never hold again), so it is kept
            // short to make that failure surface within seconds instead of after 2000 seconds.
            Awaitility.await()
                      .during(Duration.ofMillis(1990L))
                      .atMost(Duration.ofSeconds(5L))
                      .until(() -> handler.messages.size() == 0);
        } finally {
            // Cancel even when the wait fails so the consumer does not keep running
            consumer.cancel();
        }
    }

    /**
     * Queues one message whose handler throws on the first three deliveries and verifies that the
     * same message is handed to the handler four times in total.
     */
    @Test
    void verify_failed_messages_are_redelivered() {
        QueueName queueName = QueueName.of("TestQueue");
        Message message1 = Message.of(new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 12345L),
                                      MessageMetaData.of("correlation_id", CorrelationId.random(),
                                                         "trace_id", UUID.randomUUID().toString()));
        usingDurableQueue(() -> durableQueues.queueMessage(queueName, message1));
        Assertions.assertThat(durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);

        AtomicInteger deliveryCountForMessage1 = new AtomicInteger();
        RecordingQueuedMessageHandler handler = new RecordingQueuedMessageHandler(msg -> {
            int count = deliveryCountForMessage1.incrementAndGet();
            // Fail deliveries 1-3; the 4th delivery is allowed to succeed
            if (count <= 3) {
                throw new RuntimeException("Thrown on purpose. Delivery count: " + deliveryCountForMessage1);
            }
        });

        int expectedDeliveries = 4;
        DurableQueueConsumer consumer = durableQueues.consumeFromQueue(queueName,
                                                                       RedeliveryPolicy.fixedBackoff(Duration.ofMillis(200L), 5),
                                                                       1,
                                                                       handler);
        try {
            Awaitility.waitAtMost(Duration.ofSeconds(2L))
                      .untilAsserted(() -> Assertions.assertThat(handler.messages.size()).isEqualTo(expectedDeliveries));
            // Every delivery, failed or successful, must have carried the same message
            for (int delivery = 0; delivery < expectedDeliveries; delivery++) {
                Assertions.assertThat(handler.messages.get(delivery)).usingRecursiveComparison().isEqualTo(message1);
            }
        } finally {
            // Cancel even when an assertion fails so the consumer does not keep running
            consumer.cancel();
        }
    }

    /**
     * Queues one message whose handler throws on the first six deliveries, verifies that the message
     * ends up as a dead letter message after those six attempts, then resurrects it and verifies
     * that the seventh delivery leaves neither a queued nor a dead letter message behind.
     */
    @Test
    void verify_a_message_that_failed_too_many_times_is_marked_as_dead_letter_message_AND_the_message_can_be_resurrected() {
        QueueName queueName = QueueName.of("TestQueue");
        Message message1 = Message.of(new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 123456L),
                                      MessageMetaData.of("correlation_id", CorrelationId.random(),
                                                         "trace_id", UUID.randomUUID().toString()));
        QueueEntryId message1Id = withDurableQueue(() -> durableQueues.queueMessage(queueName, message1));
        Assertions.assertThat(durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);

        AtomicInteger deliveryCountForMessage1 = new AtomicInteger();
        RecordingQueuedMessageHandler handler = new RecordingQueuedMessageHandler(msg -> {
            int count = deliveryCountForMessage1.incrementAndGet();
            // Fail deliveries 1-6 (the first attempt plus the 5 configured redeliveries); the 7th may succeed
            if (count <= 6) {
                throw new RuntimeException("Thrown on purpose. Delivery count: " + deliveryCountForMessage1);
            }
        });

        int failingDeliveries = 6;
        DurableQueueConsumer consumer = durableQueues.consumeFromQueue(queueName,
                                                                       RedeliveryPolicy.fixedBackoff(Duration.ofMillis(200L), 5),
                                                                       1,
                                                                       handler);
        try {
            Awaitility.waitAtMost(Duration.ofSeconds(4L))
                      .untilAsserted(() -> Assertions.assertThat(handler.messages.size()).isEqualTo(failingDeliveries));
            for (int delivery = 0; delivery < failingDeliveries; delivery++) {
                Assertions.assertThat(handler.messages.get(delivery)).usingRecursiveComparison().isEqualTo(message1);
            }

            // The handler records a delivery BEFORE it throws, so the 6th delivery can become visible
            // here before the queue has reacted to that failure. Wait for the dead letter transition
            // instead of asserting it immediately.
            Awaitility.waitAtMost(Duration.ofSeconds(2L)).untilAsserted(() -> {
                Assertions.assertThat(durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
                Optional<QueuedMessage> candidate = withDurableQueue(() -> durableQueues.getDeadLetterMessage(message1Id));
                Assertions.assertThat(candidate).isPresent();
            });

            Optional<QueuedMessage> deadLetterMessage = withDurableQueue(() -> durableQueues.getDeadLetterMessage(message1Id));
            Assertions.assertThat(deadLetterMessage).isPresent();
            Assertions.assertThat(deadLetterMessage.get().getMessage()).usingRecursiveComparison().isEqualTo(message1);
            QueuedMessage firstDeadLetterMessage = durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L).get(0);
            Assertions.assertThat(deadLetterMessage.get()).usingRecursiveComparison().isEqualTo(firstDeadLetterMessage);

            // Resurrect with a one second delivery delay
            Optional<QueuedMessage> wasResurrectedResult = withDurableQueue(() -> durableQueues.resurrectDeadLetterMessage(message1Id, Duration.ofMillis(1000L)));
            Assertions.assertThat(wasResurrectedResult).isPresent();
            Assertions.assertThat(durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);

            Awaitility.waitAtMost(Duration.ofSeconds(4L))
                      .untilAsserted(() -> Assertions.assertThat(handler.messages.size()).isEqualTo(failingDeliveries + 1));
            Assertions.assertThat(handler.messages.get(failingDeliveries)).usingRecursiveComparison().isEqualTo(message1);

            // The 7th delivery does not throw; give the queue a moment to remove the handled message
            Awaitility.waitAtMost(Duration.ofMillis(500L))
                      .untilAsserted(() -> Assertions.assertThat(durableQueues.getQueuedMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L)).isEmpty());
            Assertions.assertThat(durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
            Assertions.assertThat(durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L)).isEmpty();
        } finally {
            // Cancel even when an assertion fails so the consumer does not keep running
            consumer.cancel();
        }
    }

    static class RecordingQueuedMessageHandler
    implements QueuedMessageHandler {
        Consumer<Object> functionLogic;
        List<Object> messages = new ArrayList<Object>();

        RecordingQueuedMessageHandler() {
        }

        RecordingQueuedMessageHandler(Consumer<Object> functionLogic) {
            this.functionLogic = functionLogic;
        }

        public void handle(QueuedMessage message) {
            this.messages.add(message.getMessage());
            if (this.functionLogic != null) {
                this.functionLogic.accept(message.getMessage());
            }
        }
    }
}

