/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.test.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.CustomerId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderEvent;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.ProductId;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Base integration test that starts two {@link DurableQueues} instances created from the same
 * {@link UnitOfWorkFactory} and lets both consume from the same queue at the same time.
 * <p>
 * The test verifies that every queued message is received exactly once across the two instances
 * and that each instance receives at least one message.
 * <p>
 * Concrete subclasses supply the queue implementation, the unit-of-work factory and the storage reset logic.
 *
 * @param <DURABLE_QUEUES> the concrete {@link DurableQueues} implementation under test
 * @param <UOW>            the concrete {@link UnitOfWork} type
 * @param <UOW_FACTORY>    the concrete {@link UnitOfWorkFactory} type
 */
public abstract class DistributedCompetingConsumersDurableQueuesIT<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>> {
    /** Number of messages queued by the test. */
    public static final int NUMBER_OF_MESSAGES = 1000;
    public static final int PARALLEL_CONSUMERS = 20;
    private UOW_FACTORY unitOfWorkFactory;
    private DURABLE_QUEUES durableQueues1;
    private DURABLE_QUEUES durableQueues2;

    /**
     * Creates the unit-of-work factory, resets the queue storage and then creates and starts
     * the two {@link DurableQueues} instances used by the test.
     */
    @BeforeEach
    void setup() {
        this.unitOfWorkFactory = this.createUnitOfWorkFactory();
        this.resetQueueStorage(this.unitOfWorkFactory);
        this.durableQueues1 = this.createDurableQueues(this.unitOfWorkFactory);
        this.durableQueues1.start();
        this.durableQueues2 = this.createDurableQueues(this.unitOfWorkFactory);
        this.durableQueues2.start();
    }

    /**
     * Stops both {@link DurableQueues} instances. The second instance is stopped even if
     * stopping the first one throws.
     */
    @AfterEach
    void cleanup() {
        try {
            if (this.durableQueues1 != null) {
                this.durableQueues1.stop();
            }
        } finally {
            if (this.durableQueues2 != null) {
                this.durableQueues2.stop();
            }
        }
    }

    /**
     * Create a new {@link DurableQueues} instance. Called twice per test with the same factory.
     *
     * @param var1 the unit-of-work factory returned by {@link #createUnitOfWorkFactory()}
     * @return a new, not yet started, queue instance
     */
    protected abstract DURABLE_QUEUES createDurableQueues(UOW_FACTORY var1);

    /**
     * Create the unit-of-work factory shared by both queue instances.
     *
     * @return the unit-of-work factory
     */
    protected abstract UOW_FACTORY createUnitOfWorkFactory();

    /**
     * Invoked before any queue instance is created.
     * NOTE(review): presumably clears persisted queue state left over from earlier tests - confirm in the subclass.
     *
     * @param var1 the unit-of-work factory returned by {@link #createUnitOfWorkFactory()}
     */
    protected abstract void resetQueueStorage(UOW_FACTORY var1);

    /**
     * Runs the action inside a unit of work when the first queue instance reports
     * {@link TransactionalMode#FullyTransactional}; otherwise runs it directly.
     *
     * @param action the queue interaction to perform
     */
    protected void usingDurableQueue(Runnable action) {
        if (this.durableQueues1.getTransactionalMode() == TransactionalMode.FullyTransactional) {
            this.unitOfWorkFactory.usingUnitOfWork(uow -> action.run());
        } else {
            action.run();
        }
    }

