/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.Before;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.transaction.Transaction;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategyTestCase;
import org.mule.runtime.core.internal.processor.strategy.WorkQueueProcessingStrategyFactory;
import org.mule.runtime.core.privileged.registry.RegistrationException;
import org.mule.tck.testmodels.mule.TestTransaction;

@Feature(value="Processing Strategies")
@Story(value="Work Queue")
public class WorkQueueProcessingStrategyTestCase
extends AbstractProcessingStrategyTestCase {
    public WorkQueueProcessingStrategyTestCase(AbstractProcessingStrategyTestCase.Mode mode) {
        super(mode);
    }

    @Override
    @Before
    public void before() throws RegistrationException {
        super.before();
        this.blocking = new AbstractProcessingStrategyTestCase.TestScheduler(4, "I/O", false);
    }

    @Override
    protected ProcessingStrategy createProcessingStrategy(MuleContext muleContext, String schedulersNamePrefix) {
        return new WorkQueueProcessingStrategyFactory.WorkQueueProcessingStrategy(() -> this.blocking);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void singleCpuLight() throws Exception {
        super.singleCpuLight();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void singleCpuLightConcurrent() throws Exception {
        super.singleCpuLightConcurrent();
        MatcherAssert.assertThat((Object)this.threads.size(), (Matcher)CoreMatchers.allOf((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Integer.valueOf(1)), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Integer.valueOf(2))));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.allOf((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(1L)), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(2L))));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)0L));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuIntensive")).count(), (Matcher)CoreMatchers.equalTo((Object)0L));
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void multipleCpuLight() throws Exception {
        super.multipleCpuLight();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void singleBlocking() throws Exception {
        super.singleBlocking();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void multipleBlocking() throws Exception {
        super.multipleBlocking();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void singleCpuIntensive() throws Exception {
        super.singleCpuIntensive();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void multipleCpuIntensive() throws Exception {
        super.multipleCpuIntensive();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void mix() throws Exception {
        super.mix();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void mix2() throws Exception {
        super.mix2();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="When the WorkQueueProcessingStrategy is configured and a transaction is active processing fails with an error")
    public void tx() throws Exception {
        this.flow = ((Flow.Builder)this.flowBuilder.get()).processors(new Processor[]{this.cpuLightProcessor, this.cpuIntensiveProcessor, this.blockingProcessor}).build();
        this.flow.initialise();
        this.flow.start();
        TransactionCoordination.getInstance().bindTransaction((Transaction)new TestTransaction(muleContext));
        this.expectedException.expect(MessagingException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(DefaultMuleException.class));
        this.expectedException.expectCause(ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.equalTo((Object)"Unable to process a transactional flow asynchronously")));
        this.processFlow(this.testEvent());
    }

    @Override
    @Description(value="When the WorkQueueProcessingStrategy is configured any async processing will be returned to IO thread. This helps avoid deadlocks when there are reduced number of threads used by async processor.")
    public void asyncCpuLight() throws Exception {
        super.asyncCpuLight();
        MatcherAssert.assertThat((Object)this.threads.size(), WorkQueueProcessingStrategyTestCase.between(1, 2));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), WorkQueueProcessingStrategyTestCase.between(1L, 2L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="When the WorkQueueProcessingStrategy is configured any async processing will be returned to IO thread. This helps avoid deadlocks when there are reduced number of threads used by async processor.")
    public void asyncCpuLightConcurrent() throws Exception {
        super.asyncCpuLightConcurrent();
        MatcherAssert.assertThat((Object)this.threads.size(), WorkQueueProcessingStrategyTestCase.between(2, 4));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), WorkQueueProcessingStrategyTestCase.between(2L, 4L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    private void assertSynchronousIOScheduler(int concurrency) {
        MatcherAssert.assertThat((Object)this.threads.size(), (Matcher)CoreMatchers.equalTo((Object)concurrency));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)concurrency));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="Concurrent stream with concurrency of 8 only uses four IO threads.")
    public void concurrentStream() throws Exception {
        super.concurrentStream();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)4));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)4L));
    }

    @Test
    @Description(value="If IO pool is busy OVERLOAD error is thrown")
    public void rejectedExecution() throws Exception {
        this.flow = ((Flow.Builder)this.flowBuilder.get()).processors(new Processor[]{this.blockingProcessor}).processingStrategyFactory((context, prefix) -> new WorkQueueProcessingStrategyFactory.WorkQueueProcessingStrategy(() -> new AbstractProcessingStrategyTestCase.RejectingScheduler(this.blocking))).build();
        this.flow.initialise();
        this.flow.start();
        this.expectRejected();
        this.processFlow(this.testEvent());
    }

    @Test
    @Description(value="If IO pool has maximum size of 1 only 1 thread is used for CPU_LIGHT processor and further requests block.")
    public void singleCpuLightConcurrentMaxConcurrency1() throws Exception {
        this.internalConcurrent(((Flow.Builder)this.flowBuilder.get()).processingStrategyFactory((context, prefix) -> new WorkQueueProcessingStrategyFactory.WorkQueueProcessingStrategy(() -> new AbstractProcessingStrategyTestCase.TestScheduler(1, "I/O", true))), true, ReactiveProcessor.ProcessingType.CPU_LITE, 1, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Test
    @Description(value="If IO pool has maximum size of 1 only 1 thread is used  for BLOCKING processor and further requests block.")
    public void singleBlockingConcurrentMaxConcurrency1() throws Exception {
        this.internalConcurrent(((Flow.Builder)this.flowBuilder.get()).processingStrategyFactory((context, prefix) -> new WorkQueueProcessingStrategyFactory.WorkQueueProcessingStrategy(() -> new AbstractProcessingStrategyTestCase.TestScheduler(1, "I/O", true))), true, ReactiveProcessor.ProcessingType.BLOCKING, 1, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Test
    @Description(value="Notifications are invoked on IO thread")
    public void asyncProcessorNotificationExecutionThreads() throws Exception {
        AtomicReference<Thread> beforeThread = new AtomicReference<Thread>();
        AtomicReference<Thread> afterThread = new AtomicReference<Thread>();
        this.testAsyncCpuLightNotificationThreads(beforeThread, afterThread);
        MatcherAssert.assertThat((Object)beforeThread.get().getName(), (Matcher)Matchers.startsWith((String)"I/O"));
        MatcherAssert.assertThat((Object)afterThread.get().getName(), (Matcher)Matchers.startsWith((String)"I/O"));
    }

    @Test
    @Description(value="Regardless of back-pressure strategy this processing strategy blocks and processes all events")
    public void sourceBackPressureWait() throws Exception {
        if (this.mode.equals((Object)AbstractProcessingStrategyTestCase.Mode.SOURCE)) {
            this.testBackPressure(MessageSource.BackPressureStrategy.WAIT, (Matcher<Integer>)CoreMatchers.equalTo((Object)2000), (Matcher<Integer>)CoreMatchers.equalTo((Object)0), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
        }
    }

    @Test
    @Description(value="Regardless of back-pressure strategy this processing strategy blocks and processes all events")
    public void sourceBackPressureFail() throws Exception {
        if (this.mode.equals((Object)AbstractProcessingStrategyTestCase.Mode.SOURCE)) {
            this.testBackPressure(MessageSource.BackPressureStrategy.FAIL, (Matcher<Integer>)CoreMatchers.equalTo((Object)2000), (Matcher<Integer>)CoreMatchers.equalTo((Object)0), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
        }
    }

    @Test
    @Description(value="Regardless of back-pressure strategy this processing strategy blocks and processes all events")
    public void sourceBackPressureDrop() throws Exception {
        if (this.mode.equals((Object)AbstractProcessingStrategyTestCase.Mode.SOURCE)) {
            this.testBackPressure(MessageSource.BackPressureStrategy.DROP, (Matcher<Integer>)CoreMatchers.equalTo((Object)2000), (Matcher<Integer>)CoreMatchers.equalTo((Object)0), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
        }
    }

    @Test
    @Description(value="If IO pool is busy OVERLOAD error is thrown")
    public void blockingRejectedExecution() throws Exception {
        Scheduler blockingSchedulerSpy = (Scheduler)Mockito.spy((Object)this.blocking);
        Scheduler rejectingSchedulerSpy = (Scheduler)Mockito.spy((Object)new AbstractProcessingStrategyTestCase.RejectingScheduler(blockingSchedulerSpy));
        this.flow = ((Flow.Builder)this.flowBuilder.get()).processors(new Processor[]{this.blockingProcessor}).processingStrategyFactory((context, prefix) -> new WorkQueueProcessingStrategyFactory.WorkQueueProcessingStrategy(() -> rejectingSchedulerSpy)).build();
        this.flow.initialise();
        this.flow.start();
        this.expectRejected();
        this.processFlow(this.testEvent());
    }

    @Test
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void singleIORW() throws Exception {
        super.singleIORW(() -> this.testEvent());
        this.assertSynchronousIOScheduler(1);
    }
}

