/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.transaction.Transaction;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase;
import org.mule.runtime.core.processor.strategy.ProactorStreamProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.ReactorStreamProcessingStrategyFactory;
import org.mule.runtime.core.transaction.TransactionCoordination;
import org.mule.tck.junit4.AbstractReactiveProcessorTestCase;
import org.mule.tck.testmodels.mule.TestTransaction;
import ru.yandex.qatools.allure.annotations.Description;
import ru.yandex.qatools.allure.annotations.Features;
import ru.yandex.qatools.allure.annotations.Stories;

@Features(value={"Processing Strategies"})
@Stories(value={"Proactor"})
public class ProactorStreamProcessingStrategyTestCase
extends AbstractProcessingStrategyTestCase {
    public ProactorStreamProcessingStrategyTestCase(AbstractReactiveProcessorTestCase.Mode mode) {
        super(mode);
    }

    @Override
    protected ProcessingStrategy createProcessingStrategy(MuleContext muleContext, String schedulersNamePrefix) {
        return new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(() -> this.ringBuffer, ReactorStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, ReactorStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, () -> this.blocking, () -> this.cpuIntensive, 4);
    }

    @Override
    @Description(value="With the ProactorProcessingStrategy, when all processor are CPU_LIGHT then they are all exectured in a single  cpu light thread.")
    public void singleCpuLight() throws Exception {
        super.singleCpuLight();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((Matcher)CoreMatchers.equalTo((Object)1)));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="When ProactorProcessingStrategy is configured, two concurrent requests may be processed by two different  cpu light threads. MULE-11132 is needed for true reactor behaviour.")
    public void singleCpuLightConcurrent() throws Exception {
        super.singleCpuLightConcurrent();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize(ProactorStreamProcessingStrategyTestCase.between(1, 2)));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), ProactorStreamProcessingStrategyTestCase.between(1L, 2L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="With the ProactorProcessingStrategy, when all processor are CPU_LIGHT then they are all exectured in a single  cpu light thread.")
    public void multipleCpuLight() throws Exception {
        super.multipleCpuLight();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((Matcher)CoreMatchers.equalTo((Object)1)));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="With the ProactorProcessingStrategy, a BLOCKING message processor is scheduled on a IO thread.")
    public void singleBlocking() throws Exception {
        super.singleBlocking();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((Matcher)CoreMatchers.equalTo((Object)1)));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="With the ProactorProcessingStrategy, each BLOCKING message processor is scheduled on a IO thread. These may, or may not, be the same thread.")
    public void multipleBlocking() throws Exception {
        super.multipleBlocking();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize(ProactorStreamProcessingStrategyTestCase.between(1, 3)));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), ProactorStreamProcessingStrategyTestCase.between(1L, 3L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="With the ProactorProcessingStrategy, a CPU_INTENSIVE message processor is scheduled on a CPU intensive thread.")
    public void singleCpuIntensive() throws Exception {
        super.singleCpuIntensive();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((Matcher)CoreMatchers.equalTo((Object)1)));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuIntensive")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="With the ProactorProcessingStrategy, each CPU_INTENSIVE message processor is scheduled on a CPU Intensive thread. These may, or may not, be the same thread.")
    public void multipleCpuIntensive() throws Exception {
        super.multipleCpuIntensive();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize(ProactorStreamProcessingStrategyTestCase.between(1, 3)));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuIntensive")).count(), ProactorStreamProcessingStrategyTestCase.between(1L, 3L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="With the ProactorProcessingStrategy, when there is a mix of processor processing types, each processor is scheduled on the correct scheduler.")
    public void mix() throws Exception {
        super.mix();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((Matcher)CoreMatchers.equalTo((Object)3)));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuIntensive")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="With the ProactorProcessingStrategy, when there is a mix of processor processing types, each processor is scheduled on the correct scheduler.")
    public void mix2() throws Exception {
        super.mix2();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize(ProactorStreamProcessingStrategyTestCase.between(3, 7)));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuIntensive")).count(), ProactorStreamProcessingStrategyTestCase.between(1L, 2L));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), ProactorStreamProcessingStrategyTestCase.between(1L, 2L));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), ProactorStreamProcessingStrategyTestCase.between(1L, 3L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="When the ProactorProcessingStrategy is configured and a transaction is active processing fails with an error")
    public void tx() throws Exception {
        this.flow.setMessageProcessors(Arrays.asList(this.cpuLightProcessor, this.cpuIntensiveProcessor, this.blockingProcessor));
        this.flow.initialise();
        this.flow.start();
        TransactionCoordination.getInstance().bindTransaction((Transaction)new TestTransaction(muleContext));
        this.expectedException.expect(MessagingException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(DefaultMuleException.class));
        this.expectedException.expectCause(ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.equalTo((Object)"Unable to process a transactional flow asynchronously")));
        this.process((Processor)this.flow, this.testEvent());
    }

    @Override
    @Description(value="When the ReactorProcessingStrategy is configured and a transaction is active processing fails with an error")
    public void asyncCpuLight() throws Exception {
        super.asyncCpuLight();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize(ProactorStreamProcessingStrategyTestCase.between(1, 2)));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), ProactorStreamProcessingStrategyTestCase.between(1L, 2L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="Concurrent stream with concurrency of 8 only uses two CPU_LIGHT threads.")
    public void concurrentStream() throws Exception {
        super.concurrentStream();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)2));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)2L));
    }

    @Test
    @Description(value="If IO pool is busy OVERLOAD error is thrown")
    public void blockingRejectedExecution() throws Exception {
        this.flow.setProcessingStrategyFactory((context, prefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(() -> this.ringBuffer, ReactorStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, ReactorStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, () -> new AbstractProcessingStrategyTestCase.RejectingScheduler(), () -> this.cpuIntensive, 4));
        this.flow.setMessageProcessors(Collections.singletonList(this.blockingProcessor));
        this.flow.initialise();
        this.flow.start();
        this.expectRejected();
        this.process((Processor)this.flow, this.testEvent());
    }

    @Test
    @Description(value="If CPU INTENSIVE pool is busy OVERLOAD error is thrown")
    public void cpuIntensiveRejectedExecution() throws Exception {
        this.flow.setProcessingStrategyFactory((context, prefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(() -> this.ringBuffer, ReactorStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, ReactorStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, () -> this.blocking, () -> new AbstractProcessingStrategyTestCase.RejectingScheduler(), 4));
        this.flow.setMessageProcessors(Collections.singletonList(this.cpuIntensiveProcessor));
        this.flow.initialise();
        this.flow.start();
        this.expectRejected();
        this.process((Processor)this.flow, this.testEvent());
    }

    @Test
    @Description(value="If max concurrency is 1, only 1 thread is used for CPU_LITE processors and further requests blocks. When maxConcurrency < subscribers processing is done on ring-buffer thread.")
    public void singleCpuLightConcurrentMaxConcurrency1() throws Exception {
        this.flow.setProcessingStrategyFactory((context, prefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(() -> this.ringBuffer, ReactorStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, ReactorStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, () -> this.blocking, () -> this.cpuIntensive, 1));
        this.internalConcurrent(true, ReactiveProcessor.ProcessingType.CPU_LITE, 1, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("ringBuffer")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Test
    @Description(value="If max concurrency is 1, only 1 thread is used for BLOCKING processors and further requests blocks. When maxConcurrency < subscribers processing is done on ring-buffer thread.")
    public void singleBlockingConcurrentMaxConcurrency1() throws Exception {
        this.flow.setProcessingStrategyFactory((context, prefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(() -> this.ringBuffer, ReactorStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, ReactorStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, () -> this.blocking, () -> this.cpuIntensive, 1));
        this.internalConcurrent(true, ReactiveProcessor.ProcessingType.BLOCKING, 1, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("ringBuffer")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Test
    @Description(value="If max concurrency is 2, only 2 threads are used for BLOCKING processors and further requests blocks.")
    public void singleBlockingConcurrentMaxConcurrency2() throws Exception {
        this.flow.setProcessingStrategyFactory((context, prefix) -> new ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy(() -> this.ringBuffer, ReactorStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, ReactorStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, () -> this.blocking, () -> this.cpuIntensive, 2));
        this.internalConcurrent(true, ReactiveProcessor.ProcessingType.BLOCKING, 2, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)2));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)2L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Test
    @Description(value="Notifications are invoked on CPU_LITE thread")
    public void asyncProcessorNotificationExecutionThreads() throws Exception {
        AtomicReference<Thread> beforeThread = new AtomicReference<Thread>();
        AtomicReference<Thread> afterThread = new AtomicReference<Thread>();
        this.testAsyncCpuLightNotificationThreads(beforeThread, afterThread);
        MatcherAssert.assertThat((Object)beforeThread.get().getName(), (Matcher)Matchers.startsWith((String)"cpuLight"));
        MatcherAssert.assertThat((Object)afterThread.get().getName(), (Matcher)Matchers.startsWith((String)"cpuLight"));
    }
}

