/*
 * Decompiled with CFR 0.152.
 */
package cn.boboweike.carrot.server;

import ch.qos.logback.LoggerAssert;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import cn.boboweike.carrot.CarrotException;
import cn.boboweike.carrot.configuration.Carrot;
import cn.boboweike.carrot.fixtures.CarrotAssertions;
import cn.boboweike.carrot.fixtures.stubs.StaticTestService;
import cn.boboweike.carrot.fixtures.stubs.TestService;
import cn.boboweike.carrot.fixtures.stubs.TestServiceForIoC;
import cn.boboweike.carrot.fixtures.tasks.TaskTestBuilder;
import cn.boboweike.carrot.fixtures.tasks.stubs.SimpleTaskActivator;
import cn.boboweike.carrot.fixtures.utils.SleepUtils;
import cn.boboweike.carrot.scheduling.BackgroundTask;
import cn.boboweike.carrot.server.BackgroundTaskServer;
import cn.boboweike.carrot.server.BackgroundTaskServerConfiguration;
import cn.boboweike.carrot.server.TaskActivator;
import cn.boboweike.carrot.server.runner.BackgroundStaticTaskWithoutIocRunner;
import cn.boboweike.carrot.server.runner.BackgroundTaskWithIocRunner;
import cn.boboweike.carrot.server.runner.BackgroundTaskWithoutIocRunner;
import cn.boboweike.carrot.storage.BackgroundTaskServerStatus;
import cn.boboweike.carrot.storage.InMemoryPartitionedStorageProvider;
import cn.boboweike.carrot.storage.PartitionedStorageProvider;
import cn.boboweike.carrot.storage.StorageException;
import cn.boboweike.carrot.tasks.Task;
import cn.boboweike.carrot.tasks.TaskId;
import cn.boboweike.carrot.tasks.lambdas.IocTaskLambda;
import cn.boboweike.carrot.tasks.lambdas.TaskLambda;
import cn.boboweike.carrot.tasks.states.ProcessingState;
import cn.boboweike.carrot.tasks.states.StateName;
import cn.boboweike.carrot.tasks.states.TaskState;
import java.io.Serializable;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalUnit;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.data.TemporalOffset;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.AnswersWithDelay;
import org.mockito.internal.stubbing.answers.ThrowsException;
import org.mockito.stubbing.Answer;

public class BackgroundTaskServerTest {
    private TestService testService;
    private PartitionedStorageProvider storageProvider;
    private BackgroundTaskServer backgroundTaskServer;
    private TestServiceForIoC testServiceForIoC;
    private SimpleTaskActivator taskActivator;
    private ListAppender<ILoggingEvent> logger;
    private static final Integer PARTITION0 = 0;

    @BeforeEach
    void setUpTestService() {
        this.testService = new TestService();
        this.testServiceForIoC = new TestServiceForIoC("an argument");
        this.testService.reset();
        this.testServiceForIoC.reset();
        this.storageProvider = (PartitionedStorageProvider)Mockito.spy((Object)new InMemoryPartitionedStorageProvider());
        this.taskActivator = new SimpleTaskActivator(this.testServiceForIoC);
        Carrot.configure().useTaskActivator((TaskActivator)this.taskActivator).useStorageProvider(this.storageProvider).useBackgroundTaskServer(BackgroundTaskServerConfiguration.usingStandardBackgroundTaskServerConfiguration().andPollIntervalInSeconds(5), false).initialize();
        this.backgroundTaskServer = Carrot.getBackgroundTaskServer();
        this.logger = LoggerAssert.initFor(this.backgroundTaskServer);
    }

    @AfterEach
    void stopBackgroundTaskServer() {
        this.backgroundTaskServer.stop();
    }

