/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.test.reactive.command;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.reactive.command.DurableLocalCommandBus;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.reactive.command.CommandHandler;
import dk.cloudcreate.essentials.reactive.command.MultipleCommandHandlersFoundException;
import dk.cloudcreate.essentials.reactive.command.NoCommandHandlerFoundException;
import dk.cloudcreate.essentials.reactive.command.SendAndDontWaitErrorHandler;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.collections.Lists;
import java.time.Duration;
import java.util.List;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract integration-test suite for {@link DurableLocalCommandBus}.
 * <p>
 * Concrete subclasses choose the {@link DurableQueues} and {@link UnitOfWorkFactory}
 * implementations by implementing {@code createDurableQueues} and
 * {@code createUnitOfWorkFactory}; the tests declared here then run against them.
 *
 * @param <DURABLE_QUEUES> the {@link DurableQueues} implementation the command bus is built on
 * @param <UOW>            the {@link UnitOfWork} type produced by the factory
 * @param <UOW_FACTORY>    the {@link UnitOfWorkFactory} implementation used by the tests
 */
public abstract class AbstractDurableLocalCommandBusIT<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>> {
    /** Message carried by the exception that {@code ExceptionThrowingCommandHandler} throws. */
    public static final String ON_PURPOSE = "On purpose";
    /** Unit-of-work factory; assigned in {@code setup()} from {@code createUnitOfWorkFactory()}. */
    protected UOW_FACTORY unitOfWorkFactory;
    /** Durable queues handed to the command bus; created and started in {@code setup()}. */
    protected DURABLE_QUEUES durableQueues;
    /** Command bus under test; built and started in {@code setup()}. */
    protected DurableLocalCommandBus commandBus;
    /** Error handler registered on the command bus; keeps the arguments of the reported failure. */
    protected TestSendAndDontWaitErrorHandler errorHandler;

    /**
     * Creates a fresh unit-of-work factory, durable queues, error handler and command bus
     * before each test, starting the queues first and the command bus last.
     */
    @BeforeEach
    void setup() {
        this.unitOfWorkFactory = this.createUnitOfWorkFactory();

        this.durableQueues = this.createDurableQueues(this.unitOfWorkFactory);
        this.durableQueues.start();

        this.errorHandler = new TestSendAndDontWaitErrorHandler();

        // NOTE(review): the second fixedBackoff argument is presumably the maximum number of
        // redeliveries - confirm against RedeliveryPolicy.
        this.commandBus = DurableLocalCommandBus.builder()
                .setDurableQueues(this.durableQueues)
                .setSendAndDontWaitErrorHandler(this.errorHandler)
                .setCommandQueueName(DurableLocalCommandBus.DEFAULT_COMMAND_QUEUE_NAME)
                .setCommandQueueRedeliveryPolicy(RedeliveryPolicy.fixedBackoff(Duration.ofMillis(100L), 1))
                .build();
        this.commandBus.start();
    }

    /**
     * Stops the command bus and then the durable queues after each test.
     * <p>
     * The queues are stopped in a {@code finally} block so that a failure while stopping the
     * command bus cannot leave the queues running for the following tests. Either field may
     * still be {@code null} if {@code setup()} failed part-way, hence the null checks.
     */
    @AfterEach
    void cleanup() {
        try {
            if (this.commandBus != null) {
                this.commandBus.stop();
            }
        } finally {
            if (this.durableQueues != null) {
                this.durableQueues.stop();
            }
        }
    }

    /**
     * Creates the {@link DurableQueues} instance the command bus is built on.
     * Called from {@code setup()} before each test; {@code setup()} starts the returned instance.
     *
     * @param var1 the unit-of-work factory previously returned by {@code createUnitOfWorkFactory()}
     * @return the durable queues to use for the current test
     */
    protected abstract DURABLE_QUEUES createDurableQueues(UOW_FACTORY var1);

    /**
     * Creates the {@link UnitOfWorkFactory} used by the tests and passed on to
     * {@code createDurableQueues}. Called from {@code setup()} before each test.
     *
     * @return the unit-of-work factory to use for the current test
     */
    protected abstract UOW_FACTORY createUnitOfWorkFactory();

    /**
     * A synchronous {@code send} returns the handler's result and delivers the command
     * to the registered handler.
     */
    @Test
    void test_sync_send() {
        String command = "Hello World";
        TestCommandHandler handler = new TestCommandHandler(String.class);
        this.commandBus.addCommandHandler(handler);

        Object reply = this.commandBus.send(command);

        Assertions.assertThat(reply).isEqualTo("test");
        Assertions.assertThat(handler.receivedCommand).isEqualTo(command);
    }

