/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing.forkjoin;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.exception.Errors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.util.func.CheckedConsumer;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.routing.ForkJoinStrategy;
import org.mule.runtime.core.privileged.processor.InternalProcessor;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.mule.runtime.core.privileged.routing.CompositeRoutingException;
import org.mule.runtime.core.privileged.routing.RoutingResult;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.mule.tck.testmodels.fruit.Apple;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Feature(value="Fork/Join Strategies used by scatter-gather and foreach routers")
public abstract class AbstractForkJoinStrategyTestCase
extends AbstractMuleContextTestCase {
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    protected ForkJoinStrategy strategy;
    protected ProcessingStrategy processingStrategy;
    protected Scheduler scheduler;
    protected ErrorType timeoutErrorType;

    @Before
    public void setup() {
        this.processingStrategy = (ProcessingStrategy)Mockito.mock(ProcessingStrategy.class);
        Mockito.when((Object)this.processingStrategy.onPipeline((ReactiveProcessor)Matchers.any(ReactiveProcessor.class))).thenAnswer(invocation -> (ReactiveProcessor)invocation.getArgumentAt(0, ReactiveProcessor.class));
        this.scheduler = muleContext.getSchedulerService().ioScheduler();
        this.timeoutErrorType = (ErrorType)muleContext.getErrorTypeRepository().getErrorType(Errors.ComponentIdentifiers.Handleable.TIMEOUT).get();
        this.setupConcurrentProcessingStrategy();
        this.strategy = this.createStrategy(this.processingStrategy, Integer.MAX_VALUE, true, Integer.MAX_VALUE);
    }

    @After
    public void tearDown() {
        this.scheduler.stop();
    }

    protected abstract ForkJoinStrategy createStrategy(ProcessingStrategy var1, int var2, boolean var3, long var4);

    @Test
    @Description(value="When a route timeout occurs a CompositeRoutingException is thrown with details of timeout error in RoutingResult.")
    public void timeout() throws Throwable {
        this.strategy = this.createStrategy(this.processingStrategy, 1, true, 50L);
        this.expectedException.expect(CoreMatchers.instanceOf(CompositeRoutingException.class));
        this.invokeStrategyBlocking(this.strategy, this.testEvent(), Arrays.asList(this.createRoutingPairWithSleep(Message.of((Object)1), 250L)), (CheckedConsumer<Throwable>)((CheckedConsumer)throwable -> {
            CompositeRoutingException compositeRoutingException = this.assertCompositeRoutingException((Throwable)throwable, 1);
            RoutingResult routingResult = this.assertRoutingResult(compositeRoutingException, 0, 1);
            Assert.assertThat((Object)((Error)routingResult.getFailures().get("0")).getCause(), (Matcher)CoreMatchers.instanceOf(TimeoutException.class));
        }));
    }

    @Test
    @Description(value="When a route timeout occurs all routes are still executed and  a CompositeRoutingException is thrown with details of timeout error and successful routes in RoutingResult.")
    public void timeoutDelayed() throws Throwable {
        this.strategy = this.createStrategy(this.processingStrategy, 1, true, 50L);
        Message pair2Result = Message.of((Object)2);
        Processor pair2Processor = this.createProcessorSpy(pair2Result);
        ForkJoinStrategy.RoutingPair pair2 = ForkJoinStrategy.RoutingPair.of((CoreEvent)this.testEvent(), (MessageProcessorChain)this.createChain(pair2Processor));
        this.expectedException.expect(CoreMatchers.instanceOf(CompositeRoutingException.class));
        this.invokeStrategyBlocking(this.strategy, this.testEvent(), Arrays.asList(this.createRoutingPairWithSleep(Message.of((Object)1), 250L), pair2), (CheckedConsumer<Throwable>)((CheckedConsumer)throwable -> {
            ((Processor)Mockito.verify((Object)pair2Processor, (VerificationMode)Mockito.times((int)1))).process((CoreEvent)Matchers.any(CoreEvent.class));
            CompositeRoutingException compositeRoutingException = this.assertCompositeRoutingException((Throwable)throwable, 1);
            RoutingResult routingResult = this.assertRoutingResult(compositeRoutingException, 1, 1);
            Assert.assertThat((Object)((Error)routingResult.getFailures().get("0")).getCause(), (Matcher)CoreMatchers.instanceOf(TimeoutException.class));
            Assert.assertThat(routingResult.getResults().get("1"), (Matcher)CoreMatchers.is((Object)pair2Result));
        }));
    }

    @Test
    @Description(value="When configured with delayErrors='false' the first timeout causes strategy to throw a TimeoutException.")
    public void timeoutEager() throws Throwable {
        this.strategy = this.createStrategy(this.processingStrategy, 1, false, 50L);
        Message pair2Result = Message.of((Object)2);
        Processor pair2Processor = this.createProcessorSpy(pair2Result);
        ForkJoinStrategy.RoutingPair pair2 = ForkJoinStrategy.RoutingPair.of((CoreEvent)this.testEvent(), (MessageProcessorChain)this.createChain(pair2Processor));
        this.expectedException.expect(CoreMatchers.instanceOf(DefaultMuleException.class));
        this.expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
        this.invokeStrategyBlocking(this.strategy, this.testEvent(), Arrays.asList(this.createRoutingPairWithSleep(Message.of((Object)1), 250L), pair2), (CheckedConsumer<Throwable>)((CheckedConsumer)throwable -> ((Processor)Mockito.verify((Object)pair2Processor, (VerificationMode)Mockito.never())).process((CoreEvent)Matchers.any(CoreEvent.class))));
    }

    @Test
    @Description(value="Errors are thrown via CompositeRoutingException with RoutingResult containing details of failures.")
    public void error() throws Throwable {
        IllegalStateException exception = new IllegalStateException();
        ForkJoinStrategy.RoutingPair failingPair = ForkJoinStrategy.RoutingPair.of((CoreEvent)this.testEvent(), (MessageProcessorChain)this.createFailingRoutingPair(exception));
        this.expectedException.expect(CoreMatchers.instanceOf(CompositeRoutingException.class));
        this.invokeStrategyBlocking(this.strategy, this.testEvent(), Arrays.asList(failingPair), (CheckedConsumer<Throwable>)((CheckedConsumer)throwable -> {
            CompositeRoutingException compositeRoutingException = this.assertCompositeRoutingException((Throwable)throwable, 1);
            RoutingResult routingResult = this.assertRoutingResult(compositeRoutingException, 0, 1);
            Assert.assertThat((Object)((Error)routingResult.getFailures().get("0")).getCause(), (Matcher)CoreMatchers.is((Object)exception));
        }));
    }

    @Test
    @Description(value="When an error occurs all routes are executed regardless and a CompositeRoutingException is thrown containing a RoutingResult with details of both failures and successes.")
    public void errorDelayed() throws Throwable {
        Processor processorSpy = this.createProcessorSpy(this.testEvent().getMessage());
        IllegalStateException exception1 = new IllegalStateException();
        ForkJoinStrategy.RoutingPair failingPair1 = ForkJoinStrategy.RoutingPair.of((CoreEvent)this.testEvent(), (MessageProcessorChain)this.createFailingRoutingPair(exception1));
        UnsupportedOperationException exception2 = new UnsupportedOperationException();
        ForkJoinStrategy.RoutingPair failingPair2 = ForkJoinStrategy.RoutingPair.of((CoreEvent)this.testEvent(), (MessageProcessorChain)this.createFailingRoutingPair(exception2));
        IndexOutOfBoundsException exception3 = new IndexOutOfBoundsException();
        ForkJoinStrategy.RoutingPair failingPair3 = ForkJoinStrategy.RoutingPair.of((CoreEvent)this.testEvent(), (MessageProcessorChain)this.createFailingRoutingPair(exception3));
        ForkJoinStrategy.RoutingPair okPair = ForkJoinStrategy.RoutingPair.of((CoreEvent)this.testEvent(), (MessageProcessorChain)this.createChain(processorSpy));
        this.expectedException.expect(CoreMatchers.instanceOf(CompositeRoutingException.class));
        this.invokeStrategyBlocking(this.strategy, this.testEvent(), Arrays.asList(failingPair1, failingPair2, failingPair3, okPair), (CheckedConsumer<Throwable>)((CheckedConsumer)throwable -> {
            ((Processor)Mockito.verify((Object)processorSpy, (VerificationMode)Mockito.times((int)1))).process((CoreEvent)Matchers.any(CoreEvent.class));
            CompositeRoutingException compositeRoutingException = this.assertCompositeRoutingException((Throwable)throwable, 3);
            RoutingResult routingResult = this.assertRoutingResult(compositeRoutingException, 1, 3);
            Assert.assertThat((Object)((Error)routingResult.getFailures().get("0")).getCause(), (Matcher)CoreMatchers.is((Object)exception1));
            Assert.assertThat((Object)((Error)routingResult.getFailures().get("1")).getCause(), (Matcher)CoreMatchers.is((Object)exception2));
            Assert.assertThat((Object)((Error)routingResult.getFailures().get("2")).getCause(), (Matcher)CoreMatchers.is((Object)exception3));
            Assert.assertThat(routingResult.getFailures().get("3"), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.nullValue()));
        }));
    }

    @Test
    @Description(value="When configured with delayErrors='false' the first errors causes strategy to throw this exception.")
    public void errorEager() throws Throwable {
        this.strategy = this.createStrategy(this.processingStrategy, 1, false, Integer.MAX_VALUE);
        Processor processorSpy = this.createProcessorSpy(Message.of((Object)1));
        IllegalStateException exception = new IllegalStateException();
        ForkJoinStrategy.RoutingPair failingPair = ForkJoinStrategy.RoutingPair.of((CoreEvent)this.testEvent(), (MessageProcessorChain)this.createFailingRoutingPair(exception));
        ForkJoinStrategy.RoutingPair okPair = ForkJoinStrategy.RoutingPair.of((CoreEvent)this.testEvent(), (MessageProcessorChain)this.createChain(processorSpy));
        this.expectedException.expect(CoreMatchers.instanceOf(MessagingException.class));
        this.expectedException.expectCause(CoreMatchers.is((Object)exception));
        this.invokeStrategyBlocking(this.strategy, this.testEvent(), Arrays.asList(failingPair, okPair), (CheckedConsumer<Throwable>)((CheckedConsumer)throwable -> ((Processor)Mockito.verify((Object)processorSpy, (VerificationMode)Mockito.never())).process((CoreEvent)Matchers.any(CoreEvent.class))));
    }

    @Test
    @Description(value="When configured with delayErrors='false' the first errors causes strategy to throw this exception. Other routes may or may not be executed depending on concurrency.")
    public void errorEagerConcurrent() throws Throwable {
        this.strategy = this.createStrategy(this.processingStrategy, 4, false, Integer.MAX_VALUE);
        Processor processorSpy = this.createProcessorSpy(Message.of((Object)1));
        Processor processorSpy2 = this.createProcessorSpy(Message.of((Object)2));
        Processor processorSpy3 = this.createProcessorSpy(Message.of((Object)3));
        CoreEvent orignial = this.testEvent();
        IllegalStateException exception = new IllegalStateException();
        ForkJoinStrategy.RoutingPair failingPair = ForkJoinStrategy.RoutingPair.of((CoreEvent)orignial, (MessageProcessorChain)this.createFailingRoutingPair(exception));
        ForkJoinStrategy.RoutingPair okPair = ForkJoinStrategy.RoutingPair.of((CoreEvent)orignial, (MessageProcessorChain)this.createChain(processorSpy));
        ForkJoinStrategy.RoutingPair okPair2 = ForkJoinStrategy.RoutingPair.of((CoreEvent)orignial, (MessageProcessorChain)this.createChain(processorSpy2));
        ForkJoinStrategy.RoutingPair okPair3 = ForkJoinStrategy.RoutingPair.of((CoreEvent)orignial, (MessageProcessorChain)this.createChain(processorSpy3));
        this.expectedException.expect(CoreMatchers.instanceOf(MessagingException.class));
        this.expectedException.expectCause(CoreMatchers.is((Object)exception));
        this.invokeStrategyBlocking(this.strategy, this.testEvent(), Arrays.asList(failingPair, okPair, okPair2, okPair3), (CheckedConsumer<Throwable>)((CheckedConsumer)throwable -> {
            ((Processor)Mockito.verify((Object)processorSpy, (VerificationMode)Mockito.atMost((int)1))).process((CoreEvent)Matchers.any(CoreEvent.class));
            ((Processor)Mockito.verify((Object)processorSpy2, (VerificationMode)Mockito.atMost((int)1))).process((CoreEvent)Matchers.any(CoreEvent.class));
            ((Processor)Mockito.verify((Object)processorSpy3, (VerificationMode)Mockito.atMost((int)1))).process((CoreEvent)Matchers.any(CoreEvent.class));
        }));
    }

    @Test
    @Description(value="After successful completion of all routes the variables from each route are merged into the result.")
    public void flowVarsMerged() throws Throwable {
        String beforeVarName = "before";
        String beforeVarValue = "beforeValue";
        String beforeVar2Name = "before2";
        String beforeVar2Value = "before2Value";
        String beforeVar2NewValue = "before2NewValue";
        String fooVarName = "foo";
        String fooVarValue = "fooValue1";
        String fooVar2Name = "foo2";
        String fooVar2Value1 = "foo2Value1";
        String fooVar2Value2 = "foo2Value2";
        String fooVar3Name = "foo3";
        String fooVar3Value1 = "foo3Value1";
        Apple fooVar3Value2 = new Apple();
        CoreEvent original = CoreEvent.builder((CoreEvent)this.newEvent()).addVariable("before", (Object)"beforeValue").addVariable("before2", (Object)"before2Value").build();
        ForkJoinStrategy.RoutingPair pair1 = ForkJoinStrategy.RoutingPair.of((CoreEvent)original, (MessageProcessorChain)this.createChain(event -> CoreEvent.builder((CoreEvent)event).addVariable("before2", (Object)"before2NewValue").addVariable("foo", (Object)"fooValue1").addVariable("foo2", (Object)"foo2Value1").addVariable("foo3", (Object)"foo3Value1").build()));
        ForkJoinStrategy.RoutingPair pair2 = ForkJoinStrategy.RoutingPair.of((CoreEvent)original, (MessageProcessorChain)this.createChain(event -> CoreEvent.builder((CoreEvent)event).addVariable("foo2", (Object)"foo2Value2").addVariable("foo3", (Object)fooVar3Value2).build()));
        CoreEvent result = this.invokeStrategyBlocking(this.strategy, original, Arrays.asList(pair1, pair2));
        Assert.assertThat(result.getVariables().keySet(), (Matcher)org.hamcrest.Matchers.hasSize((int)5));
        Assert.assertThat(result.getVariables().keySet(), (Matcher)org.hamcrest.Matchers.hasItems((Object[])new String[]{"before", "before2", "foo", "foo", "foo2", "foo3"}));
        Assert.assertThat((Object)((TypedValue)result.getVariables().get("before")).getValue(), (Matcher)CoreMatchers.equalTo((Object)"beforeValue"));
        Assert.assertThat((Object)((TypedValue)result.getVariables().get("before2")).getValue(), (Matcher)CoreMatchers.equalTo((Object)"before2NewValue"));
        Assert.assertThat((Object)((TypedValue)result.getVariables().get("foo")).getValue(), (Matcher)CoreMatchers.equalTo((Object)"fooValue1"));
        TypedValue fooVar2 = (TypedValue)result.getVariables().get("foo2");
        Assert.assertThat((Object)fooVar2.getDataType(), (Matcher)CoreMatchers.equalTo((Object)DataType.builder().collectionType(List.class).itemType(String.class).build()));
        Assert.assertThat((Object)((List)fooVar2.getValue()), (Matcher)org.hamcrest.Matchers.hasItems((Object[])new String[]{"foo2Value1", "foo2Value2"}));
        TypedValue fooVar3 = (TypedValue)result.getVariables().get("foo3");
        Assert.assertThat((Object)fooVar3.getDataType(), (Matcher)CoreMatchers.equalTo((Object)DataType.builder().collectionType(List.class).itemType(Object.class).build()));
        Assert.assertThat((Object)((List)fooVar3.getValue()), (Matcher)org.hamcrest.Matchers.hasItems((Object[])new Object[]{"foo3Value1", fooVar3Value2}));
    }

    @Test
    @Description(value="When the strategy uses a processing strategy that supports concurrent execution the total processing time is less that sequential processing.")
    public void concurrent() throws Throwable {
        int pairs = 10;
        int processorSleep = 50;
        this.invokeStrategyBlocking(this.strategy, this.testEvent(), this.createRoutingPairs(pairs, processorSleep));
        ((Scheduler)Mockito.verify((Object)this.scheduler, (VerificationMode)Mockito.times((int)pairs))).submit((Callable)Matchers.any(Callable.class));
    }

    @Test
    @Description(value="When executing concurrently the strategy will throw a RejectedExceptionException if the scheduler being used throws a RejectedExceptionException.")
    public void concurrentRejectedExecution() throws Throwable {
        Mockito.when((Object)this.scheduler.submit((Callable)Matchers.any(Callable.class))).thenThrow(new Throwable[]{new RejectedExecutionException()});
        this.setupConcurrentProcessingStrategy();
        this.strategy = this.createStrategy(this.processingStrategy, 4, true, Integer.MAX_VALUE);
        this.expectedException.expect(MessagingException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(RejectedExecutionException.class));
        this.invokeStrategyBlocking(this.strategy, this.testEvent(), this.createRoutingPairs(1));
    }

    @Test
    @Description(value="When concurrency is limited to '1' routes execute sequentially and the total processing time is the sum of processing each route, regardless of if the processing strategy supports concurrency.")
    public void sequential() throws Throwable {
        this.setupConcurrentProcessingStrategy();
        this.strategy = this.createStrategy(this.processingStrategy, 1, true, Integer.MAX_VALUE);
        int pairs = 10;
        int processorSleep = 50;
        this.invokeStrategyBlocking(this.strategy, this.testEvent(), this.createRoutingPairs(pairs, processorSleep));
        ((Scheduler)Mockito.verify((Object)this.scheduler, (VerificationMode)Mockito.never())).submit((Runnable)Matchers.any(Runnable.class));
    }

    private void setupConcurrentProcessingStrategy() {
        Function<ReactiveProcessor, ReactiveProcessor> scheduleFunction = processor -> publisher -> Mono.from((Publisher)publisher).publishOn(Schedulers.fromExecutorService((ExecutorService)this.scheduler)).transform((Function)processor);
        Mockito.when((Object)this.processingStrategy.onPipeline((ReactiveProcessor)Matchers.any(ReactiveProcessor.class))).thenAnswer(invocation -> (ReactiveProcessor)scheduleFunction.apply((ReactiveProcessor)invocation.getArgumentAt(0, ReactiveProcessor.class)));
    }

    private CompositeRoutingException assertCompositeRoutingException(Throwable throwable, int errors) {
        Assert.assertThat((Object)throwable, (Matcher)CoreMatchers.instanceOf(CompositeRoutingException.class));
        CompositeRoutingException compositeRoutingException = (CompositeRoutingException)throwable;
        Assert.assertThat((Object)compositeRoutingException.getErrors().size(), (Matcher)CoreMatchers.is((Object)errors));
        return compositeRoutingException;
    }

    private RoutingResult assertRoutingResult(CompositeRoutingException compositeRoutingException, int results, int errors) {
        Assert.assertThat((Object)compositeRoutingException.getErrorMessage().getPayload().getValue(), (Matcher)CoreMatchers.instanceOf(RoutingResult.class));
        RoutingResult routingResult = (RoutingResult)compositeRoutingException.getErrorMessage().getPayload().getValue();
        Assert.assertThat((Object)routingResult.getResults().size(), (Matcher)CoreMatchers.is((Object)results));
        Assert.assertThat((Object)routingResult.getFailures().size(), (Matcher)CoreMatchers.is((Object)errors));
        return routingResult;
    }

    protected CoreEvent invokeStrategyBlocking(ForkJoinStrategy strategy, CoreEvent original, List<ForkJoinStrategy.RoutingPair> routingPairs) throws Throwable {
        return this.invokeStrategyBlocking(strategy, original, routingPairs, (CheckedConsumer<Throwable>)((CheckedConsumer)throwable -> {}));
    }

    protected CoreEvent invokeStrategyBlocking(ForkJoinStrategy strategy, CoreEvent original, List<ForkJoinStrategy.RoutingPair> routingPairs, CheckedConsumer<Throwable> verifyOnError) throws Throwable {
        try {
            return (CoreEvent)Mono.from((Publisher)strategy.forkJoin(original, (Publisher)Flux.fromIterable(routingPairs))).block();
        }
        catch (Throwable throwable2) {
            MuleException throwable2 = Exceptions.rxExceptionToMuleException((Throwable)throwable2);
            verifyOnError.accept((Object)throwable2);
            throw throwable2;
        }
    }

    private MessageProcessorChain createFailingRoutingPair(RuntimeException exception) throws MuleException {
        return this.createChain(event -> {
            throw exception;
        });
    }

    protected Processor createProcessorSpy(final Message result) throws MuleException {
        return (Processor)Mockito.spy((Object)new InternalTestProcessor(){

            public CoreEvent process(CoreEvent event) throws MuleException {
                return CoreEvent.builder((CoreEvent)event).message(result).build();
            }
        });
    }

    protected ForkJoinStrategy.RoutingPair createRoutingPair(Processor processor) throws MuleException {
        return ForkJoinStrategy.RoutingPair.of((CoreEvent)this.testEvent(), (MessageProcessorChain)this.createChain(processor));
    }

    protected ForkJoinStrategy.RoutingPair createRoutingPair(Message result) throws MuleException {
        return this.createRoutingPairWithSleep(result, 0L);
    }

    private ForkJoinStrategy.RoutingPair createRoutingPairWithSleep(Message result, long sleep) throws MuleException {
        return ForkJoinStrategy.RoutingPair.of((CoreEvent)this.testEvent(), (MessageProcessorChain)this.createChain(new SleepingProcessor(result, sleep)));
    }

    private List<ForkJoinStrategy.RoutingPair> createRoutingPairs(int number) {
        return this.createRoutingPairs(number, 0);
    }

    private List<ForkJoinStrategy.RoutingPair> createRoutingPairs(int number, int sleep) {
        return IntStream.range(0, number).mapToObj(i -> {
            try {
                return this.createRoutingPairWithSleep(Message.of((Object)1), sleep);
            }
            catch (MuleException e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
    }

    private MessageProcessorChain createChain(Processor processor) throws MuleException {
        MessageProcessorChain chain = MessageProcessors.newChain(Optional.empty(), (Processor[])new Processor[]{processor});
        chain.setMuleContext(muleContext);
        return chain;
    }

    @FunctionalInterface
    private static interface InternalTestProcessor
    extends Processor,
    InternalProcessor {
    }

    static class SleepingProcessor
    extends AbstractComponent
    implements Processor {
        long sleep;
        Message result;

        public SleepingProcessor(Message result, long sleep) {
            this.result = result;
            this.sleep = sleep;
        }

        public CoreEvent process(CoreEvent event) throws MuleException {
            try {
                Thread.sleep(this.sleep);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return CoreEvent.builder((CoreEvent)event).message(this.result).build();
        }
    }
}

