/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.scheduler.internal;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsCollectionContaining;
import org.hamcrest.core.IsInstanceOf;
import org.hamcrest.core.StringStartsWith;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.scheduler.SchedulerBusyException;
import org.mule.runtime.core.api.scheduler.SchedulerConfig;
import org.mule.runtime.core.api.scheduler.SchedulerPoolsConfig;
import org.mule.runtime.core.api.util.concurrent.Latch;
import org.mule.service.scheduler.internal.ThrottledScheduler;
import org.mule.service.scheduler.internal.config.ContainerThreadPoolsConfig;
import org.mule.service.scheduler.internal.threads.SchedulerThreadPools;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import ru.yandex.qatools.allure.annotations.Description;
import ru.yandex.qatools.allure.annotations.Features;

@Features(value={"Scheduler Service"})
public class SchedulerThreadPoolsTestCase
extends AbstractMuleTestCase {
    private static final int CORES = Runtime.getRuntime().availableProcessors();
    @Rule
    public ExpectedException expected = ExpectedException.none();
    private ContainerThreadPoolsConfig threadPoolsConfig;
    private SchedulerThreadPools service;

    @Before
    public void before() throws MuleException {
        this.threadPoolsConfig = ContainerThreadPoolsConfig.loadThreadPoolsConfig();
        this.service = new SchedulerThreadPools(SchedulerThreadPoolsTestCase.class.getName(), (SchedulerPoolsConfig)this.threadPoolsConfig);
        this.service.start();
    }

    @After
    public void after() throws MuleException, InterruptedException {
        if (this.service == null) {
            return;
        }
        for (Scheduler scheduler : new ArrayList(this.service.getSchedulers())) {
            scheduler.stop();
        }
        this.service.stop();
    }

    @Test
    @Description(value="Tests that the threads of the SchedulerService are correcly created and destroyed.")
    public void serviceStop() throws MuleException, InterruptedException {
        Assert.assertThat((Object)SchedulerThreadPoolsTestCase.collectThreadNames(), (Matcher)IsCollectionContaining.hasItem((Matcher)StringStartsWith.startsWith((String)"[MuleRuntime].")));
        this.service.stop();
        this.service = null;
        new PollingProber(500L, 50L).check((Probe)new JUnitLambdaProbe(() -> {
            Assert.assertThat((Object)SchedulerThreadPoolsTestCase.collectThreadNames(), (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)StringStartsWith.startsWith((String)"[MuleRuntime]."))));
            return true;
        }));
    }

    @Test
    @Description(value="Tests that dispatching a task to a throttled scheduler already running its maximum tasks throws the appropriate exception.")
    public void executorRejects() throws MuleException, ExecutionException, InterruptedException {
        Latch latch = new Latch();
        Scheduler cpuLight = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Scheduler custom = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        custom.execute(() -> this.awaitLatch(latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        Runnable task = () -> {};
        cpuLight.submit(() -> {
            try {
                custom.submit(task);
            }
            finally {
                Assert.assertThat((Object)custom.shutdownNow(), (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Object)task)));
            }
        }).get();
    }

    @Test
    @Description(value="Tests that a dispatched task has inherited the context classloader.")
    public void classLoaderPropagates() throws Exception {
        Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        ClassLoader contextClassLoader = (ClassLoader)Mockito.mock(ClassLoader.class);
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        Future submit = scheduler.submit(() -> Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)CoreMatchers.sameInstance((Object)contextClassLoader)));
        submit.get(60L, TimeUnit.SECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that a scheduled task has inherited the context classloader.")
    public void classLoaderPropagatesScheduled() throws Exception {
        Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        ClassLoader contextClassLoader = (ClassLoader)Mockito.mock(ClassLoader.class);
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        Latch latch = new Latch();
        Future submit = null;
        try {
            submit = scheduler.scheduleWithFixedDelay(() -> {
                Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)CoreMatchers.sameInstance((Object)contextClassLoader));
                latch.countDown();
            }, 0L, 60L, TimeUnit.SECONDS);
            latch.await(10L, TimeUnit.SECONDS);
            submit.get(10L, TimeUnit.SECONDS);
        }
        finally {
            if (submit != null) {
                submit.cancel(false);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that a cron-scheduled task has inherited the context classloader.")
    public void classLoaderPropagatesCron() throws Exception {
        Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        ClassLoader contextClassLoader = (ClassLoader)Mockito.mock(ClassLoader.class);
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        Latch latch = new Latch();
        ScheduledFuture submit = null;
        try {
            submit = scheduler.scheduleWithCronExpression(() -> {
                Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)CoreMatchers.sameInstance((Object)contextClassLoader));
                latch.countDown();
            }, "*/1 * * ? * *");
            latch.await(10L, TimeUnit.SECONDS);
            submit.get(10L, TimeUnit.SECONDS);
        }
        finally {
            if (submit != null) {
                submit.cancel(false);
            }
        }
    }

    @Test
    public void onlyCustomMayConfigureWaitCpuLight() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");
        this.service.createCpuLightScheduler(SchedulerConfig.config().withWaitAllowed(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureWaitCpuIntensive() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");
        this.service.createCpuIntensiveScheduler(SchedulerConfig.config().withWaitAllowed(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureWaitIO() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");
        this.service.createIoScheduler(SchedulerConfig.config().withWaitAllowed(true), CORES, () -> 1000L);
    }

    @Test
    @Description(value="Tests that tasks dispatched from a CPU Light thread to a busy Scheduler are rejected.")
    public void rejectionPolicyCpuLight() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        submit.get(60L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that tasks dispatched from a CPU Intensive thread to a busy Scheduler are rejected.")
    public void rejectionPolicyCpuIntensive() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCpuIntensiveScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        submit.get(60L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that tasks dispatched from an IO thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyIO() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        latch.countDown();
        submit.get(5L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that when the IO pool is full, any task dispatched from IO to IO runs in the caller thread instead of being queued, which can cause a deadlock.")
    public void ioToFullIoDoesntWait() throws InterruptedException, ExecutionException {
        Scheduler ioScheduler = this.service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Latch outerLatch = new Latch();
        Latch innerLatch = new Latch();
        for (int i = 0; i < this.threadPoolsConfig.getIoMaxPoolSize().getAsInt() - 1; ++i) {
            this.consumeThread(ioScheduler, outerLatch);
        }
        AtomicReference callerThread = new AtomicReference();
        AtomicReference executingThread = new AtomicReference();
        Future submitted = ioScheduler.submit(() -> {
            callerThread.set(Thread.currentThread());
            ioScheduler.submit(() -> {
                executingThread.set(Thread.currentThread());
                innerLatch.countDown();
            });
            return this.awaitLatch(outerLatch);
        });
        Assert.assertThat((Object)innerLatch.await(5L, TimeUnit.SECONDS), (Matcher)Matchers.is((Object)true));
        outerLatch.countDown();
        Assert.assertThat(submitted.get(), (Matcher)Matchers.is((Object)true));
        Assert.assertThat(executingThread.get(), (Matcher)Matchers.is(callerThread.get()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that when the IO pool is full, any task dispatched from a CUSTOM pool with WAIT rejection action to IO is queued.")
    public void customWaitToFullIoWaits() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler customScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1).withWaitAllowed(true), CORES, () -> 1000L);
        Scheduler ioScheduler = this.service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Latch latch = new Latch();
        for (int i = 0; i < this.threadPoolsConfig.getIoMaxPoolSize().getAsInt(); ++i) {
            this.consumeThread(ioScheduler, latch);
        }
        Future submitted = customScheduler.submit(() -> {
            ioScheduler.submit(() -> {});
            Assert.fail((String)"Didn't wait");
            return null;
        });
        this.expected.expect(TimeoutException.class);
        try {
            submitted.get(5L, TimeUnit.SECONDS);
        }
        finally {
            latch.countDown();
        }
    }

    @Test
    @Description(value="Tests that periodic tasks scheduled to a busy Scheduler are skipped but the job continues executing.")
    public void rejectionPolicyScheduledPeriodic() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(2), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (ExecutionException e) {
            Assert.assertThat((Object)e.getCause(), (Matcher)IsInstanceOf.instanceOf(SchedulerBusyException.class));
        }
        CountDownLatch scheduledTaskLatch = new CountDownLatch(2);
        AtomicReference<Object> scheduledTask = new AtomicReference<Object>(null);
        sourceScheduler.submit(() -> {
            scheduledTask.set(targetScheduler.scheduleWithFixedDelay(() -> scheduledTaskLatch.countDown(), 0L, 1L, TimeUnit.SECONDS));
            return null;
        });
        new PollingProber().check((Probe)new JUnitLambdaProbe(() -> {
            Assert.assertThat((Object)((ScheduledFuture)scheduledTask.get()).isDone(), (Matcher)Matchers.is((Object)true));
            return true;
        }));
        latch.countDown();
        Assert.assertThat((Object)scheduledTaskLatch.await(5L, TimeUnit.SECONDS), (Matcher)Matchers.is((Object)true));
    }

    @Test
    @Description(value="Tests that tasks dispatched from a Custom scheduler thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyCustom() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        submit.get(60L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that tasks dispatched from a Custom scheduler with 'Wait' allowed thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyCustomWithConfig() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withWaitAllowed(true).withMaxConcurrentTasks(1), CORES, () -> 1000L, 1);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        latch.countDown();
        submit.get(5L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that tasks dispatched from any other thread to a busy Scheduler are rejected.")
    public void rejectionPolicyOther() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        ExecutorService sourceExecutor = Executors.newSingleThreadExecutor();
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future<Object> submit = sourceExecutor.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        latch.countDown();
        submit.get(5L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that ThrottledScheduler is not used for CPU light schedulers unless maxConcurrency is less than backing pool max size.")
    public void maxCpuLightConcurrencyMoreThanMaxPoolSizeDoesntUseThrottlingScheduler() {
        Assert.assertThat((Object)this.service.createCpuLightScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getCpuLightPoolSize().getAsInt()), 1, () -> 1L), (Matcher)CoreMatchers.not((Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class)));
        Assert.assertThat((Object)this.service.createCpuLightScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getCpuLightPoolSize().getAsInt() - 1), 1, () -> 1L), (Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class));
    }

    @Test
    @Description(value="Tests that ThrottledScheduler is not used for CPU intensive schedulers unless maxConcurrency is less than backing pool max size.")
    public void maxCpuIntensiveConcurrencyMoreThanMaxPoolSizeDoesntUseThrottlingScheduler() {
        Assert.assertThat((Object)this.service.createCpuIntensiveScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getCpuIntensivePoolSize().getAsInt()), 1, () -> 1L), (Matcher)CoreMatchers.not((Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class)));
        Assert.assertThat((Object)this.service.createCpuIntensiveScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getCpuIntensivePoolSize().getAsInt() - 1), 1, () -> 1L), (Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class));
    }

    @Test
    @Description(value="Tests that ThrottledScheduler is not used for IO schedulers unless maxConcurrency is less than backing pool max size.")
    public void maxIOConcurrencyMoreThanMaxPoolSizeDoesntUseThrottlingScheduler() {
        Assert.assertThat((Object)this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getIoMaxPoolSize().getAsInt()), 1, () -> 1L), (Matcher)CoreMatchers.not((Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class)));
        Assert.assertThat((Object)this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getIoMaxPoolSize().getAsInt() - 1), 1, () -> 1L), (Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class));
    }

    private Callable<Object> threadsConsumer(Scheduler targetScheduler, Latch latch) {
        return () -> {
            while (latch.getCount() > 0L) {
                this.consumeThread(targetScheduler, latch);
            }
            return null;
        };
    }

    private void consumeThread(Scheduler scheduler, Latch latch) {
        scheduler.submit(() -> this.awaitLatch(latch));
    }

    private boolean awaitLatch(Latch latch) {
        try {
            return latch.await((long)this.getTestTimeoutSecs(), TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

