/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.Test;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategyTestCase;
import org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.WorkQueueProcessingStrategyTestCase;
import org.mule.runtime.core.internal.processor.strategy.WorkQueueStreamProcessingStrategyFactory;
import reactor.util.concurrent.Queues;

@Feature(value="Processing Strategies")
@Story(value="Work Queue")
public class WorkQueueStreamProcessingStrategyTestCase
extends WorkQueueProcessingStrategyTestCase {
    public WorkQueueStreamProcessingStrategyTestCase(AbstractProcessingStrategyTestCase.Mode mode) {
        super(mode);
    }

    @Override
    protected ProcessingStrategy createProcessingStrategy(MuleContext muleContext, String schedulersNamePrefix) {
        return new WorkQueueStreamProcessingStrategyFactory.WorkQueueStreamProcessingStrategy(() -> this.blocking, Queues.XS_BUFFER_SIZE, 1, AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.blocking, 4);
    }

    @Override
    @Test
    @Description(value="If IO pool has maximum size of 1 only 1 thread is used for CPU_LIGHT processor and further requests block.")
    public void singleCpuLightConcurrentMaxConcurrency1() throws Exception {
        this.internalConcurrent(((Flow.Builder)this.flowBuilder.get()).processingStrategyFactory((context, prefix) -> new WorkQueueStreamProcessingStrategyFactory.WorkQueueStreamProcessingStrategy(() -> this.blocking, AbstractStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.blocking, 1)), true, ReactiveProcessor.ProcessingType.CPU_LITE, 1, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Test
    @Description(value="If IO pool has maximum size of 1 only 1 thread is used for BLOCKING processor and further requests block.")
    public void singleBlockingConcurrentMaxConcurrency1() throws Exception {
        this.internalConcurrent(((Flow.Builder)this.flowBuilder.get()).processingStrategyFactory((context, prefix) -> new WorkQueueStreamProcessingStrategyFactory.WorkQueueStreamProcessingStrategy(() -> this.blocking, AbstractStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.blocking, 1)), true, ReactiveProcessor.ProcessingType.BLOCKING, 1, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("I/O")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Test
    @Description(value="When back-pressure strategy is 'WAIT' the source thread blocks and all requests are processed.")
    public void sourceBackPressureWait() throws Exception {
        this.testBackPressure(MessageSource.BackPressureStrategy.WAIT, (Matcher<Integer>)CoreMatchers.equalTo((Object)2000), (Matcher<Integer>)CoreMatchers.equalTo((Object)0), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
    }

    @Override
    @Test
    @Description(value="When back-pressure strategy is 'FAIL' some requests fail with an OVERLOAD error.")
    public void sourceBackPressureFail() throws Exception {
        this.testBackPressure(MessageSource.BackPressureStrategy.FAIL, (Matcher<Integer>)Matchers.lessThan((Comparable)Integer.valueOf(2000)), (Matcher<Integer>)Matchers.greaterThan((Comparable)Integer.valueOf(0)), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
    }

    @Override
    @Test
    @Description(value="When back-pressure strategy is 'DROP' the flow rejects requests in the same way way with 'FAIL. It is the source that handles FAIL and DROP differently.")
    public void sourceBackPressureDrop() throws Exception {
        this.testBackPressure(MessageSource.BackPressureStrategy.DROP, (Matcher<Integer>)Matchers.lessThan((Comparable)Integer.valueOf(2000)), (Matcher<Integer>)Matchers.greaterThan((Comparable)Integer.valueOf(0)), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
    }
}