    /**
     * A synchronous {@code send} surfaces the handler's {@link RuntimeException}
     * (message {@link #ON_PURPOSE}) to the caller.
     */
    @Test
    void test_sync_send_with_command_processing_exception() {
        ExceptionThrowingCommandHandler failingHandler = new ExceptionThrowingCommandHandler();
        this.commandBus.addCommandHandler(failingHandler);

        Assertions.assertThatThrownBy(() -> this.commandBus.send("Hello World"))
                .isInstanceOf(RuntimeException.class)
                .hasMessage(ON_PURPOSE);
    }

    /**
     * {@code sendAsync} yields the handler's result when blocked on (for at most one second)
     * and delivers the command to the registered handler.
     */
    @Test
    void test_async_send() {
        String command = "Hello World";
        TestCommandHandler handler = new TestCommandHandler(String.class);
        this.commandBus.addCommandHandler(handler);

        Object reply = this.commandBus.sendAsync(command).block(Duration.ofMillis(1000L));

        Assertions.assertThat(reply).isEqualTo("test");
        Assertions.assertThat(handler.receivedCommand).isEqualTo(command);
    }

    /**
     * A command sent with {@code sendAndDontWait} from inside a unit of work reaches the
     * handler within 500 ms.
     */
    @Test
    void test_sendAndDontWait_with_managed_transaction() {
        String command = "Hello World";
        TestCommandHandler handler = new TestCommandHandler(String.class);
        this.commandBus.addCommandHandler(handler);

        this.unitOfWorkFactory.usingUnitOfWork(() -> this.commandBus.sendAndDontWait(command));

        Awaitility.waitAtMost(Duration.ofMillis(500L))
                .untilAsserted(() -> Assertions.assertThat(handler.receivedCommand).isEqualTo(command));
    }

    /**
     * A command sent with {@code sendAndDontWait} outside any explicit unit of work reaches
     * the handler within 2000 ms.
     */
    @Test
    void test_sendAndDontWait() {
        String command = "Hello World";
        TestCommandHandler handler = new TestCommandHandler(String.class);
        this.commandBus.addCommandHandler(handler);

        this.commandBus.sendAndDontWait(command);

        Awaitility.waitAtMost(Duration.ofMillis(2000L))
                .untilAsserted(() -> Assertions.assertThat(handler.receivedCommand).isEqualTo(command));
    }

    /**
     * A command sent with {@code sendAndDontWait} from inside a managed unit of work, whose
     * handler throws, is reported to the error handler and is added to the dead-letter messages
     * of the command queue.
     */
    @Test
    void test_sendAndDontWait_with_managed_transaction_and_with_error() {
        ExceptionThrowingCommandHandler cmdHandler = new ExceptionThrowingCommandHandler();
        this.commandBus.addCommandHandler(cmdHandler);
        String command = "Hello World";
        int errorQueueSizePrior = this.durableQueues.getDeadLetterMessages(this.commandBus.getCommandQueueName(), DurableQueues.QueueingSortOrder.ASC, 0L, 10L).size();

        // Send inside a unit of work, as the test name promises
        // (same pattern as test_sendAndDontWait_with_managed_transaction).
        this.unitOfWorkFactory.usingUnitOfWork(() -> this.commandBus.sendAndDontWait(command));

        // The error handler must have been told about the failure...
        Awaitility.waitAtMost(Duration.ofMillis(500L))
                .untilAsserted(() -> Assertions.assertThat(this.errorHandler.exception).isNotNull());
        Assertions.assertThat(this.errorHandler.exception).isInstanceOf(RuntimeException.class);
        Assertions.assertThat(this.errorHandler.exception).hasMessage(ON_PURPOSE);
        Assertions.assertThat(this.errorHandler.command.getPayload()).isEqualTo(command);
        Assertions.assertThat(this.errorHandler.commandHandler).isEqualTo(cmdHandler);

        // ...and exactly one new dead-letter message carrying the command must appear.
        Awaitility.waitAtMost(Duration.ofMillis(1000L))
                .untilAsserted(() -> Assertions.assertThat(this.durableQueues.getDeadLetterMessages(this.commandBus.getCommandQueueName(), DurableQueues.QueueingSortOrder.ASC, 0L, 10L)).hasSize(errorQueueSizePrior + 1));
        QueuedMessage lastDeadLetter = (QueuedMessage)Lists.last(this.durableQueues.getDeadLetterMessages(this.commandBus.getCommandQueueName(), DurableQueues.QueueingSortOrder.ASC, 0L, 10L)).get();
        Assertions.assertThat(lastDeadLetter.getMessage().getPayload()).isEqualTo(command);
    }