    /**
     * Queues {@link #NUMBER_OF_MESSAGES} messages through the first instance, consumes them from both
     * instances concurrently and verifies that the union of what the two handlers received matches the
     * queued messages with no duplicates, and that no further deliveries happen afterwards.
     * <p>
     * Note: despite the method name, delivery order is not asserted here.
     */
    @Test
    void verify_queued_messages_are_dequeued_in_order() {
        QueueName queueName = QueueName.of("TestQueue");
        this.durableQueues1.purgeQueue(queueName);

        List<OrderEvent> messages = createRandomOrderEvents(NUMBER_OF_MESSAGES);
        this.usingDurableQueue(() -> this.durableQueues1.queueMessages(queueName, messages));

        // Both instances must see the same number of queued messages
        Assertions.assertThat(this.durableQueues1.getTotalMessagesQueuedFor(queueName)).isEqualTo((long) NUMBER_OF_MESSAGES);
        Assertions.assertThat(this.durableQueues2.getTotalMessagesQueuedFor(queueName)).isEqualTo((long) NUMBER_OF_MESSAGES);

        RecordingQueuedMessageHandler recordingQueueMessageHandler1 = new RecordingQueuedMessageHandler();
        RecordingQueuedMessageHandler recordingQueueMessageHandler2 = new RecordingQueuedMessageHandler();

        // try/finally so the consumers are cancelled even when an assertion fails
        DurableQueueConsumer consumer1 = this.durableQueues1.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff(Duration.ofMillis(1L), 5), 10, recordingQueueMessageHandler1);
        try {
            DurableQueueConsumer consumer2 = this.durableQueues2.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff(Duration.ofMillis(1L), 5), 10, recordingQueueMessageHandler2);
            try {
                Awaitility.waitAtMost(Duration.ofSeconds(30L))
                          .untilAsserted(() -> Assertions.assertThat(recordingQueueMessageHandler1.messages.size() + recordingQueueMessageHandler2.messages.size())
                                                         .isEqualTo(NUMBER_OF_MESSAGES));

                // Each instance must have taken part in the consumption
                Assertions.assertThat(recordingQueueMessageHandler1.messages.size()).isGreaterThan(0);
                Assertions.assertThat(recordingQueueMessageHandler2.messages.size()).isGreaterThan(0);

                List<OrderEvent> receivedMessages = new ArrayList<>(recordingQueueMessageHandler1.messages);
                receivedMessages.addAll(recordingQueueMessageHandler2.messages);

                SoftAssertions softAssertions = new SoftAssertions();
                softAssertions.assertThat(receivedMessages.stream().distinct().count()).isEqualTo((long) NUMBER_OF_MESSAGES);
                softAssertions.assertThat(receivedMessages).containsAll(messages);
                softAssertions.assertThat(messages).containsAll(receivedMessages);
                softAssertions.assertAll();

                // Check that no additional messages are delivered after the assertions above.
                // The upper bound is kept short: once an extra message arrives the count can never
                // return to the expected value, so a long timeout would only delay the failure.
                Awaitility.await()
                          .during(Duration.ofMillis(1990L))
                          .atMost(Duration.ofSeconds(5L))
                          .until(() -> recordingQueueMessageHandler1.messages.size() + recordingQueueMessageHandler2.messages.size() == NUMBER_OF_MESSAGES);
            } finally {
                consumer2.cancel();
            }
        } finally {
            consumer1.cancel();
        }
    }

    /**
     * Builds a list of randomly populated order events: even indices yield {@link OrderEvent.OrderAdded},
     * odd indices divisible by three yield {@link OrderEvent.ProductAddedToOrder} and all remaining
     * indices yield {@link OrderEvent.OrderAccepted}.
     *
     * @param count the number of events to create
     * @return a mutable list containing {@code count} events
     */
    private static List<OrderEvent> createRandomOrderEvents(int count) {
        Random random = new Random();
        List<OrderEvent> events = new ArrayList<>(count);
        for (int i = 0; i < count; ++i) {
            OrderEvent event;
            if (i % 2 == 0) {
                event = new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), random.nextInt());
            } else if (i % 3 == 0) {
                event = new OrderEvent.ProductAddedToOrder(OrderId.random(), ProductId.random(), random.nextInt());
            } else {
                event = new OrderEvent.OrderAccepted(OrderId.random());
            }
            events.add(event);
        }
        return events;
    }

    /**
     * {@link QueuedMessageHandler} that appends the payload of every handled message to {@link #messages}.
     */
    private static class RecordingQueuedMessageHandler
    implements QueuedMessageHandler {
        /** Payloads received so far; a concurrent queue because the handler is registered with multiple parallel consumers. */
        final ConcurrentLinkedQueue<OrderEvent> messages = new ConcurrentLinkedQueue<>();

        private RecordingQueuedMessageHandler() {
        }

        /**
         * Records the payload of the given message.
         *
         * @param message the delivered message; its payload is cast to {@link OrderEvent}
         */
        @Override
        public void handle(QueuedMessage message) {
            this.messages.add((OrderEvent) message.getPayload());
        }
    }
}

