/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.transaction.Transaction;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase;
import org.mule.runtime.core.processor.strategy.WorkQueueProcessingStrategyFactory;
import org.mule.runtime.core.transaction.TransactionCoordination;
import org.mule.tck.junit4.AbstractReactiveProcessorTestCase;
import org.mule.tck.testmodels.mule.TestTransaction;
import ru.yandex.qatools.allure.annotations.Description;
import ru.yandex.qatools.allure.annotations.Features;
import ru.yandex.qatools.allure.annotations.Stories;

@Features(value={"Processing Strategies"})
@Stories(value={"Work Queue"})
public class WorkQueueProcessingStrategyTestCase
extends AbstractProcessingStrategyTestCase {
    public WorkQueueProcessingStrategyTestCase(AbstractReactiveProcessorTestCase.Mode mode) {
        super(mode);
    }

    @Override
    protected ProcessingStrategy createProcessingStrategy(MuleContext muleContext, String schedulersNamePrefix) {
        return new WorkQueueProcessingStrategyFactory.WorkQueueProcessingStrategy(() -> this.blocking);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void singleCpuLight() throws Exception {
        super.singleCpuLight();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void singleCpuLightConcurrent() throws Exception {
        super.singleCpuLightConcurrent();
        MatcherAssert.assertThat((Object)this.threads.size(), (Matcher)CoreMatchers.allOf((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Integer.valueOf(1)), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Integer.valueOf(2))));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.allOf((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(1L)), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(2L))));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)0L));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuIntensive")).count(), (Matcher)CoreMatchers.equalTo((Object)0L));
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void multipleCpuLight() throws Exception {
        super.multipleCpuLight();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void singleBlocking() throws Exception {
        super.singleBlocking();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void multipleBlocking() throws Exception {
        super.multipleBlocking();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void singleCpuIntensive() throws Exception {
        super.singleCpuIntensive();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void multipleCpuIntensive() throws Exception {
        super.multipleCpuIntensive();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void mix() throws Exception {
        super.mix();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="Regardless of processor type, when the WorkQueueProcessingStrategy is configured, the pipeline is executed synchronously in a single IO thead.")
    public void mix2() throws Exception {
        super.mix2();
        this.assertSynchronousIOScheduler(1);
    }

    @Override
    @Description(value="When the WorkQueueProcessingStrategy is configured and a transaction is active processing fails with an error")
    public void tx() throws Exception {
        this.flow.setMessageProcessors(Arrays.asList(this.cpuLightProcessor, this.cpuIntensiveProcessor, this.blockingProcessor));
        this.flow.initialise();
        this.flow.start();
        TransactionCoordination.getInstance().bindTransaction((Transaction)new TestTransaction(muleContext));
        this.expectedException.expect(MessagingException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(DefaultMuleException.class));
        this.expectedException.expectCause(ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.equalTo((Object)"Unable to process a transactional flow asynchronously")));
        this.process((Processor)this.flow, this.testEvent());
    }

    @Override
    @Description(value="When the WorkQueueProcessingStrategy is configured any async processing will be returned to IO thread. This helps avoid deadlocks when there are reduced number of threads used by async processor.")
    public void asyncCpuLight() throws Exception {
        super.asyncCpuLight();
        MatcherAssert.assertThat((Object)this.threads.size(), WorkQueueProcessingStrategyTestCase.between(1, 2));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), WorkQueueProcessingStrategyTestCase.between(1L, 2L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="When the WorkQueueProcessingStrategy is configured any async processing will be returned to IO thread. This helps avoid deadlocks when there are reduced number of threads used by async processor.")
    public void asyncCpuLightConcurrent() throws Exception {
        super.asyncCpuLightConcurrent();
        MatcherAssert.assertThat((Object)this.threads.size(), WorkQueueProcessingStrategyTestCase.between(2, 4));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), WorkQueueProcessingStrategyTestCase.between(2L, 4L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    private void assertSynchronousIOScheduler(int concurrency) {
        MatcherAssert.assertThat((Object)this.threads.size(), (Matcher)CoreMatchers.equalTo((Object)concurrency));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)concurrency));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Description(value="Concurrent stream with concurrency of 8 only uses four IO threads.")
    public void concurrentStream() throws Exception {
        super.concurrentStream();
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)4));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)4L));
    }

    @Test
    @Description(value="If IO pool is busy OVERLOAD error is thrown")
    public void rejectedExecution() throws Exception {
        this.flow.setProcessingStrategyFactory((context, prefix) -> new WorkQueueProcessingStrategyFactory.WorkQueueProcessingStrategy(() -> new AbstractProcessingStrategyTestCase.RejectingScheduler()));
        this.flow.setMessageProcessors(Collections.singletonList(this.blockingProcessor));
        this.flow.initialise();
        this.flow.start();
        this.expectRejected();
        this.process((Processor)this.flow, this.testEvent());
    }

    @Test
    @Description(value="If IO pool has maximum size of 1 only 1 thread is used for CPU_LIGHT processor and further requests block.")
    public void singleCpuLightConcurrentMaxConcurrency1() throws Exception {
        this.flow.setProcessingStrategyFactory((context, prefix) -> new WorkQueueProcessingStrategyFactory.WorkQueueProcessingStrategy(() -> new AbstractProcessingStrategyTestCase.TestScheduler(1, "I/O")));
        this.internalConcurrent(true, ReactiveProcessor.ProcessingType.CPU_LITE, 1, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Test
    @Description(value="If IO pool has maximum size of 1 only 1 thread is used  for BLOCKING processor and further requests block.")
    public void singleBlockingConcurrentMaxConcurrency1() throws Exception {
        this.flow.setProcessingStrategyFactory((context, prefix) -> new WorkQueueProcessingStrategyFactory.WorkQueueProcessingStrategy(() -> new AbstractProcessingStrategyTestCase.TestScheduler(1, "I/O")));
        this.internalConcurrent(true, ReactiveProcessor.ProcessingType.BLOCKING, 1, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Test
    @Description(value="Notifications are invoked on IO thread")
    public void asyncProcessorNotificationExecutionThreads() throws Exception {
        AtomicReference<Thread> beforeThread = new AtomicReference<Thread>();
        AtomicReference<Thread> afterThread = new AtomicReference<Thread>();
        this.testAsyncCpuLightNotificationThreads(beforeThread, afterThread);
        MatcherAssert.assertThat((Object)beforeThread.get().getName(), (Matcher)Matchers.startsWith((String)"I/O"));
        MatcherAssert.assertThat((Object)afterThread.get().getName(), (Matcher)Matchers.startsWith((String)"I/O"));
    }
}

