/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.Test;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategyTestCase;
import org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.ReactorProcessingStrategyTestCase;
import org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory;
import reactor.util.concurrent.Queues;

@Feature(value="Processing Strategies")
@Story(value="Reactor")
public class ReactorStreamProcessingStrategyTestCase
extends ReactorProcessingStrategyTestCase {
    public ReactorStreamProcessingStrategyTestCase(AbstractProcessingStrategyTestCase.Mode mode) {
        super(mode);
    }

    @Override
    protected ProcessingStrategy createProcessingStrategy(MuleContext muleContext, String schedulersNamePrefix) {
        return new ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy(() -> this.ringBuffer, Queues.XS_BUFFER_SIZE, 1, AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, AbstractStreamProcessingStrategyFactory.CORES, Integer.MAX_VALUE);
    }

    @Override
    @Test
    @Description(value="If max concurrency is 1, only 1 thread is used for BLOCKING processors and further requests blocks. When maxConcurrency < subscribers processing is done on ring-buffer thread.")
    public void singleCpuLightConcurrentMaxConcurrency1() throws Exception {
        this.internalConcurrent(((Flow.Builder)this.flowBuilder.get()).processingStrategyFactory((context, prefix) -> new ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy(() -> this.ringBuffer, AbstractStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, AbstractStreamProcessingStrategyFactory.CORES, 1)), true, ReactiveProcessor.ProcessingType.CPU_LITE, 1, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("ringBuffer")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Test
    @Description(value="If max concurrency is 2, only 2 threads are used for BLOCKING processors and further requests blocks.")
    public void singleCpuLightConcurrentMaxConcurrency2() throws Exception {
        this.internalConcurrent(((Flow.Builder)this.flowBuilder.get()).processingStrategyFactory((context, prefix) -> new ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy(() -> this.ringBuffer, AbstractStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, AbstractStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, AbstractStreamProcessingStrategyFactory.CORES, 2)), true, ReactiveProcessor.ProcessingType.CPU_LITE, 2, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)2));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)2L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Override
    @Test
    @Description(value="When back-pressure strategy is 'WAIT' the source thread blocks and all requests are processed.")
    public void sourceBackPressureWait() throws Exception {
        if (this.mode.equals((Object)AbstractProcessingStrategyTestCase.Mode.SOURCE)) {
            this.testBackPressure(MessageSource.BackPressureStrategy.WAIT, (Matcher<Integer>)CoreMatchers.equalTo((Object)2000), (Matcher<Integer>)CoreMatchers.equalTo((Object)0), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
        }
    }

    @Override
    @Test
    @Description(value="When back-pressure strategy is 'FAIL' some requests fail with an OVERLOAD error.")
    public void sourceBackPressureFail() throws Exception {
        if (this.mode.equals((Object)AbstractProcessingStrategyTestCase.Mode.SOURCE)) {
            this.testBackPressure(MessageSource.BackPressureStrategy.FAIL, (Matcher<Integer>)Matchers.lessThan((Comparable)Integer.valueOf(2000)), (Matcher<Integer>)Matchers.greaterThan((Comparable)Integer.valueOf(0)), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
        }
    }

    @Override
    @Test
    @Description(value="When back-pressure strategy is 'DROP' the flow rejects requests in the same way way with 'FAIL. It is the source that handles FAIL and DROP differently.")
    public void sourceBackPressureDrop() throws Exception {
        if (this.mode.equals((Object)AbstractProcessingStrategyTestCase.Mode.SOURCE)) {
            this.testBackPressure(MessageSource.BackPressureStrategy.DROP, (Matcher<Integer>)Matchers.lessThan((Comparable)Integer.valueOf(2000)), (Matcher<Integer>)Matchers.greaterThan((Comparable)Integer.valueOf(0)), (Matcher<Integer>)CoreMatchers.equalTo((Object)2000));
        }
    }
}

