/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.scheduler.internal;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import java.io.InputStream;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsCollectionContaining;
import org.hamcrest.core.IsInstanceOf;
import org.hamcrest.core.StringStartsWith;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerBusyException;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerPoolsConfig;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.util.ClassUtils;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.service.scheduler.internal.ThrottledScheduler;
import org.mule.service.scheduler.internal.config.ContainerThreadPoolsConfig;
import org.mule.service.scheduler.internal.threads.SchedulerThreadPools;
import org.mule.service.scheduler.internal.util.Delegator;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;

@Feature(value="Scheduler Service")
public class SchedulerThreadPoolsTestCase
extends AbstractMuleTestCase {
    private static final int CORES = Runtime.getRuntime().availableProcessors();
    private static final long GC_POLLING_TIMEOUT = 10000L;
    @Rule
    public ExpectedException expected = ExpectedException.none();
    private ContainerThreadPoolsConfig threadPoolsConfig;
    private SchedulerThreadPools service;

    @Before
    public void before() throws MuleException {
        this.threadPoolsConfig = ContainerThreadPoolsConfig.loadThreadPoolsConfig();
        this.service = new SchedulerThreadPools(SchedulerThreadPoolsTestCase.class.getName(), (SchedulerPoolsConfig)this.threadPoolsConfig);
        this.service.start();
    }

    @After
    public void after() throws MuleException, InterruptedException {
        if (this.service == null) {
            return;
        }
        for (Scheduler scheduler : new ArrayList(this.service.getSchedulers())) {
            scheduler.stop();
        }
        this.service.stop();
    }

    @Test
    @Description(value="Tests that the threads of the SchedulerService are correcly created and destroyed.")
    public void serviceStop() throws MuleException, InterruptedException {
        Assert.assertThat((Object)SchedulerThreadPoolsTestCase.collectThreadNames(), (Matcher)IsCollectionContaining.hasItem((Matcher)StringStartsWith.startsWith((String)"[MuleRuntime].")));
        this.service.stop();
        this.service = null;
        new PollingProber(500L, 50L).check((Probe)new JUnitLambdaProbe(() -> {
            Assert.assertThat((Object)SchedulerThreadPoolsTestCase.collectThreadNames(), (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)StringStartsWith.startsWith((String)"[MuleRuntime]."))));
            return true;
        }));
    }

    @Test
    @Description(value="Tests that dispatching a task to a throttled scheduler already running its maximum tasks throws the appropriate exception.")
    public void executorRejects() throws MuleException, ExecutionException, InterruptedException {
        Latch latch = new Latch();
        Scheduler cpuLight = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Scheduler custom = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        custom.execute(() -> this.awaitLatch(latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        Runnable task = () -> {};
        cpuLight.submit(() -> {
            try {
                custom.submit(task);
            }
            finally {
                Assert.assertThat((Object)custom.shutdownNow(), (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Object)task)));
            }
        }).get();
    }

    @Test
    @Description(value="Tests that a dispatched task has inherited the context classloader.")
    public void classLoaderPropagates() throws Exception {
        Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        ClassLoader contextClassLoader = (ClassLoader)Mockito.mock(ClassLoader.class);
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        Future submit = scheduler.submit(() -> Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)CoreMatchers.sameInstance((Object)contextClassLoader)));
        submit.get(60L, TimeUnit.SECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that a scheduled task has inherited the context classloader.")
    public void classLoaderPropagatesScheduled() throws Exception {
        Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        ClassLoader contextClassLoader = (ClassLoader)Mockito.mock(ClassLoader.class);
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        Latch latch = new Latch();
        Future submit = null;
        try {
            submit = scheduler.scheduleWithFixedDelay(() -> {
                Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)CoreMatchers.sameInstance((Object)contextClassLoader));
                latch.countDown();
            }, 0L, 60L, TimeUnit.SECONDS);
            latch.await(10L, TimeUnit.SECONDS);
            submit.get(10L, TimeUnit.SECONDS);
        }
        finally {
            if (submit != null) {
                submit.cancel(false);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that a cron-scheduled task has inherited the context classloader.")
    public void classLoaderPropagatesCron() throws Exception {
        Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        ClassLoader contextClassLoader = (ClassLoader)Mockito.mock(ClassLoader.class);
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        Latch latch = new Latch();
        ScheduledFuture submit = null;
        try {
            submit = scheduler.scheduleWithCronExpression(() -> {
                Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)CoreMatchers.sameInstance((Object)contextClassLoader));
                latch.countDown();
            }, "*/1 * * ? * *");
            latch.await(10L, TimeUnit.SECONDS);
            submit.get(10L, TimeUnit.SECONDS);
        }
        finally {
            if (submit != null) {
                submit.cancel(false);
            }
        }
    }

    @Test
    @Description(value="Tests that a custom scheduler doesn't hold a reference to the context classloader that was in the context when it was created.")
    public void customPoolThreadsDontReferenceCreatorClassLoader() throws Exception {
        ClassLoader testClassLoader = new ClassLoader(((Object)((Object)this)).getClass().getClassLoader()){};
        PhantomReference<ClassLoader> clRef = new PhantomReference<ClassLoader>(testClassLoader, new ReferenceQueue());
        this.scheduleToCustomWithClassLoader(testClassLoader);
        testClassLoader = null;
        this.assertNoClassLoaderReferenceHeld(clRef, 10000L);
    }

    public void scheduleToCustomWithClassLoader(ClassLoader testClassLoader) throws InterruptedException, ExecutionException {
        AtomicReference scheduler = new AtomicReference();
        ClassUtils.withContextClassLoader((ClassLoader)testClassLoader, () -> {
            scheduler.set(this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L));
            try {
                ((Scheduler)scheduler.get()).submit(() -> Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)Is.is((Object)testClassLoader))).get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
        ((Scheduler)scheduler.get()).submit(() -> Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)Is.is((Object)testClassLoader.getParent()))).get();
    }

    @Test
    @Description(value="Tests that a scheduler Executor thread doesn't hold a reference to an artifact classloader through the inheritedAccessControlContext.")
    public void threadsDontReferenceClassLoaderFromAccessControlContext() throws Exception {
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        ClassLoader delegatorClassLoader = this.createDelegatorClassLoader();
        PhantomReference<ClassLoader> clRef = new PhantomReference<ClassLoader>(delegatorClassLoader, new ReferenceQueue());
        Consumer delegator = (Consumer)delegatorClassLoader.loadClass(Delegator.class.getName()).newInstance();
        delegator.accept(() -> scheduler.execute(() -> {}));
        delegator = null;
        delegatorClassLoader = null;
        this.assertNoClassLoaderReferenceHeld(clRef, 10000L);
    }

    @Test
    @Description(value="Tests that IO threads in excess of the core size don't hold a reference to an artifact classloader through the inheritedAccessControlContext.")
    public void elasticIoThreadsDontReferenceClassLoaderFromAccessControlContext() throws Exception {
        Assert.assertThat((Object)this.threadPoolsConfig.getIoKeepAlive().getAsLong(), (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(10000L)));
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config(), this.threadPoolsConfig.getIoCorePoolSize().getAsInt() + 1, () -> 1000L);
        ClassLoader delegatorClassLoader = this.createDelegatorClassLoader();
        PhantomReference<ClassLoader> clRef = new PhantomReference<ClassLoader>(delegatorClassLoader, new ReferenceQueue());
        Consumer delegator = (Consumer)delegatorClassLoader.loadClass(Delegator.class.getName()).newInstance();
        for (int i = 0; i < this.threadPoolsConfig.getIoCorePoolSize().getAsInt() + 1; ++i) {
            delegator.accept(() -> scheduler.execute(() -> {}));
        }
        delegator = null;
        delegatorClassLoader = null;
        this.assertNoClassLoaderReferenceHeld(clRef, 10000L);
    }

    private ClassLoader createDelegatorClassLoader() {
        ClassLoader testClassLoader = new ClassLoader(((Object)((Object)this)).getClass().getClassLoader()){

            @Override
            public Class<?> loadClass(String name) throws ClassNotFoundException {
                if (Delegator.class.getName().equals(name)) {
                    try {
                        byte[] classBytes = IOUtils.toByteArray((InputStream)this.getClass().getResourceAsStream("/org/mule/service/scheduler/internal/util/Delegator.class"));
                        return this.defineClass(null, classBytes, 0, classBytes.length);
                    }
                    catch (Exception e) {
                        return super.loadClass(name);
                    }
                }
                return super.loadClass(name);
            }
        };
        return testClassLoader;
    }

    private void assertNoClassLoaderReferenceHeld(PhantomReference<ClassLoader> clRef, long timeoutMillis) {
        new PollingProber(timeoutMillis, 100L).check((Probe)new JUnitLambdaProbe(() -> {
            System.gc();
            Assert.assertThat((Object)clRef.isEnqueued(), (Matcher)Is.is((Object)true));
            return true;
        }, "A hard reference is being mantained to the child ClassLoader."));
    }

    @Test
    public void threadGroupOfCustomSchedulerNotLeakedAfterShutdown() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        List<PhantomReference> references = this.recordReferences(scheduler);
        scheduler.shutdown();
        scheduler = null;
        this.assertNoThreadGroupReferenceHeld(references);
    }

    @Test
    public void threadGroupOfCustomSchedulerNotLeakedAfterShutdownNow() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        List<PhantomReference> references = this.recordReferences(scheduler);
        scheduler.shutdownNow();
        scheduler = null;
        this.assertNoThreadGroupReferenceHeld(references);
    }

    @Test
    public void threadGroupOfCustomSchedulerNotLeakedAfterStop() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        List<PhantomReference> references = this.recordReferences(scheduler);
        scheduler.stop();
        scheduler = null;
        this.assertNoThreadGroupReferenceHeld(references);
    }

    private List<PhantomReference> recordReferences(Scheduler scheduler) throws InterruptedException, ExecutionException, TimeoutException {
        ArrayList<PhantomReference> references = new ArrayList<PhantomReference>();
        scheduler.submit(() -> {
            references.add(new PhantomReference<Thread>(Thread.currentThread(), new ReferenceQueue()));
            references.add(new PhantomReference<ThreadGroup>(Thread.currentThread().getThreadGroup(), new ReferenceQueue()));
            return true;
        }).get(5L, TimeUnit.SECONDS);
        return references;
    }

    private void assertNoThreadGroupReferenceHeld(List<PhantomReference> references) {
        new PollingProber(10000L, 100L).check((Probe)new JUnitLambdaProbe(() -> {
            System.gc();
            references.forEach(ref -> Assert.assertThat((String)ref.toString(), (Object)ref.isEnqueued(), (Matcher)Is.is((Object)true)));
            return true;
        }, "A hard reference is being mantained to the scheduler threads/thread group."));
    }

    @Test
    public void customSchedulerShutodownFromWithin() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        Future stopSubmit = scheduler.submit(() -> scheduler.stop());
        this.expected.expect(CancellationException.class);
        try {
            stopSubmit.get(10L, TimeUnit.SECONDS);
        }
        finally {
            Assert.assertThat((Object)scheduler.isShutdown(), (Matcher)Is.is((Object)true));
            Assert.assertThat((Object)scheduler.isTerminated(), (Matcher)Is.is((Object)true));
        }
    }

    @Test
    public void onlyCustomMayConfigureWaitCpuLight() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");
        this.service.createCpuLightScheduler(SchedulerConfig.config().withWaitAllowed(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureWaitCpuIntensive() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");
        this.service.createCpuIntensiveScheduler(SchedulerConfig.config().withWaitAllowed(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureWaitIO() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");
        this.service.createIoScheduler(SchedulerConfig.config().withWaitAllowed(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureDirectRunCpuLightWhenTargetBusyCpuLight() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'directRunCpuLightWhenTargetBusy' behaviour");
        this.service.createCpuLightScheduler(SchedulerConfig.config().withDirectRunCpuLightWhenTargetBusy(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureDirectRunCpuLightWhenTargetBusyCpuIntensive() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'directRunCpuLightWhenTargetBusy' behaviour");
        this.service.createCpuIntensiveScheduler(SchedulerConfig.config().withDirectRunCpuLightWhenTargetBusy(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureDirectRunCpuLightWhenTargetBusyIO() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'directRunCpuLightWhenTargetBusy' behaviour");
        this.service.createIoScheduler(SchedulerConfig.config().withDirectRunCpuLightWhenTargetBusy(true), CORES, () -> 1000L);
    }

    @Test
    @Description(value="Tests that tasks dispatched from a CPU Light thread to a busy Scheduler are rejected.")
    public void rejectionPolicyCpuLight() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        submit.get(60L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that tasks dispatched from a CPU Intensive thread to a busy Scheduler are rejected.")
    public void rejectionPolicyCpuIntensive() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCpuIntensiveScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        submit.get(60L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that tasks dispatched from an IO thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyIO() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        latch.countDown();
        submit.get(5L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that when the IO pool is full, any task dispatched from IO to IO runs in the caller thread instead of being queued, which can cause a deadlock.")
    public void ioToFullIoDoesntWait() throws InterruptedException, ExecutionException {
        Scheduler ioScheduler = this.service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Latch outerLatch = new Latch();
        Latch innerLatch = new Latch();
        for (int i = 0; i < this.threadPoolsConfig.getIoMaxPoolSize().getAsInt() - 1; ++i) {
            this.consumeThread(ioScheduler, outerLatch);
        }
        AtomicReference callerThread = new AtomicReference();
        AtomicReference executingThread = new AtomicReference();
        Future submitted = ioScheduler.submit(() -> {
            callerThread.set(Thread.currentThread());
            ioScheduler.submit(() -> {
                executingThread.set(Thread.currentThread());
                innerLatch.countDown();
            });
            return this.awaitLatch(outerLatch);
        });
        Assert.assertThat((Object)innerLatch.await(5L, TimeUnit.SECONDS), (Matcher)Is.is((Object)true));
        outerLatch.countDown();
        Assert.assertThat(submitted.get(), (Matcher)Is.is((Object)true));
        Assert.assertThat(executingThread.get(), (Matcher)Is.is(callerThread.get()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that when the IO pool is full, any task dispatched from a CUSTOM pool with WAIT rejection action to IO is queued.")
    public void customWaitToFullIoWaits() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler customScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1).withWaitAllowed(true), CORES, () -> 1000L);
        Scheduler ioScheduler = this.service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Latch latch = new Latch();
        for (int i = 0; i < this.threadPoolsConfig.getIoMaxPoolSize().getAsInt(); ++i) {
            this.consumeThread(ioScheduler, latch);
        }
        Future submitted = customScheduler.submit(() -> {
            ioScheduler.submit(() -> {});
            Assert.fail((String)"Didn't wait");
            return null;
        });
        this.expected.expect(TimeoutException.class);
        try {
            submitted.get(5L, TimeUnit.SECONDS);
        }
        finally {
            latch.countDown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that when the CPU-lite pool is full, any task dispatched from a CUSTOM pool with DirectRunToFullCpuLight falg to CPU-lite is run directlyi in the caller thread.")
    public void customDirectRunToFullCpuLight() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler customScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1).withDirectRunCpuLightWhenTargetBusy(true), CORES, () -> 1000L);
        Scheduler cpuLightScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Latch latch = new Latch();
        for (int i = 0; i < this.threadPoolsConfig.getCpuLightPoolSize().getAsInt() + this.threadPoolsConfig.getCpuLightQueueSize().getAsInt(); ++i) {
            this.consumeThread(cpuLightScheduler, latch);
        }
        AtomicReference callerThread = new AtomicReference();
        AtomicReference taskRunThread = new AtomicReference();
        Future submitted = customScheduler.submit(() -> {
            callerThread.set(Thread.currentThread());
            cpuLightScheduler.submit(() -> taskRunThread.set(Thread.currentThread()));
            return null;
        });
        try {
            submitted.get(5L, TimeUnit.SECONDS);
        }
        finally {
            latch.countDown();
        }
        Assert.assertThat(taskRunThread.get(), (Matcher)CoreMatchers.sameInstance(callerThread.get()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that the behavior of combining runCpuLightWhenTargetBusy and waitAllowed depends on the target thread.")
    public void customWaitToFullIoWaitsAndWaitToFullIoWaits() throws InterruptedException, ExecutionException, TimeoutException {
        int i;
        Scheduler customScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1).withWaitAllowed(true).withDirectRunCpuLightWhenTargetBusy(true), CORES, () -> 1000L);
        Scheduler ioScheduler = this.service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Scheduler cpuLightScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Latch latch = new Latch();
        for (i = 0; i < this.threadPoolsConfig.getIoMaxPoolSize().getAsInt(); ++i) {
            this.consumeThread(ioScheduler, latch);
        }
        for (i = 0; i < this.threadPoolsConfig.getCpuLightPoolSize().getAsInt() + this.threadPoolsConfig.getCpuLightQueueSize().getAsInt(); ++i) {
            this.consumeThread(cpuLightScheduler, latch);
        }
        AtomicReference callerThread = new AtomicReference();
        AtomicReference taskRunThread = new AtomicReference();
        Future submittedCpuLight = customScheduler.submit(() -> {
            callerThread.set(Thread.currentThread());
            cpuLightScheduler.submit(() -> taskRunThread.set(Thread.currentThread()));
            return null;
        });
        Future submittedIo = customScheduler.submit(() -> {
            ioScheduler.submit(() -> {});
            Assert.fail((String)"Didn't wait");
            return null;
        });
        try {
            submittedCpuLight.get(5L, TimeUnit.SECONDS);
            Assert.assertThat(taskRunThread.get(), (Matcher)CoreMatchers.sameInstance(callerThread.get()));
            this.expected.expect(TimeoutException.class);
            submittedIo.get(5L, TimeUnit.SECONDS);
        }
        finally {
            latch.countDown();
        }
    }

    @Test
    @Description(value="Tests that periodic tasks scheduled to a busy Scheduler are skipped but the job continues executing.")
    public void rejectionPolicyScheduledPeriodic() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(2), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (ExecutionException e) {
            Assert.assertThat((Object)e.getCause(), (Matcher)IsInstanceOf.instanceOf(SchedulerBusyException.class));
        }
        CountDownLatch scheduledTaskLatch = new CountDownLatch(2);
        AtomicReference<Object> scheduledTask = new AtomicReference<Object>(null);
        sourceScheduler.submit(() -> {
            scheduledTask.set(targetScheduler.scheduleWithFixedDelay(() -> scheduledTaskLatch.countDown(), 0L, 1L, TimeUnit.SECONDS));
            return null;
        });
        new PollingProber().check((Probe)new JUnitLambdaProbe(() -> {
            Assert.assertThat((Object)((ScheduledFuture)scheduledTask.get()).isDone(), (Matcher)Is.is((Object)true));
            return true;
        }));
        latch.countDown();
        Assert.assertThat((Object)scheduledTaskLatch.await(5L, TimeUnit.SECONDS), (Matcher)Is.is((Object)true));
    }

    @Test
    @Description(value="Tests that tasks dispatched from a Custom scheduler thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyCustom() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        submit.get(60L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that tasks dispatched from a Custom scheduler with 'Wait' allowed thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyCustomWithConfig() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withWaitAllowed(true).withMaxConcurrentTasks(1), CORES, () -> 1000L, 1);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        latch.countDown();
        submit.get(5L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that tasks dispatched from any other thread to a busy Scheduler are rejected.")
    public void rejectionPolicyOther() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        ExecutorService sourceExecutor = Executors.newSingleThreadExecutor();
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future<Object> submit = sourceExecutor.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        latch.countDown();
        submit.get(5L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that ThrottledScheduler is not used for CPU light schedulers unless maxConcurrency is less than backing pool max size.")
    public void maxCpuLightConcurrencyMoreThanMaxPoolSizeDoesntUseThrottlingScheduler() {
        Assert.assertThat((Object)this.service.createCpuLightScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getCpuLightPoolSize().getAsInt()), 1, () -> 1L), (Matcher)CoreMatchers.not((Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class)));
        Assert.assertThat((Object)this.service.createCpuLightScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getCpuLightPoolSize().getAsInt() - 1), 1, () -> 1L), (Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class));
    }

    @Test
    @Description(value="Tests that ThrottledScheduler is not used for CPU intensive schedulers unless maxConcurrency is less than backing pool max size.")
    public void maxCpuIntensiveConcurrencyMoreThanMaxPoolSizeDoesntUseThrottlingScheduler() {
        Assert.assertThat((Object)this.service.createCpuIntensiveScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getCpuIntensivePoolSize().getAsInt()), 1, () -> 1L), (Matcher)CoreMatchers.not((Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class)));
        Assert.assertThat((Object)this.service.createCpuIntensiveScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getCpuIntensivePoolSize().getAsInt() - 1), 1, () -> 1L), (Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class));
    }

    @Test
    @Description(value="Tests that ThrottledScheduler is not used for IO schedulers unless maxConcurrency is less than backing pool max size.")
    public void maxIOConcurrencyMoreThanMaxPoolSizeDoesntUseThrottlingScheduler() {
        Assert.assertThat((Object)this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getIoMaxPoolSize().getAsInt()), 1, () -> 1L), (Matcher)CoreMatchers.not((Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class)));
        Assert.assertThat((Object)this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.threadPoolsConfig.getIoMaxPoolSize().getAsInt() - 1), 1, () -> 1L), (Matcher)IsInstanceOf.instanceOf(ThrottledScheduler.class));
    }

    private Callable<Object> threadsConsumer(Scheduler targetScheduler, Latch latch) {
        return () -> {
            while (latch.getCount() > 0L) {
                this.consumeThread(targetScheduler, latch);
            }
            return null;
        };
    }

    private void consumeThread(Scheduler scheduler, Latch latch) {
        scheduler.submit(() -> this.awaitLatch(latch));
    }

    private boolean awaitLatch(Latch latch) {
        try {
            return latch.await((long)this.getTestTimeoutSecs(), TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