    /**
     * A command sent with {@code sendAndDontWait} outside any explicit unit of work, whose
     * handler throws, is reported to the error handler and is added to the dead-letter messages
     * of the command queue.
     */
    @Test
    void test_sendAndDontWait_with_error() {
        ExceptionThrowingCommandHandler cmdHandler = new ExceptionThrowingCommandHandler();
        this.commandBus.addCommandHandler(cmdHandler);
        String command = "Hello World";
        int errorQueueSizePrior = this.durableQueues.getDeadLetterMessages(this.commandBus.getCommandQueueName(), DurableQueues.QueueingSortOrder.ASC, 0L, 10L).size();

        // Send directly, without a managed unit of work (same pattern as test_sendAndDontWait);
        // the managed-transaction variant is covered by its own test.
        this.commandBus.sendAndDontWait(command);

        // The error handler must have been told about the failure...
        Awaitility.waitAtMost(Duration.ofMillis(500L))
                .untilAsserted(() -> Assertions.assertThat(this.errorHandler.exception).isNotNull());
        Assertions.assertThat(this.errorHandler.exception).isInstanceOf(RuntimeException.class);
        Assertions.assertThat(this.errorHandler.exception).hasMessage(ON_PURPOSE);
        Assertions.assertThat(this.errorHandler.command.getPayload()).isEqualTo(command);
        Assertions.assertThat(this.errorHandler.commandHandler).isEqualTo(cmdHandler);

        // ...and exactly one new dead-letter message carrying the command must appear.
        Awaitility.waitAtMost(Duration.ofMillis(1000L))
                .untilAsserted(() -> Assertions.assertThat(this.durableQueues.getDeadLetterMessages(this.commandBus.getCommandQueueName(), DurableQueues.QueueingSortOrder.ASC, 0L, 10L)).hasSize(errorQueueSizePrior + 1));
        QueuedMessage lastDeadLetter = (QueuedMessage)Lists.last(this.durableQueues.getDeadLetterMessages(this.commandBus.getCommandQueueName(), DurableQueues.QueueingSortOrder.ASC, 0L, 10L)).get();
        Assertions.assertThat(lastDeadLetter.getMessage().getPayload()).isEqualTo(command);
    }

    /**
     * A command sent with a 1000 ms delivery delay from inside a unit of work is handled,
     * but not before at least 500 ms have passed.
     */
    @Test
    void test_sendAndDontWait_with_delay() {
        String command = "Hello World";
        TestCommandHandler handler = new TestCommandHandler(String.class);
        this.commandBus.addCommandHandler(handler);

        this.unitOfWorkFactory.usingUnitOfWork(() -> this.commandBus.sendAndDontWait(command, Duration.ofMillis(1000L)));

        // atLeast(...) makes the wait fail if the command shows up earlier than 500 ms.
        Awaitility.await()
                .atLeast(Duration.ofMillis(500L))
                .untilAsserted(() -> Assertions.assertThat(handler.receivedCommand).isNotNull());
        Awaitility.waitAtMost(Duration.ofMillis(600L))
                .untilAsserted(() -> Assertions.assertThat(handler.receivedCommand).isEqualTo(command));
    }

    /**
     * A command sent with a 1000 ms delivery delay from inside a unit of work, whose handler
     * throws, is reported to the error handler no earlier than 500 ms after sending and is
     * added to the dead-letter messages of the command queue.
     */
    @Test
    void test_sendAndDontWait_with_delay_and_error() {
        ExceptionThrowingCommandHandler failingHandler = new ExceptionThrowingCommandHandler();
        this.commandBus.addCommandHandler(failingHandler);
        String command = "Hello World with delay";
        int deadLettersBefore = this.durableQueues.getDeadLetterMessages(this.commandBus.getCommandQueueName(), DurableQueues.QueueingSortOrder.ASC, 0L, 10L).size();

        this.unitOfWorkFactory.usingUnitOfWork(() -> this.commandBus.sendAndDontWait(command, Duration.ofMillis(1000L)));

        // atLeast(...) makes the wait fail if the error is reported earlier than 500 ms.
        Awaitility.await()
                .atLeast(Duration.ofMillis(500L))
                .untilAsserted(() -> Assertions.assertThat(this.errorHandler.exception).isNotNull());
        Awaitility.waitAtMost(Duration.ofMillis(600L))
                .untilAsserted(() -> Assertions.assertThat(this.errorHandler.exception).isNotNull());

        Assertions.assertThat(this.errorHandler.exception).isInstanceOf(RuntimeException.class);
        Assertions.assertThat(this.errorHandler.exception).hasMessage(ON_PURPOSE);
        Assertions.assertThat(this.errorHandler.command.getPayload()).isEqualTo(command);
        Assertions.assertThat(this.errorHandler.commandHandler).isEqualTo(failingHandler);

        // Exactly one new dead-letter message, carrying the command, must appear.
        Awaitility.waitAtMost(Duration.ofMillis(1000L))
                .untilAsserted(() -> Assertions.assertThat(this.durableQueues.getDeadLetterMessages(this.commandBus.getCommandQueueName(), DurableQueues.QueueingSortOrder.ASC, 0L, 10L)).hasSize(deadLettersBefore + 1));
        QueuedMessage lastDeadLetter = (QueuedMessage)Lists.last(this.durableQueues.getDeadLetterMessages(this.commandBus.getCommandQueueName(), DurableQueues.QueueingSortOrder.ASC, 0L, 10L)).get();
        Assertions.assertThat(lastDeadLetter.getMessage().getPayload()).isEqualTo(command);
    }

