/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.test.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.NextQueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.OrderedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueEntryId;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageCounts;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.MarkAsDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.CustomerId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderEvent;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.ProductId;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.components.foundation.types.CorrelationId;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.collections.Lists;
import dk.cloudcreate.essentials.shared.interceptor.InterceptorChain;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public abstract class DurableQueuesIT<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>> {
    protected UOW_FACTORY unitOfWorkFactory;
    protected DURABLE_QUEUES durableQueues;

    @BeforeEach
    void setup() {
        this.unitOfWorkFactory = this.createUnitOfWorkFactory();
        this.resetQueueStorage(this.unitOfWorkFactory);
        this.durableQueues = this.createDurableQueues(this.unitOfWorkFactory);
        this.durableQueues.start();
    }

    @AfterEach
    void cleanup() {
        if (this.durableQueues != null) {
            this.durableQueues.stop();
        }
    }

    protected abstract DURABLE_QUEUES createDurableQueues(UOW_FACTORY var1);

    protected abstract UOW_FACTORY createUnitOfWorkFactory();

    protected abstract void resetQueueStorage(UOW_FACTORY var1);

    protected <R> R withDurableQueue(Supplier<R> supplier) {
        if (this.durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
            return (R)this.unitOfWorkFactory.withUnitOfWork(uow -> supplier.get());
        }
        return supplier.get();
    }

    protected void usingDurableQueue(Runnable action) {
        if (this.durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
            this.unitOfWorkFactory.usingUnitOfWork(uow -> action.run());
        } else {
            action.run();
        }
    }

    @Test
    void test_simple_enqueueing_and_afterwards_querying_queued_messages() {
        QueueName queueName = QueueName.of((CharSequence)"TestQueue");
        Message message1 = Message.of((Object)new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 1234L), (MessageMetaData)MessageMetaData.of((String)"correlation_id", (Object)CorrelationId.random(), (String)"trace_id", (Object)UUID.randomUUID().toString()));
        QueueEntryId idMsg1 = this.withDurableQueue(() -> this.durableQueues.queueMessage(queueName, message1));
        Assertions.assertThat((Optional)this.durableQueues.getQueueNameFor(idMsg1)).isEqualTo(Optional.of(queueName));
        Assertions.assertThat((Optional)this.durableQueues.getQueueNameFor(QueueEntryId.random())).isEmpty();
        Assertions.assertThat((Collection)this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));
        Message message2 = Message.of((Object)new OrderEvent.ProductAddedToOrder(OrderId.random(), ProductId.random(), 2), (MessageMetaData)MessageMetaData.of((String)"correlation_id", (Object)CorrelationId.random(), (String)"trace_id", (Object)UUID.randomUUID().toString()));
        QueueEntryId idMsg2 = this.withDurableQueue(() -> this.durableQueues.queueMessage(queueName, message2));
        Message message3 = Message.of((Object)new OrderEvent.OrderAccepted(OrderId.random()));
        QueueEntryId idMsg3 = this.withDurableQueue(() -> this.durableQueues.queueMessage(queueName, message3));
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(3L);
        Assertions.assertThat((Object)this.durableQueues.getQueuedMessageCountsFor(queueName)).isEqualTo((Object)new QueuedMessageCounts(queueName, 3L, 0L));
        List queuedMessages = this.durableQueues.getQueuedMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
        Assertions.assertThat((List)queuedMessages).hasSize(3);
        Assertions.assertThat((Object)((QueuedMessage)this.durableQueues.getQueuedMessage(idMsg1).get())).isEqualTo(queuedMessages.get(0));
        Assertions.assertThat((Object)((QueuedMessage)queuedMessages.get(0)).getMessage()).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((CharSequence)((QueuedMessage)queuedMessages.get(0)).getId()).isEqualTo((Object)idMsg1);
        Assertions.assertThat((CharSequence)((QueuedMessage)queuedMessages.get(0)).getQueueName()).isEqualTo((Object)queueName);
        Assertions.assertThat((OffsetDateTime)((QueuedMessage)queuedMessages.get(0)).getAddedTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat((OffsetDateTime)((QueuedMessage)queuedMessages.get(0)).getNextDeliveryTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat((boolean)((QueuedMessage)queuedMessages.get(0)).isDeadLetterMessage()).isFalse();
        Assertions.assertThat((String)((QueuedMessage)queuedMessages.get(0)).getLastDeliveryError()).isEqualTo(null);
        Assertions.assertThat((int)((QueuedMessage)queuedMessages.get(0)).getRedeliveryAttempts()).isEqualTo(0);
        Assertions.assertThat((int)((QueuedMessage)queuedMessages.get(0)).getTotalDeliveryAttempts()).isEqualTo(0);
        Assertions.assertThat((Object)((QueuedMessage)this.durableQueues.getQueuedMessage(idMsg2).get())).isEqualTo(queuedMessages.get(1));
        Assertions.assertThat((Object)((QueuedMessage)queuedMessages.get(1)).getMessage()).usingRecursiveComparison().isEqualTo((Object)message2);
        Assertions.assertThat((CharSequence)((QueuedMessage)queuedMessages.get(1)).getId()).isEqualTo((Object)idMsg2);
        Assertions.assertThat((CharSequence)((QueuedMessage)queuedMessages.get(1)).getQueueName()).isEqualTo((Object)queueName);
        Assertions.assertThat((OffsetDateTime)((QueuedMessage)queuedMessages.get(1)).getAddedTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat((OffsetDateTime)((QueuedMessage)queuedMessages.get(1)).getNextDeliveryTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat((boolean)((QueuedMessage)queuedMessages.get(1)).isDeadLetterMessage()).isFalse();
        Assertions.assertThat((String)((QueuedMessage)queuedMessages.get(1)).getLastDeliveryError()).isEqualTo(null);
        Assertions.assertThat((int)((QueuedMessage)queuedMessages.get(1)).getRedeliveryAttempts()).isEqualTo(0);
        Assertions.assertThat((int)((QueuedMessage)queuedMessages.get(1)).getTotalDeliveryAttempts()).isEqualTo(0);
        Assertions.assertThat((Object)((QueuedMessage)this.durableQueues.getQueuedMessage(idMsg3).get())).isEqualTo(queuedMessages.get(2));
        Assertions.assertThat((Object)((QueuedMessage)queuedMessages.get(2)).getMessage()).usingRecursiveComparison().isEqualTo((Object)message3);
        Assertions.assertThat((CharSequence)((QueuedMessage)queuedMessages.get(2)).getId()).isEqualTo((Object)idMsg3);
        Assertions.assertThat((CharSequence)((QueuedMessage)queuedMessages.get(2)).getQueueName()).isEqualTo((Object)queueName);
        Assertions.assertThat((OffsetDateTime)((QueuedMessage)queuedMessages.get(2)).getAddedTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat((OffsetDateTime)((QueuedMessage)queuedMessages.get(2)).getNextDeliveryTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat((boolean)((QueuedMessage)queuedMessages.get(2)).isDeadLetterMessage()).isFalse();
        Assertions.assertThat((String)((QueuedMessage)queuedMessages.get(2)).getLastDeliveryError()).isEqualTo(null);
        Assertions.assertThat((int)((QueuedMessage)queuedMessages.get(2)).getRedeliveryAttempts()).isEqualTo(0);
        Assertions.assertThat((int)((QueuedMessage)queuedMessages.get(2)).getTotalDeliveryAttempts()).isEqualTo(0);
        List nextMessages = this.durableQueues.queryForMessagesSoonReadyForDelivery(queueName, Instant.now().minusSeconds(2L), 10);
        Assertions.assertThat((List)nextMessages).hasSize(3);
        Assertions.assertThat((CharSequence)((NextQueuedMessage)nextMessages.get((int)0)).id).isEqualTo((Object)((QueuedMessage)queuedMessages.get(0)).getId());
        Assertions.assertThat((Instant)((NextQueuedMessage)nextMessages.get((int)0)).addedTimestamp).isEqualTo((Object)((QueuedMessage)queuedMessages.get(0)).getAddedTimestamp().toInstant());
        Assertions.assertThat((Instant)((NextQueuedMessage)nextMessages.get((int)0)).nextDeliveryTimestamp).isEqualTo((Object)((QueuedMessage)queuedMessages.get(0)).getNextDeliveryTimestamp().toInstant());
        Assertions.assertThat((CharSequence)((NextQueuedMessage)nextMessages.get((int)1)).id).isEqualTo((Object)((QueuedMessage)queuedMessages.get(1)).getId());
        Assertions.assertThat((Instant)((NextQueuedMessage)nextMessages.get((int)1)).addedTimestamp).isEqualTo((Object)((QueuedMessage)queuedMessages.get(1)).getAddedTimestamp().toInstant());
        Assertions.assertThat((Instant)((NextQueuedMessage)nextMessages.get((int)1)).nextDeliveryTimestamp).isEqualTo((Object)((QueuedMessage)queuedMessages.get(1)).getNextDeliveryTimestamp().toInstant());
        Assertions.assertThat((CharSequence)((NextQueuedMessage)nextMessages.get((int)2)).id).isEqualTo((Object)((QueuedMessage)queuedMessages.get(2)).getId());
        Assertions.assertThat((Instant)((NextQueuedMessage)nextMessages.get((int)2)).addedTimestamp).isEqualTo((Object)((QueuedMessage)queuedMessages.get(2)).getAddedTimestamp().toInstant());
        Assertions.assertThat((Instant)((NextQueuedMessage)nextMessages.get((int)2)).nextDeliveryTimestamp).isEqualTo((Object)((QueuedMessage)queuedMessages.get(2)).getNextDeliveryTimestamp().toInstant());
        nextMessages = this.durableQueues.queryForMessagesSoonReadyForDelivery(queueName, Instant.now().minusSeconds(2L), 2);
        Assertions.assertThat((List)nextMessages).hasSize(2);
        int numberOfDeletedMessages = this.durableQueues.purgeQueue(queueName);
        Assertions.assertThat((int)numberOfDeletedMessages).isEqualTo(3);
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        Assertions.assertThat((Object)this.durableQueues.getQueuedMessageCountsFor(queueName)).isEqualTo((Object)new QueuedMessageCounts(queueName, 0L, 0L));
    }

    @Test
    void verify_queued_messages_are_dequeued_in_order() {
        QueueName queueName = QueueName.of((CharSequence)"TestQueue");
        this.durableQueues.purgeQueue(queueName);
        Message message1 = Message.of((Object)new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 1234L), (MessageMetaData)MessageMetaData.of((String)"correlation_id", (Object)CorrelationId.random(), (String)"trace_id", (Object)UUID.randomUUID().toString()));
        QueueEntryId idMsg1 = this.withDurableQueue(() -> this.durableQueues.queueMessage(queueName, message1));
        Assertions.assertThat((Collection)this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));
        Message message2 = Message.of((Object)new OrderEvent.ProductAddedToOrder(OrderId.random(), ProductId.random(), 2), (MessageMetaData)MessageMetaData.of((String)"correlation_id", (Object)CorrelationId.random(), (String)"trace_id", (Object)UUID.randomUUID().toString()));
        QueueEntryId idMsg2 = this.withDurableQueue(() -> this.durableQueues.queueMessage(queueName, message2));
        Message message3 = Message.of((Object)new OrderEvent.OrderAccepted(OrderId.random()));
        QueueEntryId idMsg3 = this.withDurableQueue(() -> this.durableQueues.queueMessage(queueName, message3));
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(3L);
        Assertions.assertThat((long)this.durableQueues.getTotalDeadLetterMessagesQueuedFor(queueName)).isEqualTo(0L);
        Assertions.assertThat((Object)this.durableQueues.getQueuedMessageCountsFor(queueName)).isEqualTo((Object)new QueuedMessageCounts(queueName, 3L, 0L));
        RecordingQueuedMessageHandler recordingQueueMessageHandler = new RecordingQueuedMessageHandler();
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(ConsumeFromQueue.builder().setQueueName(queueName).setRedeliveryPolicy(RedeliveryPolicy.fixedBackoff().setRedeliveryDelay(Duration.ofMillis(200L)).setMaximumNumberOfRedeliveries(5).build()).setParallelConsumers(1).setQueueMessageHandler((QueuedMessageHandler)recordingQueueMessageHandler).build());
        Assertions.assertThat((Collection)this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(2L)).untilAsserted(() -> Assertions.assertThat(recordingQueueMessageHandler.messages).hasSize(3));
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(2L)).untilAsserted(() -> Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L));
        Assertions.assertThat((long)this.durableQueues.getTotalDeadLetterMessagesQueuedFor(queueName)).isEqualTo(0L);
        Assertions.assertThat((Object)this.durableQueues.getQueuedMessageCountsFor(queueName)).isEqualTo((Object)new QueuedMessageCounts(queueName, 0L, 0L));
        ArrayList<Message> messages = new ArrayList<Message>(recordingQueueMessageHandler.messages);
        Assertions.assertThat((Object)messages.get(0)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(1)).usingRecursiveComparison().isEqualTo((Object)message2);
        Assertions.assertThat((Object)messages.get(2)).usingRecursiveComparison().isEqualTo((Object)message3);
        Assertions.assertThat((Collection)this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));
        consumer.cancel();
        Assertions.assertThat((Collection)this.durableQueues.getQueueNames()).isEqualTo(Set.of());
    }

    @Test
    void verify_a_message_queues_as_a_dead_letter_message_is_marked_as_such_and_will_not_be_delivered_to_the_consumer() {
        QueueName queueName = QueueName.of((CharSequence)"TestQueue");
        Message message1 = Message.of((Object)new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 1234L), (MessageMetaData)MessageMetaData.of((String)"correlation_id", (Object)CorrelationId.random(), (String)"trace_id", (Object)UUID.randomUUID().toString()));
        QueueEntryId idMsg1 = this.withDurableQueue(() -> this.durableQueues.queueMessageAsDeadLetterMessage(queueName, message1, (Exception)new RuntimeException("On purpose")));
        Assertions.assertThat((Collection)this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));
        Assertions.assertThat((Optional)this.durableQueues.getQueueNameFor(idMsg1)).isEqualTo(Optional.of(queueName));
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        Assertions.assertThat((long)this.durableQueues.getTotalDeadLetterMessagesQueuedFor(queueName)).isEqualTo(1L);
        Assertions.assertThat((Object)this.durableQueues.getQueuedMessageCountsFor(queueName)).isEqualTo((Object)new QueuedMessageCounts(queueName, 0L, 1L));
        List deadLetterMessages = this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
        Assertions.assertThat((List)deadLetterMessages).hasSize(1);
        Assertions.assertThat((CharSequence)((QueuedMessage)deadLetterMessages.get(0)).getId()).isEqualTo((Object)idMsg1);
        RecordingQueuedMessageHandler recordingQueueMessageHandler = new RecordingQueuedMessageHandler();
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff((Duration)Duration.ofMillis(200L), (int)5), 1, (QueuedMessageHandler)recordingQueueMessageHandler);
        Awaitility.await().during(Duration.ofMillis(1990L)).atMost(Duration.ofSeconds(2000L)).until(() -> recordingQueueMessageHandler.messages.size() == 0);
        consumer.cancel();
        Assertions.assertThat((Collection)this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));
    }

    @Test
    void verify_a_that_as_long_as_an_ordered_message_with_same_key_and_a_lower_key_order_exists_as_a_dead_letter_message_then_no_further_messages_with_the_same_key_will_be_delivered() {
        QueueName queueName = QueueName.of((CharSequence)"TestQueue");
        String key1 = "Key1";
        List<String> key1Messages = List.of("Key1Msg1", "Key1Msg2", "Key1Msg3", "Key1Msg4", "Key1Msg5");
        String key2 = "Key2";
        List<String> key2Messages = List.of("Key2Msg1", "Key2Msg2", "Key2Msg3", "Key2Msg4", "Key2Msg5");
        this.usingDurableQueue(() -> {
            Lists.toIndexedStream((List)key1Messages).forEach(indexedMessage -> {
                if (((String)indexedMessage._2).equals("Key1Msg3")) {
                    this.durableQueues.queueMessageAsDeadLetterMessage(queueName, (Message)OrderedMessage.of((Object)indexedMessage._2, (String)key1, (long)((Integer)indexedMessage._1).intValue()), (Exception)new RuntimeException("On purpose"));
                } else {
                    this.durableQueues.queueMessage(queueName, (Message)OrderedMessage.of((Object)indexedMessage._2, (String)key1, (long)((Integer)indexedMessage._1).intValue()), Duration.ofMillis(100L));
                }
            });
            Lists.toIndexedStream((List)key2Messages).forEach(indexedMessage -> this.durableQueues.queueMessage(queueName, (Message)OrderedMessage.of((Object)indexedMessage._2, (String)key2, (long)((Integer)indexedMessage._1).intValue()), Duration.ofMillis(100L)));
        });
        int expectedQueueMessageCount = key1Messages.size() + key2Messages.size() - 1;
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo((long)expectedQueueMessageCount);
        List deadLetterMessages = this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
        Assertions.assertThat((List)deadLetterMessages).hasSize(1);
        Assertions.assertThat((Object)((QueuedMessage)deadLetterMessages.get(0)).getPayload()).isEqualTo((Object)key1Messages.get(2));
        Assertions.assertThat((Object)this.durableQueues.getQueuedMessageCountsFor(queueName)).isEqualTo((Object)new QueuedMessageCounts(queueName, (long)expectedQueueMessageCount, 1L));
        QueueEntryId key1Msg3EntryId = ((QueuedMessage)deadLetterMessages.get(0)).getId();
        RecordingQueuedMessageHandler recordingQueueMessageHandler = new RecordingQueuedMessageHandler();
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff((Duration)Duration.ofMillis(200L), (int)1), 2, (QueuedMessageHandler)recordingQueueMessageHandler);
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(5000L)).untilAsserted(() -> Assertions.assertThat(recordingQueueMessageHandler.messages).hasSize(key2Messages.size() + 2));
        Assertions.assertThat(recordingQueueMessageHandler.messages.stream().map(o -> (String)o.getPayload())).containsOnly((Object[])new String[]{"Key1Msg1", "Key1Msg2", "Key2Msg1", "Key2Msg2", "Key2Msg3", "Key2Msg4", "Key2Msg5"});
        recordingQueueMessageHandler.messages.clear();
        this.usingDurableQueue(() -> this.durableQueues.resurrectDeadLetterMessage(key1Msg3EntryId, Duration.ofMillis(10L)));
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(2000L)).untilAsserted(() -> {
            List newDeadLetterMessages = this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
            if (!newDeadLetterMessages.isEmpty()) {
                newDeadLetterMessages.forEach(queuedMessage -> {
                    System.out.println("Resurrected new DeadLetterMessage: " + queuedMessage);
                    this.usingDurableQueue(() -> this.durableQueues.resurrectDeadLetterMessage(queuedMessage.getId(), Duration.ofMillis(10L)));
                });
            }
            Assertions.assertThat(recordingQueueMessageHandler.messages).hasSize(3);
        });
        Assertions.assertThat(recordingQueueMessageHandler.messages.stream().map(o -> (String)o.getPayload())).containsOnly((Object[])new String[]{"Key1Msg3", "Key1Msg4", "Key1Msg5"});
        consumer.cancel();
    }

    @Test
    void verify_failed_messages_are_redelivered() {
        QueueName queueName = QueueName.of((CharSequence)"TestQueue");
        Message message1 = Message.of((Object)new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 12345L), (MessageMetaData)MessageMetaData.of((String)"correlation_id", (Object)CorrelationId.random(), (String)"trace_id", (Object)UUID.randomUUID().toString()));
        this.usingDurableQueue(() -> this.durableQueues.queueMessage(queueName, message1));
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);
        AtomicInteger deliveryCountForMessage1 = new AtomicInteger();
        RecordingQueuedMessageHandler recordingQueueMessageHandler = new RecordingQueuedMessageHandler(msg -> {
            int count = deliveryCountForMessage1.incrementAndGet();
            if (count <= 3) {
                throw new RuntimeException("Thrown on purpose. Delivery count: " + deliveryCountForMessage1);
            }
        });
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff((Duration)Duration.ofMillis(200L), (int)5), 1, (QueuedMessageHandler)recordingQueueMessageHandler);
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(2L)).untilAsserted(() -> Assertions.assertThat(recordingQueueMessageHandler.messages).hasSize(4));
        ArrayList<Message> messages = new ArrayList<Message>(recordingQueueMessageHandler.messages);
        Assertions.assertThat((Object)messages.get(0)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(1)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(2)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(3)).usingRecursiveComparison().isEqualTo((Object)message1);
        consumer.cancel();
    }

    @Test
    void verify_a_message_that_failed_too_many_times_is_marked_as_dead_letter_message_AND_the_message_can_be_resurrected() {
        QueueName queueName = QueueName.of((CharSequence)"TestQueue");
        Message message1 = Message.of((Object)new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 123456L), (MessageMetaData)MessageMetaData.of((String)"correlation_id", (Object)CorrelationId.random(), (String)"trace_id", (Object)UUID.randomUUID().toString()));
        QueueEntryId message1Id = this.withDurableQueue(() -> this.durableQueues.queueMessage(queueName, message1));
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);
        AtomicInteger deliveryCountForMessage1 = new AtomicInteger();
        RecordingQueuedMessageHandler recordingQueueMessageHandler = new RecordingQueuedMessageHandler(msg -> {
            int count = deliveryCountForMessage1.incrementAndGet();
            if (count <= 6) {
                throw new RuntimeException("Thrown on purpose. Delivery count: " + deliveryCountForMessage1);
            }
        });
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff((Duration)Duration.ofMillis(200L), (int)5), 1, (QueuedMessageHandler)recordingQueueMessageHandler);
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(4L)).untilAsserted(() -> Assertions.assertThat(recordingQueueMessageHandler.messages).hasSize(6));
        ArrayList<Message> messages = new ArrayList<Message>(recordingQueueMessageHandler.messages);
        Assertions.assertThat((Object)messages.get(0)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(1)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(2)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(3)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(4)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(5)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        Optional deadLetterMessage = this.withDurableQueue(() -> this.durableQueues.getDeadLetterMessage(message1Id));
        Assertions.assertThat((Optional)deadLetterMessage).isPresent();
        Assertions.assertThat((Object)((QueuedMessage)deadLetterMessage.get()).getMessage()).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((int)((QueuedMessage)deadLetterMessage.get()).getTotalDeliveryAttempts()).isEqualTo(6);
        Assertions.assertThat((int)((QueuedMessage)deadLetterMessage.get()).getRedeliveryAttempts()).isEqualTo(5);
        QueuedMessage firstDeadLetterMessage = (QueuedMessage)this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L).get(0);
        Assertions.assertThat((Object)((QueuedMessage)deadLetterMessage.get())).usingRecursiveComparison().isEqualTo((Object)firstDeadLetterMessage);
        Optional wasResurrectedResult = this.withDurableQueue(() -> this.durableQueues.resurrectDeadLetterMessage(message1Id, Duration.ofMillis(1000L)));
        Assertions.assertThat((Optional)wasResurrectedResult).isPresent();
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(4L)).untilAsserted(() -> Assertions.assertThat(recordingQueueMessageHandler.messages).hasSize(7));
        messages = new ArrayList<Message>(recordingQueueMessageHandler.messages);
        Assertions.assertThat((Object)messages.get(6)).usingRecursiveComparison().isEqualTo((Object)message1);
        Awaitility.waitAtMost((Duration)Duration.ofMillis(500L)).untilAsserted(() -> Assertions.assertThat((List)this.durableQueues.getQueuedMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L)).isEmpty());
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        Assertions.assertThat((List)this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L)).isEmpty();
        consumer.cancel();
    }

    @Test
    void test_two_stage_redelivery_where_a_message_about_to_be_marked_as_a_deadletter_message_is_queued_with_a_redelivery_delay() {
        QueueName queueName = QueueName.of((CharSequence)"TestQueue");
        Message message1 = Message.of((Object)new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 123456L), (MessageMetaData)MessageMetaData.of((String)"correlation_id", (Object)CorrelationId.random(), (String)"trace_id", (Object)UUID.randomUUID().toString()));
        QueueEntryId message1Id = this.withDurableQueue(() -> this.durableQueues.queueMessage(queueName, message1));
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);
        Assertions.assertThat((Object)this.durableQueues.getQueuedMessageCountsFor(queueName)).isEqualTo((Object)new QueuedMessageCounts(queueName, 1L, 0L));
        AtomicInteger deliveryCountForMessage1 = new AtomicInteger();
        final int totalExpectedDeliveries = 9;
        RecordingQueuedMessageHandler recordingQueueMessageHandler = new RecordingQueuedMessageHandler(msg -> {
            int count = deliveryCountForMessage1.incrementAndGet();
            if (count <= totalExpectedDeliveries) {
                throw new RuntimeException("Thrown on purpose. Delivery count: " + deliveryCountForMessage1);
            }
        });
        this.durableQueues.addInterceptor(new DurableQueuesInterceptor(){
            private DurableQueues durableQueues;

            public void setDurableQueues(DurableQueues durableQueues) {
                this.durableQueues = durableQueues;
            }

            public Optional<QueuedMessage> intercept(MarkAsDeadLetterMessage operation, InterceptorChain<MarkAsDeadLetterMessage, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
                QueuedMessage queuedMessage = (QueuedMessage)this.durableQueues.getQueuedMessage(operation.queueEntryId).get();
                System.out.println("MarkAsDeadLetterMessage: " + queuedMessage);
                if (queuedMessage.getTotalDeliveryAttempts() < totalExpectedDeliveries + 1) {
                    System.out.println("---------------> Overriding decision to mark as dead letter message for message " + operation.queueEntryId + ". Total delivery attempts is " + queuedMessage.getTotalDeliveryAttempts());
                    if (((Optional)interceptorChain.proceed()).isEmpty()) {
                        throw new RuntimeException(MessageFormatter.msg((String)"Failed to mark message '{}' as Dead Letter Message", (Object[])new Object[]{operation.queueEntryId}));
                    }
                    return this.durableQueues.resurrectDeadLetterMessage(queuedMessage.getId(), Duration.ofSeconds(1L));
                }
                System.out.println("---------------> Gave up on Redelivery");
                return (Optional)interceptorChain.proceed();
            }
        });
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff((Duration)Duration.ofMillis(200L), (int)5), 1, (QueuedMessageHandler)recordingQueueMessageHandler);
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(20L)).untilAsserted(() -> Assertions.assertThat(recordingQueueMessageHandler.messages).hasSize(totalExpectedDeliveries + 1));
        ArrayList<Message> messages = new ArrayList<Message>(recordingQueueMessageHandler.messages);
        Assertions.assertThat((Object)messages.get(0)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(1)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(2)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(3)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(4)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(5)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(6)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(7)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(8)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((Object)messages.get(9)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        Optional deadLetterMessage = this.withDurableQueue(() -> this.durableQueues.getDeadLetterMessage(message1Id));
        Assertions.assertThat((Optional)deadLetterMessage).isEmpty();
        Assertions.assertThat((Object)this.durableQueues.getQueuedMessageCountsFor(queueName)).isEqualTo((Object)new QueuedMessageCounts(queueName, 0L, 0L));
        consumer.cancel();
    }

    @Test
    void verify_a_message_can_manually_be_marked_as_dead_letter_message_AND_the_message_can_afterwards_be_resurrected() {
        QueueName queueName = QueueName.of((CharSequence)"TestQueue");
        Message message1 = Message.of((Object)new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 123456L), (MessageMetaData)MessageMetaData.of((String)"correlation_id", (Object)CorrelationId.random(), (String)"trace_id", (Object)UUID.randomUUID().toString()));
        QueueEntryId message1Id = this.withDurableQueue(() -> this.durableQueues.queueMessage(queueName, message1));
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);
        AtomicBoolean hasMessageBeenMarkedAsDeadLetterMessage = new AtomicBoolean();
        RecordingQueuedMessageHandler recordingQueueMessageHandler = new RecordingQueuedMessageHandler(msg -> {
            if (!hasMessageBeenMarkedAsDeadLetterMessage.get()) {
                hasMessageBeenMarkedAsDeadLetterMessage.set(true);
                this.durableQueues.markAsDeadLetterMessage(message1Id);
            }
        });
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff((Duration)Duration.ofMillis(200L), (int)5), 1, (QueuedMessageHandler)recordingQueueMessageHandler);
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(4L)).untilAsserted(() -> Assertions.assertThat(recordingQueueMessageHandler.messages).hasSize(1));
        ArrayList<Message> messages = new ArrayList<Message>(recordingQueueMessageHandler.messages);
        Assertions.assertThat((Object)messages.get(0)).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        Optional deadLetterMessage = this.withDurableQueue(() -> this.durableQueues.getDeadLetterMessage(message1Id));
        Assertions.assertThat((Optional)deadLetterMessage).isPresent();
        Assertions.assertThat((Object)((QueuedMessage)deadLetterMessage.get()).getMessage()).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((int)((QueuedMessage)deadLetterMessage.get()).getTotalDeliveryAttempts()).isEqualTo(1);
        Assertions.assertThat((int)((QueuedMessage)deadLetterMessage.get()).getRedeliveryAttempts()).isEqualTo(0);
        Assertions.assertThat((String)((QueuedMessage)deadLetterMessage.get()).getLastDeliveryError()).isNull();
        QueuedMessage firstDeadLetterMessage = (QueuedMessage)this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L).get(0);
        Assertions.assertThat((Object)((QueuedMessage)deadLetterMessage.get())).usingRecursiveComparison().isEqualTo((Object)firstDeadLetterMessage);
        Optional wasResurrectedResult = this.withDurableQueue(() -> this.durableQueues.resurrectDeadLetterMessage(message1Id, Duration.ofMillis(1000L)));
        Assertions.assertThat((Optional)wasResurrectedResult).isPresent();
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(4L)).untilAsserted(() -> Assertions.assertThat(recordingQueueMessageHandler.messages).hasSize(2));
        messages = new ArrayList<Message>(recordingQueueMessageHandler.messages);
        Assertions.assertThat((Object)messages.get(1)).usingRecursiveComparison().isEqualTo((Object)message1);
        Awaitility.waitAtMost((Duration)Duration.ofMillis(500L)).untilAsserted(() -> Assertions.assertThat((List)this.durableQueues.getQueuedMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L)).isEmpty());
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        Assertions.assertThat((List)this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L)).isEmpty();
        consumer.cancel();
    }

    @Test
    void test_messagehandler_with_call_to_markForRedeliveryIn() {
        QueueName queueName = QueueName.of((CharSequence)"TestQueue");
        Message message1 = Message.of((Object)new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 1234L), (MessageMetaData)MessageMetaData.of((String)"correlation_id", (Object)CorrelationId.random(), (String)"trace_id", (Object)UUID.randomUUID().toString()));
        QueueEntryId idMsg1 = this.withDurableQueue(() -> this.durableQueues.queueMessage(queueName, message1));
        List queuedMessages = this.durableQueues.getQueuedMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
        Assertions.assertThat((List)queuedMessages).hasSize(1);
        Assertions.assertThat((Object)((QueuedMessage)this.durableQueues.getQueuedMessage(idMsg1).get())).isEqualTo(queuedMessages.get(0));
        Assertions.assertThat((Object)((QueuedMessage)queuedMessages.get(0)).getMessage()).usingRecursiveComparison().isEqualTo((Object)message1);
        Assertions.assertThat((CharSequence)((QueuedMessage)queuedMessages.get(0)).getId()).isEqualTo((Object)idMsg1);
        Assertions.assertThat((CharSequence)((QueuedMessage)queuedMessages.get(0)).getQueueName()).isEqualTo((Object)queueName);
        Assertions.assertThat((OffsetDateTime)((QueuedMessage)queuedMessages.get(0)).getAddedTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat((OffsetDateTime)((QueuedMessage)queuedMessages.get(0)).getNextDeliveryTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat((boolean)((QueuedMessage)queuedMessages.get(0)).isDeadLetterMessage()).isFalse();
        Assertions.assertThat((String)((QueuedMessage)queuedMessages.get(0)).getLastDeliveryError()).isEqualTo(null);
        Assertions.assertThat((int)((QueuedMessage)queuedMessages.get(0)).getRedeliveryAttempts()).isEqualTo(0);
        Assertions.assertThat((int)((QueuedMessage)queuedMessages.get(0)).getTotalDeliveryAttempts()).isEqualTo(0);
        var msgHandler = new QueuedMessageHandler(){
            List<QueuedMessage> messages = new CopyOnWriteArrayList<QueuedMessage>();

            public void handle(QueuedMessage message) {
                this.messages.add(message);
                if (message.getTotalDeliveryAttempts() < 3) {
                    message.markForRedeliveryIn(Duration.ofMillis(150L));
                }
            }
        };
        this.durableQueues.consumeFromQueue(ConsumeFromQueue.builder().setQueueName(queueName).setRedeliveryPolicy(RedeliveryPolicy.fixedBackoff().setRedeliveryDelay(Duration.ofMillis(200L)).setMaximumNumberOfRedeliveries(5).build()).setParallelConsumers(1).setQueueMessageHandler(msgHandler).build());
        Assertions.assertThat((Collection)this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(2L)).untilAsserted(() -> Assertions.assertThat(msgHandler.messages).hasSize(3));
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(2L)).untilAsserted(() -> Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L));
        Assertions.assertThat((long)this.durableQueues.getTotalDeadLetterMessagesQueuedFor(queueName)).isEqualTo(0L);
        Assertions.assertThat((Object)this.durableQueues.getQueuedMessageCountsFor(queueName)).isEqualTo((Object)new QueuedMessageCounts(queueName, 0L, 0L));
        Assertions.assertThat((String)msgHandler.messages.get(0).getLastDeliveryError()).isNull();
        Assertions.assertThat((boolean)msgHandler.messages.get(0).isManuallyMarkedForRedelivery()).isTrue();
        Assertions.assertThat((boolean)msgHandler.messages.get(1).isManuallyMarkedForRedelivery()).isTrue();
        Assertions.assertThat((String)msgHandler.messages.get(1).getLastDeliveryError()).isEqualTo("Manually requested redelivery");
        Assertions.assertThat((String)msgHandler.messages.get(2).getLastDeliveryError()).isEqualTo("Manually requested redelivery");
        Assertions.assertThat((boolean)msgHandler.messages.get(2).isManuallyMarkedForRedelivery()).isFalse();
    }

    static class RecordingQueuedMessageHandler
    implements QueuedMessageHandler {
        Consumer<Message> functionLogic;
        ConcurrentLinkedQueue<Message> messages = new ConcurrentLinkedQueue();

        RecordingQueuedMessageHandler() {
        }

        RecordingQueuedMessageHandler(Consumer<Message> functionLogic) {
            this.functionLogic = functionLogic;
        }

        public void handle(QueuedMessage message) {
            this.messages.add(message.getMessage());
            if (this.functionLogic != null) {
                this.functionLogic.accept(message.getMessage());
            }
        }
    }
}

