/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.util.OptionalLong;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.input.NullInputStream;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.transaction.Transaction;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategyTestCase;
import org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.ProactorStreamProcessingStrategyFactory;
import org.mule.tck.testmodels.mule.TestTransaction;
import reactor.util.concurrent.Queues;

@Feature(value="Processing Strategies")
@Story(value="Proactor")
public class ProactorStreamProcessingStrategyTestCase
extends AbstractProcessingStrategyTestCase {
    /**
     * Creates the test case for the given mode by delegating to the parent constructor.
     *
     * @param mode mode value forwarded unchanged to {@link AbstractProcessingStrategyTestCase}
     */
    public ProactorStreamProcessingStrategyTestCase(AbstractProcessingStrategyTestCase.Mode mode) {
        super(mode);
    }

    /**
     * Builds the Proactor stream processing strategy used by the inherited test scenarios.
     * The strategy is wired to this test's {@code ringBuffer}, {@code cpuLight},
     * {@code blocking} and {@code cpuIntensive} fields through suppliers.
     *
     * @param muleContext not consulted by this implementation
     * @param schedulersNamePrefix not consulted by this implementation
     * @return a new {@code ProactorStreamProcessingStrategy}
     */
    @Override
    protected ProcessingStrategy createProcessingStrategy(MuleContext muleContext, String schedulersNamePrefix) {
        // NOTE(review): the trailing two ints are presumably parallelism and max concurrency,
        // judging by how the other tests in this class vary them - confirm against the constructor.
        return new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(
            () -> this.ringBuffer,
            Queues.XS_BUFFER_SIZE,
            1,
            AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY,
            () -> this.cpuLight,
            () -> this.blocking,
            () -> this.cpuIntensive,
            AbstractStreamProcessingStrategyFactory.CORES,
            Integer.MAX_VALUE);
    }

    /**
     * Runs the inherited {@code singleCpuLight} scenario, then checks that {@code threads}
     * holds exactly one entry, that exactly one entry starts with "cpuLight", and that no
     * entry starts with "I/O", "cpuIntensive" or "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Override
    @Description(value="With the ProactorProcessingStrategy, when all processor are CPU_LIGHT then they are all exectured in a single  cpu light thread.")
    public void singleCpuLight() throws Exception {
        super.singleCpuLight();

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize((Matcher)CoreMatchers.equalTo(1)));

        long cpuLightCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuLight")).count();
        MatcherAssert.assertThat((Object)cpuLightCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"I/O", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code singleCpuLightConcurrent} scenario, then checks that
     * {@code threads} holds one or two entries, that one or two of them start with
     * "cpuLight", and that none starts with "I/O", "cpuIntensive" or "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Override
    @Description(value="When ProactorProcessingStrategy is configured, two concurrent requests may be processed by two different  cpu light threads. MULE-11132 is needed for true reactor behaviour.")
    public void singleCpuLightConcurrent() throws Exception {
        super.singleCpuLightConcurrent();

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(between(1, 2)));

        long cpuLightCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuLight")).count();
        MatcherAssert.assertThat((Object)cpuLightCount, between(1L, 2L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"I/O", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code multipleCpuLight} scenario, then checks that {@code threads}
     * holds exactly one entry, that exactly one entry starts with "cpuLight", and that no
     * entry starts with "I/O", "cpuIntensive" or "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Override
    @Description(value="With the ProactorProcessingStrategy, when all processor are CPU_LIGHT then they are all exectured in a single  cpu light thread.")
    public void multipleCpuLight() throws Exception {
        super.multipleCpuLight();

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize((Matcher)CoreMatchers.equalTo(1)));

        long cpuLightCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuLight")).count();
        MatcherAssert.assertThat((Object)cpuLightCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"I/O", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code singleBlocking} scenario, then checks that {@code threads}
     * holds exactly one entry, that exactly one entry starts with "I/O", and that no entry
     * starts with "cpuLight", "cpuIntensive" or "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Override
    @Description(value="With the ProactorProcessingStrategy, a BLOCKING message processor is scheduled on a IO thread.")
    public void singleBlocking() throws Exception {
        super.singleBlocking();

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize((Matcher)CoreMatchers.equalTo(1)));

        long ioCount = this.threads.stream().filter(threadName -> threadName.startsWith("I/O")).count();
        MatcherAssert.assertThat((Object)ioCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"cpuLight", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code multipleBlocking} scenario, then checks that {@code threads}
     * holds between one and three entries, that between one and three of them start with
     * "I/O", and that none starts with "cpuLight", "cpuIntensive" or "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Override
    @Description(value="With the ProactorProcessingStrategy, each BLOCKING message processor is scheduled on a IO thread. These may, or may not, be the same thread.")
    public void multipleBlocking() throws Exception {
        super.multipleBlocking();

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(between(1, 3)));

        long ioCount = this.threads.stream().filter(threadName -> threadName.startsWith("I/O")).count();
        MatcherAssert.assertThat((Object)ioCount, between(1L, 3L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"cpuLight", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code singleCpuIntensive} scenario, then checks that
     * {@code threads} holds exactly one entry, that exactly one entry starts with
     * "cpuIntensive", and that no entry starts with "I/O", "cpuLight" or "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Override
    @Description(value="With the ProactorProcessingStrategy, a CPU_INTENSIVE message processor is scheduled on a CPU intensive thread.")
    public void singleCpuIntensive() throws Exception {
        super.singleCpuIntensive();

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize((Matcher)CoreMatchers.equalTo(1)));

        long cpuIntensiveCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuIntensive")).count();
        MatcherAssert.assertThat((Object)cpuIntensiveCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"I/O", "cpuLight", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code multipleCpuIntensive} scenario, then checks that
     * {@code threads} holds between one and three entries, that between one and three of
     * them start with "cpuIntensive", and that none starts with "I/O", "cpuLight" or "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Override
    @Description(value="With the ProactorProcessingStrategy, each CPU_INTENSIVE message processor is scheduled on a CPU Intensive thread. These may, or may not, be the same thread.")
    public void multipleCpuIntensive() throws Exception {
        super.multipleCpuIntensive();

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(between(1, 3)));

        long cpuIntensiveCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuIntensive")).count();
        MatcherAssert.assertThat((Object)cpuIntensiveCount, between(1L, 3L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"I/O", "cpuLight", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code mix} scenario, then checks that {@code threads} holds exactly
     * three entries: one starting with "cpuIntensive", one with "I/O" and one with
     * "cpuLight". No entry may start with "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Override
    @Description(value="With the ProactorProcessingStrategy, when there is a mix of processor processing types, each processor is scheduled on the correct scheduler.")
    public void mix() throws Exception {
        super.mix();

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize((Matcher)CoreMatchers.equalTo(3)));

        // Each expected prefix must be represented by exactly one entry.
        long cpuIntensiveCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuIntensive")).count();
        MatcherAssert.assertThat((Object)cpuIntensiveCount, (Matcher)CoreMatchers.equalTo(1L));

        long ioCount = this.threads.stream().filter(threadName -> threadName.startsWith("I/O")).count();
        MatcherAssert.assertThat((Object)ioCount, (Matcher)CoreMatchers.equalTo(1L));

        long cpuLightCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuLight")).count();
        MatcherAssert.assertThat((Object)cpuLightCount, (Matcher)CoreMatchers.equalTo(1L));

        Matcher customEntry = IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith("custom"));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not(customEntry));
    }

    /**
     * Runs the inherited {@code mix2} scenario, then checks that {@code threads} holds
     * between three and seven entries: one or two starting with "cpuIntensive", one or two
     * with "I/O" and one to three with "cpuLight". No entry may start with "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Override
    @Description(value="With the ProactorProcessingStrategy, when there is a mix of processor processing types, each processor is scheduled on the correct scheduler.")
    public void mix2() throws Exception {
        super.mix2();

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(between(3, 7)));

        long cpuIntensiveCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuIntensive")).count();
        MatcherAssert.assertThat((Object)cpuIntensiveCount, between(1L, 2L));

        long ioCount = this.threads.stream().filter(threadName -> threadName.startsWith("I/O")).count();
        MatcherAssert.assertThat((Object)ioCount, between(1L, 2L));

        long cpuLightCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuLight")).count();
        MatcherAssert.assertThat((Object)cpuLightCount, between(1L, 3L));

        Matcher customEntry = IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith("custom"));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not(customEntry));
    }

    /**
     * Builds and starts a flow with the cpuLight, cpuIntensive and blocking processors,
     * binds a {@link TestTransaction} to the current transaction coordination, and expects
     * processing an event to fail with a {@link MessagingException} whose cause is a
     * {@link DefaultMuleException} carrying the message
     * "Unable to process a transactional flow asynchronously".
     *
     * @throws Exception the expected failure, captured by the {@code expectedException} rule
     */
    @Override
    @Description(value="When the ProactorProcessingStrategy is configured and a transaction is active processing fails with an error")
    public void tx() throws Exception {
        Flow.Builder builder = (Flow.Builder)this.flowBuilder.get();
        Processor[] chain = new Processor[]{this.cpuLightProcessor, this.cpuIntensiveProcessor, this.blockingProcessor};
        this.flow = builder.processors(chain).build();
        this.flow.initialise();
        this.flow.start();

        // Bind a transaction before processing so the strategy sees an active transaction.
        TransactionCoordination coordination = TransactionCoordination.getInstance();
        coordination.bindTransaction((Transaction)new TestTransaction(muleContext));

        // Both cause expectations are registered; the failure must satisfy type and message.
        this.expectedException.expect(MessagingException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(DefaultMuleException.class));
        Matcher expectedMessage = CoreMatchers.equalTo("Unable to process a transactional flow asynchronously");
        this.expectedException.expectCause(ThrowableMessageMatcher.hasMessage(expectedMessage));

        this.processFlow(this.testEvent());
    }

    /**
     * Runs the inherited {@code asyncCpuLight} scenario, then checks that {@code threads}
     * holds one or two entries, that one or two of them start with "cpuLight", and that none
     * starts with "I/O", "cpuIntensive" or "custom".
     *
     * <p>The report description previously duplicated the one on {@code tx()}: it named the
     * ReactorProcessingStrategy and described a transactional failure, neither of which this
     * test exercises. It now describes what the assertions below verify.
     *
     * @throws Exception if the inherited scenario throws
     */
    @Override
    @Description(value="With the ProactorProcessingStrategy, the asyncCpuLight scenario is processed using only cpu light threads (one or two), never I/O, cpu intensive or custom threads.")
    public void asyncCpuLight() throws Exception {
        super.asyncCpuLight();

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(between(1, 2)));

        long cpuLightCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuLight")).count();
        MatcherAssert.assertThat((Object)cpuLightCount, between(1L, 2L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"I/O", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code concurrentStream} scenario, then checks that {@code threads}
     * holds exactly two entries and that exactly two entries start with "cpuLight".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Override
    @Description(value="Concurrent stream with concurrency of 8 only uses two CPU_LIGHT threads.")
    public void concurrentStream() throws Exception {
        super.concurrentStream();

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(2));

        long cpuLightCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuLight")).count();
        MatcherAssert.assertThat((Object)cpuLightCount, (Matcher)CoreMatchers.equalTo(2L));
    }

    /**
     * Processes one event through a flow with a single blocking processor whose blocking
     * scheduler is a spied {@code RejectingScheduler} wrapping a spy of {@code blocking}.
     * Verifies that the rejecting spy received 11 {@code submit(Callable)} calls and the
     * wrapped spy exactly one, and that the only recorded {@code threads} entry starts with
     * "I/O".
     *
     * @throws Exception if building, starting or processing the flow fails
     */
    @Test
    @Description(value="If IO pool is busy OVERLOAD error is thrown")
    public void blockingRejectedExecution() throws Exception {
        Scheduler ioSpy = (Scheduler)Mockito.spy((Object)this.blocking);
        Scheduler rejectingSpy = (Scheduler)Mockito.spy((Object)new AbstractProcessingStrategyTestCase.RejectingScheduler(ioSpy));

        Flow.Builder builder = (Flow.Builder)this.flowBuilder.get();
        this.flow = builder
            .processors(new Processor[]{this.blockingProcessor})
            .processingStrategyFactory((ctx, namePrefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(
                () -> this.ringBuffer,
                AbstractStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE,
                1,
                AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY,
                () -> this.cpuLight,
                () -> rejectingSpy,
                () -> this.cpuIntensive,
                1,
                2))
            .build();
        this.flow.initialise();
        this.flow.start();
        this.processFlow(this.testEvent());

        // NOTE(review): 11 vs 1 presumably reflects RejectingScheduler refusing the first
        // attempts before delegating - confirm against RejectingScheduler.
        Mockito.verify(rejectingSpy, Mockito.times(11)).submit(Matchers.any(Callable.class));
        Mockito.verify(ioSpy, Mockito.times(1)).submit(Matchers.any(Callable.class));

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(1));

        long ioCount = this.threads.stream().filter(threadName -> threadName.startsWith("I/O")).count();
        MatcherAssert.assertThat((Object)ioCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"cpuLight", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Processes one event through a flow with a single CPU intensive processor whose CPU
     * intensive scheduler is a spied {@code RejectingScheduler} wrapping a spy of
     * {@code cpuIntensive}. Verifies that the rejecting spy received 11
     * {@code submit(Callable)} calls and the wrapped spy exactly one, and that the only
     * recorded {@code threads} entry starts with "cpuIntensive".
     *
     * @throws Exception if building, starting or processing the flow fails
     */
    @Test
    @Description(value="If CPU INTENSIVE pool is busy OVERLOAD error is thrown")
    public void cpuIntensiveRejectedExecution() throws Exception {
        Scheduler intensiveSpy = (Scheduler)Mockito.spy((Object)this.cpuIntensive);
        Scheduler rejectingSpy = (Scheduler)Mockito.spy((Object)new AbstractProcessingStrategyTestCase.RejectingScheduler(intensiveSpy));

        Flow.Builder builder = (Flow.Builder)this.flowBuilder.get();
        this.flow = builder
            .processors(new Processor[]{this.cpuIntensiveProcessor})
            .processingStrategyFactory((ctx, namePrefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(
                () -> this.ringBuffer,
                AbstractStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE,
                1,
                AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY,
                () -> this.cpuLight,
                () -> this.blocking,
                () -> rejectingSpy,
                1,
                2))
            .build();
        this.flow.initialise();
        this.flow.start();
        this.processFlow(this.testEvent());

        // NOTE(review): 11 vs 1 presumably reflects RejectingScheduler refusing the first
        // attempts before delegating - confirm against RejectingScheduler.
        Mockito.verify(rejectingSpy, Mockito.times(11)).submit(Matchers.any(Callable.class));
        Mockito.verify(intensiveSpy, Mockito.times(1)).submit(Matchers.any(Callable.class));

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(1));

        long cpuIntensiveCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuIntensive")).count();
        MatcherAssert.assertThat((Object)cpuIntensiveCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"cpuLight", "I/O", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs {@code internalConcurrent} for CPU_LITE processing with a strategy whose final
     * constructor argument is 1, then checks that the single recorded {@code threads} entry
     * starts with "ringBuffer" and that none starts with "cpuLight", "I/O", "cpuIntensive"
     * or "custom".
     *
     * @throws Exception if the concurrent scenario throws
     */
    @Test
    @Description(value="If max concurrency is 1, only 1 thread is used for CPU_LITE processors and further requests blocks. When maxConcurrency < subscribers processing is done on ring-buffer thread.")
    public void singleCpuLightConcurrentMaxConcurrency1() throws Exception {
        Flow.Builder builder = (Flow.Builder)this.flowBuilder.get();
        this.internalConcurrent(
            builder.processingStrategyFactory((ctx, namePrefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(
                () -> this.ringBuffer,
                AbstractStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE,
                1,
                AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY,
                () -> this.cpuLight,
                () -> this.blocking,
                () -> this.cpuIntensive,
                AbstractStreamProcessingStrategyFactory.CORES,
                1)),
            true,
            ReactiveProcessor.ProcessingType.CPU_LITE,
            1,
            new Processor[0]);

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(1));

        long ringBufferCount = this.threads.stream().filter(threadName -> threadName.startsWith("ringBuffer")).count();
        MatcherAssert.assertThat((Object)ringBufferCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"cpuLight", "I/O", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs {@code internalConcurrent} for CPU_LITE processing with a strategy whose final
     * constructor argument is 2, then checks that {@code threads} holds exactly two entries,
     * that none starts with "ringBuffer", that exactly two start with "cpuLight", and that
     * none starts with "I/O", "cpuIntensive" or "custom".
     *
     * @throws Exception if the concurrent scenario throws
     */
    @Test
    @Description(value="If max concurrency is 2, only 2 threads are used for CPU_LITE processors and further requests blocks.")
    public void singleCpuLightConcurrentMaxConcurrency2() throws Exception {
        Flow.Builder builder = (Flow.Builder)this.flowBuilder.get();
        this.internalConcurrent(
            builder.processingStrategyFactory((ctx, namePrefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(
                () -> this.ringBuffer,
                AbstractStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE,
                1,
                AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY,
                () -> this.cpuLight,
                () -> this.blocking,
                () -> this.cpuIntensive,
                AbstractStreamProcessingStrategyFactory.CORES,
                2)),
            true,
            ReactiveProcessor.ProcessingType.CPU_LITE,
            2,
            new Processor[0]);

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(2));

        // The ring-buffer check deliberately precedes the cpuLight count, as in the original order.
        Matcher ringBufferEntry = IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith("ringBuffer"));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not(ringBufferEntry));

        long cpuLightCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuLight")).count();
        MatcherAssert.assertThat((Object)cpuLightCount, (Matcher)CoreMatchers.equalTo(2L));

        for (String forbiddenPrefix : new String[]{"I/O", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs {@code internalConcurrent} for BLOCKING processing with a strategy whose final
     * constructor argument is 1, then checks that the single recorded {@code threads} entry
     * starts with "I/O" and that none starts with "cpuLight", "ringBuffer", "cpuIntensive"
     * or "custom".
     *
     * <p>NOTE(review): the description text mentions the ring-buffer thread, but the
     * assertions below require an "I/O" entry and forbid "ringBuffer" - confirm which is
     * intended.
     *
     * @throws Exception if the concurrent scenario throws
     */
    @Test
    @Description(value="If max concurrency is 1, only 1 thread is used for BLOCKING processors and further requests blocks. When maxConcurrency < subscribers processing is done on ring-buffer thread.")
    public void singleBlockingConcurrentMaxConcurrency1() throws Exception {
        Flow.Builder builder = (Flow.Builder)this.flowBuilder.get();
        this.internalConcurrent(
            builder.processingStrategyFactory((ctx, namePrefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(
                () -> this.ringBuffer,
                AbstractStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE,
                1,
                AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY,
                () -> this.cpuLight,
                () -> this.blocking,
                () -> this.cpuIntensive,
                AbstractStreamProcessingStrategyFactory.CORES,
                1)),
            true,
            ReactiveProcessor.ProcessingType.BLOCKING,
            1,
            new Processor[0]);

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(1));

        long ioCount = this.threads.stream().filter(threadName -> threadName.startsWith("I/O")).count();
        MatcherAssert.assertThat((Object)ioCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"cpuLight", "ringBuffer", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs {@code internalConcurrent} for BLOCKING processing with a strategy whose last two
     * constructor arguments are 1 and 2, then checks that {@code threads} holds exactly two
     * entries, that both start with "I/O", and that none starts with "cpuLight",
     * "cpuIntensive" or "custom".
     *
     * @throws Exception if the concurrent scenario throws
     */
    @Test
    @Description(value="If max concurrency is 2, only 2 threads are used for BLOCKING processors and further requests blocks.")
    public void singleBlockingConcurrentMaxConcurrency2() throws Exception {
        Flow.Builder builder = (Flow.Builder)this.flowBuilder.get();
        this.internalConcurrent(
            builder.processingStrategyFactory((ctx, namePrefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(
                () -> this.ringBuffer,
                AbstractStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE,
                1,
                AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY,
                () -> this.cpuLight,
                () -> this.blocking,
                () -> this.cpuIntensive,
                1,
                2)),
            true,
            ReactiveProcessor.ProcessingType.BLOCKING,
            2,
            new Processor[0]);

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize(2));

        long ioCount = this.threads.stream().filter(threadName -> threadName.startsWith("I/O")).count();
        MatcherAssert.assertThat((Object)ioCount, (Matcher)CoreMatchers.equalTo(2L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"cpuLight", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Calls {@code testAsyncCpuLightNotificationThreads} with two empty thread holders and
     * checks that the names of both captured threads start with "cpuLight".
     *
     * @throws Exception if the notification scenario throws
     */
    @Test
    @Description(value="Notifications are invoked on CPU_LITE thread")
    public void asyncProcessorNotificationExecutionThreads() throws Exception {
        AtomicReference<Thread> before = new AtomicReference<>();
        AtomicReference<Thread> after = new AtomicReference<>();

        // The helper is expected to populate both holders; a missing thread surfaces as an NPE below.
        this.testAsyncCpuLightNotificationThreads(before, after);

        String beforeName = before.get().getName();
        MatcherAssert.assertThat((Object)beforeName, (Matcher)org.hamcrest.Matchers.startsWith("cpuLight"));

        String afterName = after.get().getName();
        MatcherAssert.assertThat((Object)afterName, (Matcher)org.hamcrest.Matchers.startsWith("cpuLight"));
    }

    /**
     * Runs {@code testBackPressure} with the WAIT strategy, expecting the three reported
     * counts to equal 2000, 0 and 2000 respectively.
     *
     * @throws Exception if the back-pressure scenario throws
     */
    @Test
    @Description(value="When back-pressure strategy is 'WAIT' the source thread blocks and all requests are processed.")
    public void sourceBackPressureWait() throws Exception {
        // NOTE(review): by the description these look like processed / rejected / total
        // counts - confirm against testBackPressure's parameter order.
        Matcher<Integer> firstExpectation = CoreMatchers.equalTo(2000);
        Matcher<Integer> secondExpectation = CoreMatchers.equalTo(0);
        Matcher<Integer> thirdExpectation = CoreMatchers.equalTo(2000);

        this.testBackPressure(MessageSource.BackPressureStrategy.WAIT, firstExpectation, secondExpectation, thirdExpectation);
    }

    /**
     * Runs {@code testBackPressure} with the FAIL strategy, expecting the three reported
     * counts to be less than 2000, greater than 0 and equal to 2000 respectively.
     *
     * @throws Exception if the back-pressure scenario throws
     */
    @Test
    @Description(value="When back-pressure strategy is 'FAIL' some requests fail with an OVERLOAD error.")
    public void sourceBackPressureFail() throws Exception {
        // NOTE(review): by the description these look like processed / rejected / total
        // counts - confirm against testBackPressure's parameter order.
        Matcher<Integer> firstExpectation = org.hamcrest.Matchers.lessThan(Integer.valueOf(2000));
        Matcher<Integer> secondExpectation = org.hamcrest.Matchers.greaterThan(Integer.valueOf(0));
        Matcher<Integer> thirdExpectation = CoreMatchers.equalTo(2000);

        this.testBackPressure(MessageSource.BackPressureStrategy.FAIL, firstExpectation, secondExpectation, thirdExpectation);
    }

    /**
     * Runs {@code testBackPressure} with the DROP strategy, expecting the three reported
     * counts to be less than 2000, greater than 0 and equal to 2000 respectively. These are
     * the same expectations as for the FAIL strategy.
     *
     * @throws Exception if the back-pressure scenario throws
     */
    @Test
    @Description(value="When back-pressure strategy is 'DROP' the flow rejects requests in the same way way with 'FAIL. It is the source that handles FAIL and DROP differently.")
    public void sourceBackPressureDrop() throws Exception {
        // NOTE(review): by the description these look like processed / rejected / total
        // counts - confirm against testBackPressure's parameter order.
        Matcher<Integer> firstExpectation = org.hamcrest.Matchers.lessThan(Integer.valueOf(2000));
        Matcher<Integer> secondExpectation = org.hamcrest.Matchers.greaterThan(Integer.valueOf(0));
        Matcher<Integer> thirdExpectation = CoreMatchers.equalTo(2000);

        this.testBackPressure(MessageSource.BackPressureStrategy.DROP, firstExpectation, secondExpectation, thirdExpectation);
    }

    /**
     * Processes one event through a flow with a single blocking processor, using a strategy
     * whose last two constructor arguments are 4 and 2. Checks that the single recorded
     * {@code threads} entry starts with "I/O" and that none starts with "cpuLight",
     * "cpuIntensive" or "custom".
     *
     * @throws Exception if building, starting or processing the flow fails
     */
    @Test
    @Description(value="When concurrency < parallelism IO threads are still used for blocking processors to avoid cpuLight thread starvation.")
    public void concurrencyLessThanParallelism() throws Exception {
        Flow.Builder builder = (Flow.Builder)this.flowBuilder.get();
        // NOTE(review): per the test name, 4 is presumably the parallelism and 2 the max
        // concurrency - confirm against the strategy constructor.
        this.flow = builder
            .processingStrategyFactory((ctx, namePrefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(
                () -> this.ringBuffer,
                Queues.XS_BUFFER_SIZE,
                1,
                AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY,
                () -> this.cpuLight,
                () -> this.blocking,
                () -> this.cpuIntensive,
                4,
                2))
            .processors(new Processor[]{this.blockingProcessor})
            .build();
        this.flow.initialise();
        this.flow.start();
        this.processFlow(this.testEvent());

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize((Matcher)CoreMatchers.equalTo(1)));

        long ioCount = this.threads.stream().filter(threadName -> threadName.startsWith("I/O")).count();
        MatcherAssert.assertThat((Object)ioCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"cpuLight", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code singleIORW} scenario with the default test event, then checks
     * that the single recorded {@code threads} entry starts with "cpuLight" and that none
     * starts with "I/O", "cpuIntensive" or "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Test
    @Description(value="If the processing type is IO_RW and the payload is not a stream processing occurs in CPU_LIGHT thread.")
    public void singleIOWRWString() throws Exception {
        super.singleIORW(() -> this.testEvent());

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize((Matcher)CoreMatchers.equalTo(1)));

        long cpuLightCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuLight")).count();
        MatcherAssert.assertThat((Object)cpuLightCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"I/O", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code singleIORW} scenario with a stream payload created from an
     * empty {@link OptionalLong}, then checks that the single recorded {@code threads} entry
     * starts with "I/O" and that none starts with "cpuLight", "cpuIntensive" or "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Test
    @Description(value="If the processing type is IO_RW and the payload is a stream with unknown length then processing occurs in BLOCKING thread.")
    public void singleIOWRWUnkownLengthStream() throws Exception {
        super.singleIORW(() -> this.createStreamPayloadEventWithLength(OptionalLong.empty()));

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize((Matcher)CoreMatchers.equalTo(1)));

        long ioCount = this.threads.stream().filter(threadName -> threadName.startsWith("I/O")).count();
        MatcherAssert.assertThat((Object)ioCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"cpuLight", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code singleIORW} scenario with a stream payload whose declared
     * length is {@code DataUnit.KB.toBytes(10)}, then checks that the single recorded
     * {@code threads} entry starts with "cpuLight" and that none starts with "I/O",
     * "cpuIntensive" or "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Test
    @Description(value="If the processing type is IO_RW and the payload is a stream shorter that 16KB in length then processing occurs in CPU_LIGHT thread.")
    public void singleIOWRWSmallStream() throws Exception {
        super.singleIORW(() -> this.createStreamPayloadEventWithLength(OptionalLong.of(DataUnit.KB.toBytes(10))));

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize((Matcher)CoreMatchers.equalTo(1)));

        long cpuLightCount = this.threads.stream().filter(threadName -> threadName.startsWith("cpuLight")).count();
        MatcherAssert.assertThat((Object)cpuLightCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"I/O", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Runs the inherited {@code singleIORW} scenario with a stream payload whose declared
     * length is {@code DataUnit.KB.toBytes(20)}, then checks that the single recorded
     * {@code threads} entry starts with "I/O" and that none starts with "cpuLight",
     * "cpuIntensive" or "custom".
     *
     * @throws Exception if the inherited scenario throws
     */
    @Test
    @Description(value="If the processing type is IO_RW and the payload is a longer than 16KB in length then processing occurs in BLOCKING thread.")
    public void singleIOWRWLargeStream() throws Exception {
        super.singleIORW(() -> this.createStreamPayloadEventWithLength(OptionalLong.of(DataUnit.KB.toBytes(20))));

        MatcherAssert.assertThat((Object)this.threads, (Matcher)org.hamcrest.Matchers.hasSize((Matcher)CoreMatchers.equalTo(1)));

        long ioCount = this.threads.stream().filter(threadName -> threadName.startsWith("I/O")).count();
        MatcherAssert.assertThat((Object)ioCount, (Matcher)CoreMatchers.equalTo(1L));

        // Prefixes that must not appear among the recorded entries, checked in this order.
        for (String forbiddenPrefix : new String[]{"cpuLight", "cpuIntensive", "custom"}) {
            MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)org.hamcrest.Matchers.startsWith(forbiddenPrefix))));
        }
    }

    /**
     * Creates a copy of the default test event whose message payload is a
     * {@link NullInputStream} typed as {@code DataType.INPUT_STREAM}.
     *
     * @param length length attached to the payload's {@link TypedValue}; when empty, the
     *        stream is constructed with a size of {@code -1}
     * @return the event carrying the stream payload
     * @throws MuleException if the test event cannot be obtained
     */
    private CoreEvent createStreamPayloadEventWithLength(OptionalLong length) throws MuleException {
        CoreEvent baseEvent = (CoreEvent)this.testEvent();

        // The stream size and the declared TypedValue length come from the same optional.
        long streamSize = length.orElse(-1L);
        TypedValue streamPayload = new TypedValue((Object)new NullInputStream(streamSize), DataType.INPUT_STREAM, length);

        return CoreEvent.builder(baseEvent)
            .message(Message.builder().payload(streamPayload).build())
            .build();
    }
}