    /**
     * Blocking on {@code sendAsync} surfaces the handler's {@link RuntimeException}
     * (message {@link #ON_PURPOSE}) to the caller.
     */
    @Test
    void test_async_send_with_command_processing_exception() {
        ExceptionThrowingCommandHandler failingHandler = new ExceptionThrowingCommandHandler();
        this.commandBus.addCommandHandler(failingHandler);

        Assertions.assertThatThrownBy(() -> this.commandBus.sendAsync("Hello World").block(Duration.ofMillis(1000L)))
                .isInstanceOf(RuntimeException.class)
                .hasMessage(ON_PURPOSE);
    }

    /**
     * Sending a {@code String} command when only a {@code Long} handler is registered fails
     * with {@link NoCommandHandlerFoundException}.
     */
    @Test
    void test_no_matching_command_handler() {
        TestCommandHandler longOnlyHandler = new TestCommandHandler(Long.class);
        this.commandBus.addCommandHandler(longOnlyHandler);

        Assertions.assertThatThrownBy(() -> this.commandBus.send("Hello World"))
                .isInstanceOf(NoCommandHandlerFoundException.class);
    }

    /**
     * Sending a {@code Long} command when two {@code Long} handlers are registered fails
     * with {@link MultipleCommandHandlersFoundException}.
     */
    @Test
    void test_multiple_matching_command_handlers() {
        TestCommandHandler firstLongHandler = new TestCommandHandler(Long.class);
        TestCommandHandler secondLongHandler = new TestCommandHandler(Long.class);
        this.commandBus.addCommandHandler(firstLongHandler);
        this.commandBus.addCommandHandler(secondLongHandler);

        Assertions.assertThatThrownBy(() -> this.commandBus.send(10L))
                .isInstanceOf(MultipleCommandHandlersFoundException.class);
    }

    private static class TestSendAndDontWaitErrorHandler
    implements SendAndDontWaitErrorHandler {
        private Throwable exception;
        private QueuedMessage command;
        private CommandHandler commandHandler;

        private TestSendAndDontWaitErrorHandler() {
        }

        public void handleError(Throwable exception, Object command, CommandHandler commandHandler) {
            this.exception = exception;
            this.command = (QueuedMessage)command;
            this.commandHandler = commandHandler;
            Exceptions.sneakyThrow((Throwable)exception);
        }
    }

    private static class TestCommandHandler
    implements CommandHandler {
        private static Logger log = LoggerFactory.getLogger(TestCommandHandler.class);
        public static final String TEST = "test";
        private final Class<?> canHandleCommandsOfType;
        private Object receivedCommand;

        private TestCommandHandler(Class<?> canHandleCommandsOfType) {
            this.canHandleCommandsOfType = canHandleCommandsOfType;
        }

        public boolean canHandle(Class<?> commandType) {
            return this.canHandleCommandsOfType.isAssignableFrom(commandType);
        }

        public Object handle(Object command) {
            log.info("Received command: {}", command);
            this.receivedCommand = command;
            return TEST;
        }
    }

    private static class ExceptionThrowingCommandHandler
    implements CommandHandler {
        private static Logger log = LoggerFactory.getLogger(ExceptionThrowingCommandHandler.class);

        private ExceptionThrowingCommandHandler() {
        }

        public boolean canHandle(Class<?> commandType) {
            return true;
        }

        public Object handle(Object command) {
            log.info("Received command '{}', will now throw a RuntimeException", command);
            throw new RuntimeException(AbstractDurableLocalCommandBusIT.ON_PURPOSE);
        }
    }
}