    @Test
    void testStartAndStop() {
        TaskId taskId = BackgroundTask.enqueue((TaskLambda & Serializable)() -> this.testService.doWork());
        Awaitility.await().during(Durations.TWO_SECONDS).atMost(Durations.FIVE_SECONDS).until(() -> this.testService.getProcessedTasks() == 0);
        CarrotAssertions.assertThat(this.storageProvider.getTaskById(taskId)).hasStates(StateName.ENQUEUED);
        this.backgroundTaskServer.start();
        Awaitility.await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(this.storageProvider.getTaskById(taskId)).hasStates(StateName.ENQUEUED, StateName.PROCESSING, StateName.SUCCEEDED));
        this.backgroundTaskServer.pauseProcessing();
        TaskId anotherTaskId = BackgroundTask.enqueue((TaskLambda & Serializable)() -> this.testService.doWork());
        Awaitility.await().during(Durations.TWO_SECONDS).atMost(Durations.FIVE_SECONDS).until(() -> this.testService.getProcessedTasks() == 1);
        Awaitility.await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(this.storageProvider.getTaskById(anotherTaskId)).hasStates(StateName.ENQUEUED));
        this.backgroundTaskServer.resumeProcessing();
        Awaitility.await().atMost(Durations.TEN_SECONDS).until(() -> this.testService.getProcessedTasks() > 1);
        Awaitility.await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(this.storageProvider.getTaskById(anotherTaskId)).hasStates(StateName.ENQUEUED, StateName.PROCESSING, StateName.SUCCEEDED));
        this.backgroundTaskServer.stop();
        CarrotAssertions.assertThat(this.logger).hasInfoMessageContaining("BackgroundTaskServer and BackgroundTaskPerformers - stopping (waiting for all tasks to complete - max 10 seconds)", 1);
        Awaitility.await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(Thread.getAllStackTraces()).matches(this::containsNoBackgroundTaskThreads, "Found BackgroundTask Threads: \n\t" + this.getThreadNames(Thread.getAllStackTraces()).collect(Collectors.joining("\n\t"))));
        CarrotAssertions.assertThat(this.logger).hasInfoMessageContaining("BackgroundTaskServer and BackgroundTaskPerformers stopped", 1);
    }

    @Test
    void testOnServerExitCleansUpAllThreads() {
        int amountOfTasks = 10;
        this.backgroundTaskServer.start();
        for (int i = 0; i < 10; ++i) {
            BackgroundTask.enqueue((TaskLambda & Serializable)() -> this.testService.doWork());
        }
        Awaitility.await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(this.storageProvider).hasTasks(StateName.SUCCEEDED, 10, PARTITION0));
        Awaitility.await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(Thread.getAllStackTraces()).matches(this::containsBackgroundTaskThreads));
        this.backgroundTaskServer.stop();
        Awaitility.await().atMost(Durations.ONE_MINUTE).untilAsserted(() -> CarrotAssertions.assertThat(Thread.getAllStackTraces()).matches(this::containsNoBackgroundTaskThreads, "Found BackgroundTask Threads: \n\t" + this.getThreadNames(Thread.getAllStackTraces()).collect(Collectors.joining("\n\t"))));
    }

    @Test
    void testServerStatusStateMachine() {
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isFalse();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isUnAnnounced()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isFalse();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isFalse();
        ((PartitionedStorageProvider)Mockito.doAnswer((Answer)new AnswersWithDelay(100L, (Answer)new ThrowsException((Throwable)new IllegalStateException()))).when((Object)this.storageProvider)).announceBackgroundTaskServer((BackgroundTaskServerStatus)ArgumentMatchers.any());
        Assertions.assertThatCode(() -> this.backgroundTaskServer.start()).doesNotThrowAnyException();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isFalse();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isUnAnnounced()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isTrue();
        this.backgroundTaskServer.stop();
        Awaitility.await().until(() -> this.backgroundTaskServer.isStopped());
        Mockito.reset((Object[])new PartitionedStorageProvider[]{this.storageProvider});
        Assertions.assertThatCode(() -> this.backgroundTaskServer.start()).doesNotThrowAnyException();
        Awaitility.await().until(() -> this.backgroundTaskServer.isAnnounced());
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isTrue();
        Assertions.assertThatCode(() -> this.backgroundTaskServer.pauseProcessing()).doesNotThrowAnyException();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isFalse();
        Assertions.assertThatCode(() -> this.backgroundTaskServer.pauseProcessing()).doesNotThrowAnyException();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isFalse();
        Assertions.assertThatCode(() -> this.backgroundTaskServer.stop()).doesNotThrowAnyException();
        Awaitility.await().until(() -> !this.backgroundTaskServer.isAnnounced());
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isFalse();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isFalse();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isFalse();
        Assertions.assertThatThrownBy(() -> this.backgroundTaskServer.resumeProcessing()).isInstanceOf(IllegalStateException.class);
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isFalse();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isFalse();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isFalse();
        Assertions.assertThatThrownBy(() -> this.backgroundTaskServer.pauseProcessing()).isInstanceOf(IllegalStateException.class);
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isFalse();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isFalse();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isFalse();
        Assertions.assertThatCode(() -> this.backgroundTaskServer.start()).doesNotThrowAnyException();
        Awaitility.await().until(() -> this.backgroundTaskServer.isAnnounced());
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isTrue();
        Assertions.assertThatCode(() -> this.backgroundTaskServer.start()).doesNotThrowAnyException();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isTrue();
        Assertions.assertThatCode(() -> this.backgroundTaskServer.resumeProcessing()).doesNotThrowAnyException();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isTrue();
        Assertions.assertThatCode(() -> this.backgroundTaskServer.resumeProcessing()).doesNotThrowAnyException();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isAnnounced()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isStarted()).isTrue();
        Assertions.assertThat((boolean)this.backgroundTaskServer.isRunning()).isTrue();
    }

    @Test
    void testStopBackgroundTaskServerWhileProcessing() {
        this.backgroundTaskServer.start();
        TaskId taskId = BackgroundTask.enqueue((TaskLambda & Serializable)() -> this.testService.doWorkThatTakesLong(15));
        Awaitility.await().atMost(6L, TimeUnit.SECONDS).until(() -> this.storageProvider.getTaskById(taskId).hasState(StateName.PROCESSING));
        this.backgroundTaskServer.stop();
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> this.storageProvider.getTaskById(taskId).hasState(StateName.FAILED) || this.storageProvider.getTaskById(taskId).hasState(StateName.SCHEDULED));
        this.backgroundTaskServer.start();
        Awaitility.await().atMost(21L, TimeUnit.SECONDS).until(() -> this.storageProvider.getTaskById(taskId).hasState(StateName.SUCCEEDED));
    }

    @Test
    void testBackgroundTaskServerWasKilledWhileProcessing() {
        this.backgroundTaskServer.start();
        Task taskThatWasProcessedButBackgroundTaskServerWasKilled = this.storageProvider.save(TaskTestBuilder.anEnqueuedTask().withState((TaskState)new ProcessingState(this.backgroundTaskServer.getId()), Instant.now().minus(2L, ChronoUnit.MINUTES)).build());
        Awaitility.await().atMost(7L, TimeUnit.SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(this.storageProvider.getTaskById(taskThatWasProcessedButBackgroundTaskServerWasKilled.getId())).hasStates(StateName.ENQUEUED, StateName.PROCESSING, StateName.FAILED, StateName.SCHEDULED));
        Awaitility.await().atMost(7L, TimeUnit.SECONDS).until(() -> this.storageProvider.getTaskById(taskThatWasProcessedButBackgroundTaskServerWasKilled.getId()).hasState(StateName.SUCCEEDED));
    }

    @Test
    void testHeartbeatsAreSentForTasksInProcessingState() {
        this.backgroundTaskServer.start();
        TaskId taskId = BackgroundTask.enqueue((TaskLambda & Serializable)() -> this.testService.doWorkThatTakesLong(16));
        Awaitility.await().pollInterval(150L, TimeUnit.MILLISECONDS).pollDelay(3L, TimeUnit.SECONDS).atMost(7L, TimeUnit.SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(this.storageProvider.getTaskById(taskId)).hasUpdatedAtCloseTo(Instant.now(), (TemporalOffset<Temporal>)Assertions.within((long)500L, (TemporalUnit)ChronoUnit.MILLIS)));
        Awaitility.await().pollInterval(150L, TimeUnit.MILLISECONDS).pollDelay(3L, TimeUnit.SECONDS).atMost(7L, TimeUnit.SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(this.storageProvider.getTaskById(taskId)).hasUpdatedAtCloseTo(Instant.now(), (TemporalOffset<Temporal>)Assertions.within((long)500L, (TemporalUnit)ChronoUnit.MILLIS)));
        Awaitility.await().pollInterval(150L, TimeUnit.MILLISECONDS).pollDelay(3L, TimeUnit.SECONDS).atMost(7L, TimeUnit.SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(this.storageProvider.getTaskById(taskId)).hasUpdatedAtCloseTo(Instant.now(), (TemporalOffset<Temporal>)Assertions.within((long)500L, (TemporalUnit)ChronoUnit.MILLIS)));
    }

    @Test
    void testCanNotStartBackgroundTaskServerTwice() {
        new Thread(() -> this.backgroundTaskServer.start()).start();
        new Thread(() -> this.backgroundTaskServer.start()).start();
        SleepUtils.sleep(200L);
        Awaitility.await().until(() -> this.backgroundTaskServer.isStarted());
        Awaitility.await().untilAsserted(() -> CarrotAssertions.assertThat(this.logger).hasInfoMessageContaining("BackgroundTaskPerformers started successfully", 1).hasNoErrorLogMessages());
    }

    @Test
    void getBackgroundTaskRunnerForIoCTaskWithoutInstance() {
        Task task = TaskTestBuilder.anEnqueuedTask().withTaskDetails((IocTaskLambda & Serializable)x -> x.doWork()).build();
        ((ObjectAssert)Assertions.assertThat((Object)this.backgroundTaskServer.getBackgroundTaskRunner(task)).isNotNull()).isInstanceOf(BackgroundTaskWithIocRunner.class);
    }

    @Test
    void getBackgroundTaskRunnerForIoCTaskWithInstance() {
        Task task = TaskTestBuilder.anEnqueuedTask().withTaskDetails((TaskLambda & Serializable)() -> this.testServiceForIoC.doWork()).build();
        ((ObjectAssert)Assertions.assertThat((Object)this.backgroundTaskServer.getBackgroundTaskRunner(task)).isNotNull()).isInstanceOf(BackgroundTaskWithIocRunner.class);
    }

    @Test
    void getBackgroundJobRunnerForNonIoCJobWithoutInstance() {
        this.taskActivator.clear();
        Task task = TaskTestBuilder.anEnqueuedTask().withTaskDetails((IocTaskLambda & Serializable)x -> x.doWork()).build();
        ((ObjectAssert)Assertions.assertThat((Object)this.backgroundTaskServer.getBackgroundTaskRunner(task)).isNotNull()).isInstanceOf(BackgroundTaskWithoutIocRunner.class);
    }

    @Test
    void getBackgroundTaskRunnerForNonIoCTaskWithInstance() {
        this.taskActivator.clear();
        Task task = TaskTestBuilder.anEnqueuedTask().withTaskDetails((TaskLambda & Serializable)() -> this.testService.doWork()).build();
        ((ObjectAssert)Assertions.assertThat((Object)this.backgroundTaskServer.getBackgroundTaskRunner(task)).isNotNull()).isInstanceOf(BackgroundTaskWithoutIocRunner.class);
    }

    @Test
    void getBackgroundTaskRunnerForNonIoCStaticTaskWithoutInstance() {
        this.taskActivator.clear();
        Task task = TaskTestBuilder.anEnqueuedTask().withTaskDetails(StaticTestService::doWorkInStaticMethodWithoutParameter).build();
        ((ObjectAssert)Assertions.assertThat((Object)this.backgroundTaskServer.getBackgroundTaskRunner(task)).isNotNull()).isInstanceOf(BackgroundStaticTaskWithoutIocRunner.class);
    }

    @Test
    void getBackgroundTaskRunnerForTaskThatCannotBeRun() {
        Task task = TaskTestBuilder.anEnqueuedTask().withTaskDetails((IocTaskLambda & Serializable)x -> x.doWork()).build();
        Assertions.assertThatThrownBy(() -> this.backgroundTaskServer.getBackgroundTaskRunner(task)).isInstanceOf(CarrotException.class);
    }

    @Test
    void ifAnnouncingBackgroundSucceedsStartupMessageIsLogged() {
        this.backgroundTaskServer.start();
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(this.logger).hasInfoMessageContaining("BackgroundTaskPerformers started successfully").hasNoErrorLogMessages());
    }

    @Test
    void ifAnnouncingBackgroundTaskServerFailsThisIsLogged() {
        ((PartitionedStorageProvider)Mockito.doThrow((Throwable[])new Throwable[]{new StorageException("Fail")}).when((Object)this.storageProvider)).announceBackgroundTaskServer((BackgroundTaskServerStatus)ArgumentMatchers.any());
        this.backgroundTaskServer.start();
        Awaitility.await().atMost(12L, TimeUnit.SECONDS).untilAsserted(() -> CarrotAssertions.assertThat(this.logger).hasWarningMessageContaining("Carrot BackgroundTaskServer failed to acquire partition"));
    }

    private boolean containsNoBackgroundTaskThreads(Map<Thread, StackTraceElement[]> threadMap) {
        return this.getThreadNames(threadMap).noneMatch(threadName -> threadName.startsWith("backgroundtask"));
    }

    private boolean containsBackgroundTaskThreads(Map<Thread, StackTraceElement[]> threadMap) {
        return this.getThreadNames(threadMap).anyMatch(threadName -> threadName.startsWith("backgroundtask"));
    }

    private Stream<String> getThreadNames(Map<Thread, StackTraceElement[]> threadMap) {
        return threadMap.keySet().stream().map(Thread::getName);
    }
}

