/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.test.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageCounts;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.CustomerId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderEvent;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.ProductId;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.components.foundation.types.CorrelationId;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import dk.cloudcreate.essentials.shared.time.StopWatch;
import dk.cloudcreate.essentials.shared.time.Timing;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Base integration test that queues {@link #NUMBER_OF_MESSAGES} messages on a single queue and
 * consumes them with {@link #PARALLEL_CONSUMERS} parallel consumers running in the same JVM.
 * <p>
 * Concrete subclasses supply the {@link DurableQueues} implementation under test, the matching
 * {@link UnitOfWorkFactory}, and a way to reset the underlying queue storage before each test.
 *
 * @param <DURABLE_QUEUES> the {@link DurableQueues} implementation under test
 * @param <UOW>            the {@link UnitOfWork} type produced by the unit of work factory
 * @param <UOW_FACTORY>    the {@link UnitOfWorkFactory} type used together with the durable queues
 */
public abstract class LocalCompetingConsumersDurableQueueIT<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>> {
    /** Number of messages queued, and expected to be consumed, by the test. */
    public static final int NUMBER_OF_MESSAGES = 2000;
    /** Number of parallel consumers (and consumer threads) used by the test. */
    public static final int PARALLEL_CONSUMERS = 20;
    private UOW_FACTORY unitOfWorkFactory;
    private DURABLE_QUEUES durableQueues;

    /**
     * Creates the unit of work factory, resets the queue storage, and then creates and starts the
     * durable queues instance used by the test.
     */
    @BeforeEach
    void setup() {
        this.unitOfWorkFactory = this.createUnitOfWorkFactory();
        this.resetQueueStorage(this.unitOfWorkFactory);
        this.durableQueues = this.createDurableQueues(this.unitOfWorkFactory);
        this.durableQueues.start();
    }

    /**
     * Stops the durable queues instance, if {@link #setup()} got far enough to create one.
     */
    @AfterEach
    void cleanup() {
        if (this.durableQueues != null) {
            this.durableQueues.stop();
        }
    }

    /**
     * Creates the {@link DurableQueues} instance under test.
     *
     * @param var1 the unit of work factory returned from {@link #createUnitOfWorkFactory()}
     * @return the durable queues instance; {@link #setup()} calls {@code start()} on it
     */
    protected abstract DURABLE_QUEUES createDurableQueues(UOW_FACTORY var1);

    /**
     * Creates the unit of work factory that is handed to {@link #resetQueueStorage(UnitOfWorkFactory)}
     * and {@link #createDurableQueues(UnitOfWorkFactory)}.
     *
     * @return the unit of work factory
     */
    protected abstract UOW_FACTORY createUnitOfWorkFactory();

    /**
     * Resets the queue storage. Called from {@link #setup()} before the durable queues are created.
     *
     * @param var1 the unit of work factory returned from {@link #createUnitOfWorkFactory()}
     */
    protected abstract void resetQueueStorage(UOW_FACTORY var1);

    /**
     * Runs {@code action} and measures how long it takes. When the durable queues report
     * {@link TransactionalMode#FullyTransactional} the action is run inside a unit of work,
     * otherwise it is run directly.
     *
     * @param description the description given to the {@link StopWatch}
     * @param action      the work to perform
     * @return the measured timing (also printed to standard out)
     */
    protected Timing usingDurableQueue(String description, Runnable action) {
        StopWatch stopWatch = StopWatch.start(description);
        if (this.durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
            this.unitOfWorkFactory.usingUnitOfWork(uow -> action.run());
        } else {
            action.run();
        }
        Timing timing = stopWatch.stop();
        System.out.println(timing);
        return timing;
    }

    /**
     * Queues {@link #NUMBER_OF_MESSAGES} messages, consumes them using {@link #PARALLEL_CONSUMERS}
     * parallel consumers and asserts that the set of received messages matches the set of queued
     * messages, with no duplicates and no further deliveries afterwards.
     * <p>
     * NOTE(review): despite the method name, the assertions below do not check the order in which
     * the messages were received.
     */
    @Test
    void verify_queued_messages_are_dequeued_in_order() throws InterruptedException {
        Random random = new Random();
        QueueName queueName = QueueName.of("TestQueue");
        this.durableQueues.purgeQueue(queueName);

        List<Message> messages = new ArrayList<>(NUMBER_OF_MESSAGES);
        for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
            messages.add(createMessage(i, random));
        }

        List<Timing> timings = new ArrayList<>();
        timings.add(this.usingDurableQueue(MessageFormatter.msg("Queuing {} messages", NUMBER_OF_MESSAGES),
                                           () -> this.durableQueues.queueMessages(queueName, messages)));
        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo((long) NUMBER_OF_MESSAGES);
        Assertions.assertThat(this.durableQueues.getQueuedMessageCountsFor(queueName))
                  .isEqualTo(new QueuedMessageCounts(queueName, NUMBER_OF_MESSAGES, 0L));

        RecordingQueuedMessageHandler recordingQueueMessageHandler = new RecordingQueuedMessageHandler();
        StopWatch stopWatch = StopWatch.start(MessageFormatter.msg("Consuming {} messages using {} parallel consumers",
                                                                   NUMBER_OF_MESSAGES,
                                                                   PARALLEL_CONSUMERS));
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(
                ConsumeFromQueue.builder()
                                .setQueueName(queueName)
                                .setRedeliveryPolicy(RedeliveryPolicy.fixedBackoff(Duration.ofMillis(1L), 5))
                                .setQueueMessageHandler(recordingQueueMessageHandler)
                                .setParallelConsumers(PARALLEL_CONSUMERS)
                                .setConsumerExecutorService(Executors.newScheduledThreadPool(PARALLEL_CONSUMERS,
                                                                                             ThreadFactoryBuilder.builder()
                                                                                                                 .daemon(true)
                                                                                                                 .nameFormat(queueName + "-Consume-Messages-%d")
                                                                                                                 .build()))
                                .build());
        // Cancel the consumer even when an assertion fails, so a failing test doesn't leave it running
        try {
            Awaitility.waitAtMost(Duration.ofSeconds(60L))
                      .untilAsserted(() -> Assertions.assertThat(recordingQueueMessageHandler.messages.size())
                                                     .isEqualTo(NUMBER_OF_MESSAGES));
            Timing timing = stopWatch.stop();
            timings.add(timing);
            System.out.println(timing);

            // Snapshot the received messages before comparing them against what was queued
            List<Message> receivedMessages = new ArrayList<>(recordingQueueMessageHandler.messages);
            SoftAssertions softAssertions = new SoftAssertions();
            // No message may have been delivered more than once
            softAssertions.assertThat(receivedMessages.stream().distinct().count()).isEqualTo((long) NUMBER_OF_MESSAGES);
            softAssertions.assertThat(receivedMessages).containsAll(messages);
            softAssertions.assertThat(messages).containsAll(receivedMessages);
            softAssertions.assertAll();

            // The received count must stay unchanged for the 'during' period, i.e. no late/duplicate deliveries.
            // NOTE(review): atMost is 2000 *seconds*; presumably only meant as a generous upper bound - confirm
            Awaitility.await()
                      .during(Duration.ofMillis(1990L))
                      .atMost(Duration.ofSeconds(2000L))
                      .until(() -> recordingQueueMessageHandler.messages.size() == NUMBER_OF_MESSAGES);
        } finally {
            consumer.cancel();
        }
        System.out.println(timings);
    }

    /**
     * Creates the test message for position {@code index}: even indexes get an {@code OrderAdded}
     * payload, odd indexes divisible by 3 a {@code ProductAddedToOrder} payload and all remaining
     * indexes an {@code OrderAccepted} payload.
     */
    private static Message createMessage(int index, Random random) {
        Object payload;
        if (index % 2 == 0) {
            payload = new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), random.nextInt());
        } else if (index % 3 == 0) {
            payload = new OrderEvent.ProductAddedToOrder(OrderId.random(), ProductId.random(), random.nextInt());
        } else {
            payload = new OrderEvent.OrderAccepted(OrderId.random());
        }
        return Message.of(payload,
                          MessageMetaData.of("correlation_id", CorrelationId.random(),
                                             "trace_id", UUID.randomUUID().toString()));
    }

    /**
     * {@link QueuedMessageHandler} that records the {@link Message} of every {@link QueuedMessage}
     * it is asked to handle. A {@link ConcurrentLinkedQueue} is used because the handler is shared
     * between the parallel consumers.
     */
    private static class RecordingQueuedMessageHandler
    implements QueuedMessageHandler {
        final ConcurrentLinkedQueue<Message> messages = new ConcurrentLinkedQueue<>();

        @Override
        public void handle(QueuedMessage message) {
            this.messages.add(message.getMessage());
        }
    }
}

