/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.test.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.CustomerId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderEvent;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderId;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.components.foundation.types.CorrelationId;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.time.StopWatch;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public abstract class DurableQueuesLoadIT<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>> {
    protected UOW_FACTORY unitOfWorkFactory;
    protected DURABLE_QUEUES durableQueues;
    private DurableQueueConsumer consumer;

    @BeforeEach
    void setup() {
        this.unitOfWorkFactory = this.createUnitOfWorkFactory();
        this.resetQueueStorage(this.unitOfWorkFactory);
        this.durableQueues = this.createDurableQueues(this.unitOfWorkFactory);
        this.durableQueues.start();
    }

    @AfterEach
    void cleanup() {
        if (this.consumer != null) {
            this.consumer.cancel();
        }
        if (this.durableQueues != null) {
            this.durableQueues.stop();
        }
    }

    protected abstract DURABLE_QUEUES createDurableQueues(UOW_FACTORY var1);

    protected abstract UOW_FACTORY createUnitOfWorkFactory();

    protected abstract void resetQueueStorage(UOW_FACTORY var1);

    @Test
    void queue_a_large_number_of_messages() {
        QueueName queueName = QueueName.of((CharSequence)"TestQueue");
        Instant now = Instant.now();
        RecordingQueuedMessageHandler msgHandler = new RecordingQueuedMessageHandler();
        this.consumer = this.durableQueues.consumeFromQueue(ConsumeFromQueue.builder().setQueueName(queueName).setRedeliveryPolicy(RedeliveryPolicy.fixedBackoff((Duration)Duration.ofMillis(100L), (int)0)).setParallelConsumers(1).setConsumerName("TestConsumer").setQueueMessageHandler((QueuedMessageHandler)msgHandler).build());
        int count = 20000;
        StopWatch stopwatch = StopWatch.start();
        this.unitOfWorkFactory.usingUnitOfWork(uow -> IntStream.range(0, count).forEach(i -> this.durableQueues.queueMessage(queueName, Message.of((Object)new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), count + 1), (MessageMetaData)MessageMetaData.of((String)"correlation_id", (Object)CorrelationId.random(), (String)"trace_id", (Object)UUID.randomUUID().toString())))));
        System.out.println(MessageFormatter.msg((String)"Queueing {} messages took {}", (Object[])new Object[]{count, stopwatch.stop()}));
        Assertions.assertThat((long)this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo((long)count);
        List nextMessages = this.durableQueues.queryForMessagesSoonReadyForDelivery(queueName, now, 10);
        Assertions.assertThat((List)nextMessages).hasSize(10);
        Awaitility.waitAtMost((Duration)Duration.ofSeconds(5L)).untilAsserted(() -> Assertions.assertThat((long)msgHandler.messagesReceived.get()).isGreaterThan(10L));
        this.consumer.cancel();
        this.consumer = null;
    }

    static class RecordingQueuedMessageHandler
    implements QueuedMessageHandler {
        AtomicLong messagesReceived = new AtomicLong();

        RecordingQueuedMessageHandler() {
        }

        public void handle(QueuedMessage message) {
            this.messagesReceived.getAndIncrement();
        }
    }
}

