/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.transaction.Transaction;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategyTestCase;
import org.mule.runtime.core.internal.processor.strategy.ReactorProcessingStrategyFactory;
import org.mule.tck.testmodels.mule.TestTransaction;

@Feature(value="Processing Strategies")
@Story(value="Reactor")
public class ReactorProcessingStrategyTestCase
extends AbstractProcessingStrategyTestCase {
    public ReactorProcessingStrategyTestCase(AbstractProcessingStrategyTestCase.Mode mode) {
        super(mode);
    }

    @Override
    protected ProcessingStrategy createProcessingStrategy(MuleContext muleContext, String schedulersNamePrefix) {
        return new ReactorProcessingStrategyFactory.ReactorProcessingStrategy(() -> this.cpuLight);
    }

    @Override
    @Description(value="Regardless of processor type, when the ReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void singleCpuLight() throws Exception {
        super.singleCpuLight();
        this.assertEverythingOnEventLoop();
    }

    @Override
    @Description(value="When ReactorProcessingStrategy is configured, two concurrent requests may be processed by two different  cpu light threads.")
    public void singleCpuLightConcurrent() throws Exception {
        super.singleCpuLightConcurrent();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize(ReactorProcessingStrategyTestCase.between(1, 2)));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), ReactorProcessingStrategyTestCase.between(1L, 2L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Test
    @Description(value="If CPU LIGHT pool has maximum size of 1 only 1 thread is used and further requests block.")
    public void singleCpuLightConcurrentMaxConcurrency1() throws Exception {
        this.internalConcurrent(((Flow.Builder)this.flowBuilder.get()).processingStrategyFactory((context, prefix) -> new ReactorProcessingStrategyFactory.ReactorProcessingStrategy(() -> new AbstractProcessingStrategyTestCase.TestScheduler(1, "cpuLight", true))), true, ReactiveProcessor.ProcessingType.CPU_LITE, 1, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="Regardless of processor type, when the ReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void multipleCpuLight() throws Exception {
        super.multipleCpuLight();
        this.assertEverythingOnEventLoop();
    }

    @Override
    @Description(value="Regardless of processor type, when the ReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void singleBlocking() throws Exception {
        super.singleBlocking();
        this.assertEverythingOnEventLoop();
    }

    @Override
    @Description(value="Regardless of processor type, when the ReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void multipleBlocking() throws Exception {
        super.multipleBlocking();
        this.assertEverythingOnEventLoop();
    }

    @Override
    @Description(value="Regardless of processor type, when the ReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void singleCpuIntensive() throws Exception {
        super.singleCpuIntensive();
        this.assertEverythingOnEventLoop();
    }

    @Override
    @Description(value="Regardless of processor type, when the ReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void multipleCpuIntensive() throws Exception {
        super.multipleCpuIntensive();
        this.assertEverythingOnEventLoop();
    }

    @Override
    @Description(value="Regardless of processor type, when the ReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void mix() throws Exception {
        super.mix();
        this.assertEverythingOnEventLoop();
    }

    @Override
    @Description(value="Regardless of processor type, when the ReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void mix2() throws Exception {
        super.mix2();
        this.assertEverythingOnEventLoop();
    }

    private void assertEverythingOnEventLoop() {
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="When the ReactorProcessingStrategy is configured and a transaction is active processing fails with an error")
    public void tx() throws Exception {
        this.flow = ((Flow.Builder)this.flowBuilder.get()).processors(new Processor[]{this.cpuLightProcessor, this.cpuIntensiveProcessor, this.blockingProcessor}).build();
        this.flow.initialise();
        this.flow.start();
        TransactionCoordination.getInstance().bindTransaction((Transaction)new TestTransaction(muleContext));
        this.expectedException.expect(MessagingException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(DefaultMuleException.class));
        this.expectedException.expectCause(ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.equalTo((Object)"Unable to process a transactional flow asynchronously")));
        this.processFlow(this.testEvent());
    }

    @Override
    @Description(value="When the ReactorProcessingStrategy is configured any async processing will be returned to CPU_LIGHT thread. This helps avoid deadlocks when there are reduced number of threads used by async processor.")
    public void asyncCpuLight() throws Exception {
        super.asyncCpuLight();
        MatcherAssert.assertThat((Object)this.threads.size(), ReactorProcessingStrategyTestCase.between(1, 2));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), ReactorProcessingStrategyTestCase.between(1L, 2L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="When the ReactorProcessingStrategy is configured any async processing will be returned to CPU_LIGHT thread. This helps avoid deadlocks when there are reduced number of threads used by async processor.")
    public void asyncCpuLightConcurrent() throws Exception {
        super.asyncCpuLightConcurrent();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)2));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)2L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="Concurrent stream with concurrency of 8 only uses two CPU_LIGHT threads.")
    public void concurrentStream() throws Exception {
        super.concurrentStream();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)2));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)2L));
    }

    @Test
    @Description(value="If CPU LITE pool is busy OVERLOAD error is thrown")
    public void cpuLightRejectedExecution() throws Exception {
        this.flow = ((Flow.Builder)this.flowBuilder.get()).processors(new Processor[]{this.blockingProcessor}).processingStrategyFactory((context, prefix) -> new ReactorProcessingStrategyFactory.ReactorProcessingStrategy(() -> new AbstractProcessingStrategyTestCase.RejectingScheduler(this.cpuLight))).build();
        this.flow.initialise();
        this.flow.start();
        this.expectRejected();
        this.processFlow(this.testEvent());
    }

    @Test
    @Description(value="Notifications are invoked on CPU_LITE thread")
    public void asyncProcessorNotificationExecutionThreads() throws Exception {
        AtomicReference<Thread> beforeThread = new AtomicReference<Thread>();
        AtomicReference<Thread> afterThread = new AtomicReference<Thread>();
        this.testAsyncCpuLightNotificationThreads(beforeThread, afterThread);
        MatcherAssert.assertThat((Object)beforeThread.get().getName(), (Matcher)Matchers.startsWith((String)"cpuLight"));
        MatcherAssert.assertThat((Object)afterThread.get().getName(), (Matcher)Matchers.startsWith((String)"cpuLight"));
    }

    @Test
    @Description(value="Regardless of back-pressure strategy this processing strategy blocks and processes all events")
    public void sourceBackPressureWait() throws Exception {
        this.testBackPressure(MessageSource.BackPressureStrategy.WAIT, (Matcher<Integer>)CoreMatchers.equalTo((Object)2000), (Matcher<Integer>)CoreMatchers.equalTo((Object)0), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
    }

    @Test
    @Description(value="Regardless of back-pressure strategy this processing strategy blocks and processes all events")
    public void sourceBackPressureFail() throws Exception {
        this.testBackPressure(MessageSource.BackPressureStrategy.FAIL, (Matcher<Integer>)CoreMatchers.equalTo((Object)2000), (Matcher<Integer>)CoreMatchers.equalTo((Object)0), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
    }

    @Test
    @Description(value="Regardless of back-pressure strategy this processing strategy blocks and processes all events")
    public void sourceBackPressureDrop() throws Exception {
        this.testBackPressure(MessageSource.BackPressureStrategy.DROP, (Matcher<Integer>)CoreMatchers.equalTo((Object)2000), (Matcher<Integer>)CoreMatchers.equalTo((Object)0), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
    }

    @Test
    @Description(value="Regardless of processor type, when the ReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void singleIORW() throws Exception {
        super.singleIORW(() -> this.testEvent());
        this.assertEverythingOnEventLoop();
    }
}

