/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.Test;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.processor.strategy.ReactorProcessingStrategyTestCase;
import org.mule.runtime.core.processor.strategy.ReactorStreamProcessingStrategyFactory;
import org.mule.tck.junit4.AbstractReactiveProcessorTestCase;
import ru.yandex.qatools.allure.annotations.Description;
import ru.yandex.qatools.allure.annotations.Features;
import ru.yandex.qatools.allure.annotations.Stories;

@Features(value={"Processing Strategies"})
@Stories(value={"Reactor"})
public class ReactorStreamProcessingStrategyTestCase
extends ReactorProcessingStrategyTestCase {
    public ReactorStreamProcessingStrategyTestCase(AbstractReactiveProcessorTestCase.Mode mode) {
        super(mode);
    }

    @Override
    protected ProcessingStrategy createProcessingStrategy(MuleContext muleContext, String schedulersNamePrefix) {
        return new ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy(() -> this.ringBuffer, ReactorStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, ReactorStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, Integer.MAX_VALUE);
    }

    @Override
    @Test
    @Description(value="If max concurrency is 1, only 1 thread is used for BLOCKING processors and further requests blocks. When maxConcurrency < subscribers processing is done on ring-buffer thread.")
    public void singleCpuLightConcurrentMaxConcurrency1() throws Exception {
        this.flow.setProcessingStrategyFactory((context, prefix) -> new ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy(() -> this.ringBuffer, ReactorStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, ReactorStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, 1));
        this.internalConcurrent(true, ReactiveProcessor.ProcessingType.CPU_LITE, 1, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("ringBuffer")).count(), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuLight"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }

    @Test
    @Description(value="If max concurrency is 2, only 2 threads are used for BLOCKING processors and further requests blocks.")
    public void singleCpuLightConcurrentMaxConcurrency2() throws Exception {
        this.flow.setProcessingStrategyFactory((context, prefix) -> new ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy(() -> this.ringBuffer, ReactorStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 1, ReactorStreamProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, () -> this.cpuLight, 2));
        this.internalConcurrent(true, ReactiveProcessor.ProcessingType.CPU_LITE, 2, new Processor[0]);
        MatcherAssert.assertThat((Object)this.threads, (Matcher)Matchers.hasSize((int)2));
        MatcherAssert.assertThat((Object)this.threads.stream().filter(name -> name.startsWith("cpuLight")).count(), (Matcher)CoreMatchers.equalTo((Object)2L));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"I/O"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"cpuIntensive"))));
        MatcherAssert.assertThat((Object)this.threads, (Matcher)CoreMatchers.not((Matcher)IsCollectionContaining.hasItem((Matcher)Matchers.startsWith((String)"custom"))));
    }
}

