/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.event.EventContext;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.notification.IntegerAction;
import org.mule.runtime.api.notification.MessageProcessorNotification;
import org.mule.runtime.api.notification.MessageProcessorNotificationListener;
import org.mule.runtime.api.notification.NotificationListener;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.event.EventContextFactory;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.concurrent.NamedThreadFactory;
import org.mule.runtime.core.internal.construct.FlowBackPressureException;
import org.mule.runtime.core.internal.context.MuleContextWithRegistries;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.util.rx.Operators;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.processor.AnnotatedProcessor;
import org.mule.runtime.core.privileged.processor.InternalProcessor;
import org.mule.runtime.core.privileged.registry.RegistrationException;
import org.mule.tck.TriggerableMessageSource;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@RunWith(value=Parameterized.class)
public abstract class AbstractProcessingStrategyTestCase
extends AbstractMuleContextTestCase {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractProcessingStrategyTestCase.class);
    // NOTE(review): never referenced by name in this class; concurrentStream() uses the literal 8 instead.
    private static final int CONCURRENT_TEST_CONCURRENCY = 8;
    // Dispatch mode of the current parameterized run; set once by the constructor.
    protected Mode mode;
    // Thread-name prefixes handed to the TestScheduler instances created in before().
    protected static final String CPU_LIGHT = "cpuLight";
    protected static final String IO = "I/O";
    protected static final String CPU_INTENSIVE = "cpuIntensive";
    protected static final String CUSTOM = "custom";
    protected static final String RING_BUFFER = "ringBuffer";
    // NOTE(review): the stream tests below use the literal 2000 rather than this constant.
    protected static final int STREAM_ITERATIONS = 2000;
    // Assigned in before(); every get() starts a new Flow.builder("test", muleContext) chain.
    protected Supplier<Flow.Builder> flowBuilder;
    // Flow under test; after() stops and disposes it when it is non-null.
    protected Flow flow;
    // Names of the threads on which the tracking/latched processors executed process(...).
    protected Set<String> threads = Collections.synchronizedSet(new HashSet());
    // Source used in SOURCE mode; testBackPressure() replaces it with a strategy-specific instance.
    private TriggerableMessageSource triggerableMessageSource = this.getTriggerableMessageSource();
    // Thread-tracking processor reporting ProcessingType.CPU_LITE.
    protected Processor cpuLightProcessor = new ThreadTrackingProcessor(){

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.CPU_LITE;
        }
    };
    // Thread-tracking processor reporting ProcessingType.CPU_INTENSIVE.
    protected Processor cpuIntensiveProcessor = new ThreadTrackingProcessor(){

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.CPU_INTENSIVE;
        }
    };
    // Thread-tracking processor reporting ProcessingType.BLOCKING.
    protected Processor blockingProcessor = new ThreadTrackingProcessor(){

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.BLOCKING;
        }
    };
    // Thread-tracking processor reporting ProcessingType.CPU_LITE_ASYNC; ThreadTrackingProcessor.apply
    // publishes its output on the "custom" scheduler for this type.
    protected Processor asyncProcessor = new ThreadTrackingProcessor(){

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.CPU_LITE_ASYNC;
        }
    };
    // Component wrapper that forwards every call to asyncProcessor.
    protected Processor annotatedAsyncProcessor = new AnnotatedAsyncProcessor();
    // Always fails with RuntimeException("FAILURE"); it never records the calling thread.
    protected Processor failingProcessor = new ThreadTrackingProcessor(){

        @Override
        public CoreEvent process(CoreEvent event) {
            throw new RuntimeException("FAILURE");
        }
    };
    // Out of every 10 invocations the first 5 succeed (count % 10 < 5); the other 5 delegate to failingProcessor.
    protected Processor errorSuccessProcessor = new ThreadTrackingProcessor(){
        private AtomicInteger count = new AtomicInteger();

        @Override
        public CoreEvent process(CoreEvent event) throws MuleException {
            if (this.count.getAndIncrement() % 10 < 5) {
                return super.process(event);
            }
            return AbstractProcessingStrategyTestCase.this.failingProcessor.process(event);
        }
    };
    // Thread-tracking processor reporting ProcessingType.IO_RW.
    protected Processor ioRWProcessor = new ThreadTrackingProcessor(){

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.IO_RW;
        }
    };
    // Schedulers below are (re)created in before() and stopped in after().
    protected Scheduler cpuLight;
    protected Scheduler blocking;
    protected Scheduler cpuIntensive;
    protected Scheduler custom;
    protected Scheduler ringBuffer;
    // I/O scheduler looked up from the registry's SchedulerService; used to fire concurrent invocations.
    protected Scheduler asyncExecutor;
    // NOTE(review): despite the name this is a fixed pool of 4 threads; used by testBackPressure().
    protected ExecutorService cachedThreadPool = Executors.newFixedThreadPool(4);
    // JUnit rule configured by expectRejected().
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /**
     * Creates the test case for one parameterized run.
     *
     * @param mode the {@link Mode} supplied by {@link #modeParameters()}; it selects the branch taken by
     *        {@code processFlow} and {@code dispatchFlow}
     */
    public AbstractProcessingStrategyTestCase(Mode mode) {
        this.mode = mode;
    }

    /**
     * Supplies the values the {@link Parameterized} runner passes to the constructor.
     *
     * @return a fixed-size list holding every {@link Mode} constant in declaration order
     *         ({@code FLOW}, then {@code SOURCE})
     */
    @Parameterized.Parameters
    public static Collection<Mode> modeParameters() {
        return Arrays.asList(Mode.values());
    }

    /**
     * Creates the schedulers and the flow-builder supplier used by every test.
     *
     * <p>Only the CPU-light scheduler is built with {@code reject == false} (unbounded queue); the other
     * test schedulers bound their queue to their thread count.
     *
     * @throws RegistrationException if the {@link SchedulerService} lookup in the registry fails
     */
    @Before
    public void before() throws RegistrationException {
        cpuLight = new TestScheduler(2, CPU_LIGHT, false);
        blocking = new TestScheduler(4, IO, true);
        cpuIntensive = new TestScheduler(2, CPU_INTENSIVE, true);
        custom = new TestScheduler(4, CUSTOM, true);
        ringBuffer = new TestScheduler(1, RING_BUFFER, true);

        SchedulerService schedulerService =
            (SchedulerService)((MuleContextWithRegistries)muleContext).getRegistry().lookupObject(SchedulerService.class);
        asyncExecutor = schedulerService.ioScheduler();

        // The source field is read when the supplier runs, so a source swapped in later is honoured.
        // The exception handler returns the event it is given.
        flowBuilder = () -> Flow.builder((String)"test", (MuleContext)muleContext)
            .processingStrategyFactory(this::createProcessingStrategy)
            .source((MessageSource)this.triggerableMessageSource)
            .messagingExceptionHandler((exception, event) -> event);
    }

    /**
     * Starts building an event bound to a new event context for the current {@code flow}.
     *
     * @return a builder whose context was created from {@code flow} and {@code TEST_CONNECTOR_LOCATION}
     * @throws MuleException declared by the signature; propagated from the underlying calls
     */
    protected InternalEvent.Builder getEventBuilder() throws MuleException {
        EventContext context =
            EventContextFactory.create((FlowConstruct)this.flow, (ComponentLocation)TEST_CONNECTOR_LOCATION);
        return InternalEvent.builder(context);
    }

    /**
     * Creates the processing strategy under test. Invoked through the processing-strategy factory that
     * {@code before()} installs on the flow builder.
     *
     * @param var1 the Mule context handed to the factory
     * @param var2 the second factory argument (named {@code prefix} at the call site in {@code before()})
     * @return the strategy the built flow should use
     */
    protected abstract ProcessingStrategy createProcessingStrategy(MuleContext var1, String var2);

    /**
     * Tears down the flow and releases every scheduler and thread pool created for the test.
     *
     * <p>Scheduler shutdown runs in a {@code finally} block so that a failure while stopping or disposing
     * the flow cannot leak threads. Schedulers that were never created (for example because
     * {@code before()} failed part-way) are skipped instead of raising a {@link NullPointerException}.
     *
     * @throws MuleException if stopping the flow fails
     */
    @After
    public void after() throws MuleException {
        try {
            if (this.flow != null) {
                this.flow.stop();
                this.flow.dispose();
            }
        } finally {
            try {
                // Same shutdown order as before; Arrays.asList tolerates null entries.
                for (Scheduler scheduler : Arrays.asList(this.ringBuffer, this.cpuLight, this.blocking,
                                                         this.cpuIntensive, this.custom, this.asyncExecutor)) {
                    if (scheduler != null) {
                        scheduler.stop();
                    }
                }
            } finally {
                this.cachedThreadPool.shutdownNow();
            }
        }
    }

    /**
     * Sends one event through a flow made of a single CPU-light processor. This base method makes no
     * assertions of its own; it only requires processing to complete without throwing.
     */
    @Test
    public void singleCpuLight() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(cpuLightProcessor).build();
        flow.initialise();
        flow.start();

        processFlow(testEvent());
    }

    /**
     * Runs the concurrent scenario with one latched CPU-light invocation, no processors in front of the
     * latch, and the expectation that the extra invocation is not blocked.
     */
    @Test
    public void singleCpuLightConcurrent() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        // Omitting the varargs passes an empty Processor array.
        internalConcurrent(builder, false, ReactiveProcessor.ProcessingType.CPU_LITE, 1);
    }

    /**
     * Runs the concurrent scenario with one latched blocking invocation, no processors in front of the
     * latch, and the expectation that the extra invocation is not blocked.
     */
    @Test
    public void singleBlockingConcurrent() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        // Omitting the varargs passes an empty Processor array.
        internalConcurrent(builder, false, ReactiveProcessor.ProcessingType.BLOCKING, 1);
    }

    /**
     * Checks whether one more invocation can get through the flow while {@code invocations} earlier ones
     * are parked inside a latched processor.
     *
     * <p>Sequence: build and start a flow ending in a latched processor, submit {@code invocations} events
     * and wait until all of them are parked, submit one more event, and assert within 500 ms whether it
     * reached the processor. The latch is then released; when {@code blocks} is true the extra invocation
     * must arrive within 5 s.
     *
     * <p>On return, {@code threads} is replaced by a snapshot taken just before the latch was released.
     *
     * @param flowBuilder builder for the flow under test
     * @param blocks {@code true} if the extra invocation is expected to be held back until release
     * @param processingType processing type reported by the latched processor
     * @param invocations number of invocations to park on the latch
     * @param processorsBeforeLatch processors placed in the flow ahead of the latched processor
     * @throws MuleException if the flow cannot be initialised or started
     * @throws InterruptedException if the calling thread is interrupted while waiting on a latch
     */
    protected void internalConcurrent(Flow.Builder flowBuilder, boolean blocks, ReactiveProcessor.ProcessingType processingType, int invocations, Processor ... processorsBeforeLatch) throws MuleException, InterruptedException {
        MultipleInvocationLatchedProcessor latchedProcessor = new MultipleInvocationLatchedProcessor(processingType, invocations);
        ArrayList<Processor> processors = new ArrayList<Processor>(Arrays.asList(processorsBeforeLatch));
        processors.add(latchedProcessor);
        this.flow = flowBuilder.processors(processors).build();
        this.flow.initialise();
        this.flow.start();
        for (int i = 0; i < invocations; ++i) {
            this.asyncExecutor.submit(() -> this.processFlow(this.newEvent()));
        }
        latchedProcessor.getAllLatchedLatch().await();
        this.asyncExecutor.submit(() -> this.processFlow(this.newEvent()));
        // The await result is a boolean, so it must be compared with a boolean: the extra invocation
        // arrives within the timeout exactly when the strategy does not block.
        Assert.assertThat(latchedProcessor.getUnlatchedInvocationLatch().await(500L, TimeUnit.MILLISECONDS), CoreMatchers.is(!blocks));
        Set<String> threadsBeforeUnlock;
        // Iterating a synchronized set requires holding its lock; processors may still be adding names.
        synchronized (this.threads) {
            threadsBeforeUnlock = new HashSet<String>(this.threads);
        }
        latchedProcessor.release();
        if (blocks) {
            Assert.assertThat(latchedProcessor.getUnlatchedInvocationLatch().await(5000L, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
        }
        // Keep the replacement thread-safe: released invocations may still record thread names.
        this.threads = Collections.synchronizedSet(threadsBeforeUnlock);
    }

    /**
     * Sends one event through a flow that applies the CPU-light processor three times in a row. No
     * assertions are made here beyond successful completion.
     */
    @Test
    public void multipleCpuLight() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(cpuLightProcessor, cpuLightProcessor, cpuLightProcessor).build();
        flow.initialise();
        flow.start();

        processFlow(testEvent());
    }

    /**
     * Sends one event through a flow made of a single blocking processor. No assertions are made here
     * beyond successful completion.
     */
    @Test
    public void singleBlocking() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(blockingProcessor).build();
        flow.initialise();
        flow.start();

        processFlow(testEvent());
    }

    /**
     * Sends one event through a flow that applies the blocking processor three times in a row. No
     * assertions are made here beyond successful completion.
     */
    @Test
    public void multipleBlocking() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(blockingProcessor, blockingProcessor, blockingProcessor).build();
        flow.initialise();
        flow.start();

        processFlow(testEvent());
    }

    /**
     * Sends one event through a flow made of a single CPU-intensive processor. No assertions are made
     * here beyond successful completion.
     */
    @Test
    public void singleCpuIntensive() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(cpuIntensiveProcessor).build();
        flow.initialise();
        flow.start();

        processFlow(testEvent());
    }

    /**
     * Sends one event through a flow that applies the CPU-intensive processor three times in a row. No
     * assertions are made here beyond successful completion.
     */
    @Test
    public void multipleCpuIntensive() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(cpuIntensiveProcessor, cpuIntensiveProcessor, cpuIntensiveProcessor).build();
        flow.initialise();
        flow.start();

        processFlow(testEvent());
    }

    /**
     * Sends one event through a CPU-light, CPU-intensive, blocking processor sequence. No assertions
     * are made here beyond successful completion.
     */
    @Test
    public void mix() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(cpuLightProcessor, cpuIntensiveProcessor, blockingProcessor).build();
        flow.initialise();
        flow.start();

        processFlow(testEvent());
    }

    /**
     * Sends one event through a longer chain that interleaves CPU-light, blocking and CPU-intensive
     * processors. No assertions are made here beyond successful completion.
     */
    @Test
    public void mix2() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(
            cpuLightProcessor,
            cpuLightProcessor,
            blockingProcessor,
            blockingProcessor,
            cpuLightProcessor,
            cpuIntensiveProcessor,
            cpuIntensiveProcessor,
            cpuLightProcessor).build();
        flow.initialise();
        flow.start();

        processFlow(testEvent());
    }

    /**
     * Sends one event through the async processor followed by the CPU-light processor. No assertions
     * are made here beyond successful completion.
     */
    @Test
    public void asyncCpuLight() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(asyncProcessor, cpuLightProcessor).build();
        flow.initialise();
        flow.start();

        processFlow(testEvent());
    }

    /**
     * Runs the concurrent scenario with one latched CPU-light invocation, the async processor placed in
     * front of the latch, and the expectation that the extra invocation is not blocked.
     */
    @Test
    public void asyncCpuLightConcurrent() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        Processor[] beforeLatch = new Processor[]{asyncProcessor};
        internalConcurrent(builder, false, ReactiveProcessor.ProcessingType.CPU_LITE, 1, beforeLatch);
    }

    /**
     * Dispatches {@code STREAM_ITERATIONS} events from the test thread through a single CPU-light
     * processor and waits up to 5 s for the success callback to have fired that many times.
     */
    @Test
    public void stream() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(cpuLightProcessor).build();
        flow.initialise();
        flow.start();

        CountDownLatch completed = new CountDownLatch(STREAM_ITERATIONS);
        Consumer<CoreEvent> countSuccess = response -> completed.countDown();
        // NOTE(review): the value returned by Exceptions.bubble is discarded by the Consumer;
        // confirm whether the intent was to throw it.
        Consumer<Throwable> onUnexpectedError = error -> Exceptions.bubble(new AssertionError("Unexpected error"));

        for (int sent = 0; sent < STREAM_ITERATIONS; sent++) {
            dispatchFlow(newEvent(), countSuccess, onUnexpectedError);
        }

        Assert.assertThat((Object)completed.await(5000L, TimeUnit.MILLISECONDS), (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Same as the single-threaded stream scenario, but the {@code STREAM_ITERATIONS} events are
     * dispatched by {@code CONCURRENT_TEST_CONCURRENCY} tasks submitted to {@code asyncExecutor}, each
     * sending an equal share. Waits up to 5 s for all success callbacks.
     */
    @Test
    public void concurrentStream() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(cpuLightProcessor).build();
        flow.initialise();
        flow.start();

        CountDownLatch completed = new CountDownLatch(STREAM_ITERATIONS);
        int eventsPerWorker = STREAM_ITERATIONS / CONCURRENT_TEST_CONCURRENCY;
        Consumer<CoreEvent> countSuccess = response -> completed.countDown();
        // NOTE(review): the value returned by Exceptions.bubble is discarded by the Consumer;
        // confirm whether the intent was to throw it.
        Consumer<Throwable> onUnexpectedError = error -> Exceptions.bubble(new AssertionError("Unexpected error"));

        for (int worker = 0; worker < CONCURRENT_TEST_CONCURRENCY; worker++) {
            asyncExecutor.submit(() -> {
                try {
                    for (int sent = 0; sent < eventsPerWorker; sent++) {
                        dispatchFlow(newEvent(), countSuccess, onUnexpectedError);
                    }
                }
                catch (MuleException e) {
                    // A failure ends this worker's loop, exactly as a per-iteration rethrow would.
                    throw new RuntimeException(e);
                }
            });
        }

        Assert.assertThat((Object)completed.await(5000L, TimeUnit.MILLISECONDS), (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Dispatches {@code STREAM_ITERATIONS} events through the always-failing processor and waits up to
     * 5 s for the error callback to have fired that many times.
     */
    @Test
    public void errorsStream() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(failingProcessor).build();
        flow.initialise();
        flow.start();

        CountDownLatch failed = new CountDownLatch(STREAM_ITERATIONS);
        // NOTE(review): the value returned by Exceptions.bubble is discarded by the Consumer;
        // confirm whether the intent was to throw it.
        Consumer<CoreEvent> onUnexpectedSuccess = response -> Exceptions.bubble(new AssertionError("Unexpected success"));
        Consumer<Throwable> countError = error -> failed.countDown();

        for (int sent = 0; sent < STREAM_ITERATIONS; sent++) {
            dispatchFlow(newEvent(), onUnexpectedSuccess, countError);
        }

        Assert.assertThat((Object)failed.await(5000L, TimeUnit.MILLISECONDS), (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Dispatches {@code STREAM_ITERATIONS} events through the processor that alternates between
     * succeeding and failing in runs of five, then waits up to 5 s each for the success and error
     * callbacks to have fired {@code STREAM_ITERATIONS / 2} times.
     */
    @Test
    public void errorSuccessStream() throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(errorSuccessProcessor).build();
        flow.initialise();
        flow.start();

        int expectedPerOutcome = STREAM_ITERATIONS / 2;
        CountDownLatch successLatch = new CountDownLatch(expectedPerOutcome);
        CountDownLatch errorLatch = new CountDownLatch(expectedPerOutcome);
        Consumer<CoreEvent> countSuccess = response -> successLatch.countDown();
        Consumer<Throwable> countError = error -> errorLatch.countDown();

        for (int sent = 0; sent < STREAM_ITERATIONS; sent++) {
            dispatchFlow(newEvent(), countSuccess, countError);
        }

        Assert.assertThat((Object)successLatch.await(5000L, TimeUnit.MILLISECONDS), (Matcher)CoreMatchers.is((Object)true));
        Assert.assertThat((Object)errorLatch.await(5000L, TimeUnit.MILLISECONDS), (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Scenario named {@code tx}; each concrete subclass supplies the body and its expectations.
     * NOTE(review): presumably covers transactional processing -- confirm against the subclasses.
     */
    @Test
    public abstract void tx() throws Exception;

    /**
     * Builds and starts a flow made of the single IO_RW processor, then processes the event produced by
     * {@code eventSupplier}. The supplier is only called after the flow has been started.
     *
     * @param eventSupplier produces the event to send through the flow
     * @throws Exception anything thrown while building the flow, calling the supplier or processing
     */
    protected void singleIORW(Callable<CoreEvent> eventSupplier) throws Exception {
        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(ioRWProcessor).build();
        flow.initialise();
        flow.start();

        CoreEvent event = eventSupplier.call();
        processFlow(event);
    }

    /**
     * Processes {@code event} synchronously using the entry point selected by {@code mode}.
     *
     * <ul>
     * <li>{@code FLOW}: calls {@code flow.process(event)} directly.</li>
     * <li>{@code SOURCE}: pushes the event into the message source's listener, then blocks on the
     * response publisher of the event context; failures are converted with
     * {@code rxExceptionToMuleException} before being rethrown.</li>
     * </ul>
     *
     * @param event the event to process
     * @return the resulting event, or {@code null} when {@code mode} matches neither case
     * @throws Exception if processing fails
     */
    protected CoreEvent processFlow(CoreEvent event) throws Exception {
        LifecycleUtils.setMuleContextIfNeeded((Object)this.flow, (MuleContext)muleContext);
        switch (this.mode) {
            case FLOW:
                return this.flow.process(event);
            case SOURCE: {
                Mono.just((Object)event)
                    .transform((Function)this.triggerableMessageSource.getListener())
                    .subscribe(Operators.requestUnbounded());
                try {
                    BaseEventContext eventContext = (BaseEventContext)event.getContext();
                    Object response = Mono.from((Publisher)eventContext.getResponsePublisher()).block();
                    return (CoreEvent)response;
                }
                catch (Throwable failure) {
                    throw org.mule.runtime.core.api.rx.Exceptions.rxExceptionToMuleException((Throwable)failure);
                }
            }
            default:
                return null;
        }
    }

    /**
     * Dispatches {@code event} without waiting for the result, using the entry point selected by
     * {@code mode} ({@code FLOW}: the flow itself; {@code SOURCE}: the message source's listener).
     * The callbacks are registered on the event context before the event is subscribed.
     *
     * <p>NOTE(review): both callbacks are invoked for every response -- {@code onSuccess} with the
     * response (possibly {@code null}) and then {@code onError} with the throwable (possibly
     * {@code null}). Callers cannot tell outcomes apart from which callback fired; confirm this is
     * intended.
     *
     * @param event the event to dispatch
     * @param onSuccess receives the response passed to the context's response callback
     * @param onError receives the throwable passed to the context's response callback
     */
    protected void dispatchFlow(CoreEvent event, Consumer<CoreEvent> onSuccess, Consumer<Throwable> onError) {
        LifecycleUtils.setMuleContextIfNeeded((Object)this.flow, (MuleContext)muleContext);
        switch (this.mode) {
            case FLOW:
                this.registerResponseCallbacks(event, onSuccess, onError);
                Mono.just((Object)event).transform((Function)this.flow).subscribe(Operators.requestUnbounded());
                break;
            case SOURCE:
                this.registerResponseCallbacks(event, onSuccess, onError);
                Mono.just((Object)event).transform((Function)this.triggerableMessageSource.getListener()).subscribe(Operators.requestUnbounded());
                break;
            default:
                break;
        }
    }

    /** Forwards the context's response and throwable to {@code onSuccess} and {@code onError}, in that order. */
    private void registerResponseCallbacks(CoreEvent event, Consumer<CoreEvent> onSuccess, Consumer<Throwable> onError) {
        BaseEventContext eventContext = (BaseEventContext)event.getContext();
        eventContext.onResponse((response, throwable) -> {
            onSuccess.accept((CoreEvent)response);
            onError.accept((Throwable)throwable);
        });
    }

    /**
     * Registers a message-processor notification listener that records the thread on which the
     * notifications with action ids 1601 and 1602 are delivered, then processes one event through a
     * flow made of the annotated async processor.
     *
     * <p>NOTE(review): 1601 / 1602 are presumably the pre-invoke / post-invoke action ids -- confirm
     * against {@code MessageProcessorNotification}.
     *
     * @param beforeThread set to the delivering thread of the action-1601 notification
     * @param afterThread set to the delivering thread of the action-1602 notification
     * @throws Exception if the flow cannot be built, started or processed
     */
    protected void testAsyncCpuLightNotificationThreads(AtomicReference<Thread> beforeThread, AtomicReference<Thread> afterThread) throws Exception {
        final IntegerAction beforeAction = new IntegerAction(1601);
        final IntegerAction afterAction = new IntegerAction(1602);

        muleContext.getNotificationManager().addInterfaceToType(MessageProcessorNotificationListener.class, MessageProcessorNotification.class);
        muleContext.getNotificationManager().addListener((NotificationListener)((MessageProcessorNotificationListener)notification -> {
            Object action = notification.getAction();
            if (beforeAction.equals(action)) {
                beforeThread.set(Thread.currentThread());
                return;
            }
            if (afterAction.equals(action)) {
                afterThread.set(Thread.currentThread());
            }
        }));

        Flow.Builder builder = flowBuilder.get();
        flow = builder.processors(annotatedAsyncProcessor).build();
        flow.initialise();
        flow.start();

        processFlow(testEvent());
    }

    /**
     * Floods a flow limited to a max concurrency of 2 with {@code STREAM_ITERATIONS} events and polls
     * (up to 10 s, every 100 ms) until the processed / rejected / total counters satisfy the matchers.
     * Does nothing unless {@code mode} is {@code SOURCE}.
     *
     * <p>The flow is the CPU-light processor followed by a blocking processor that sleeps 3 ms per
     * event. Events are submitted through {@code cachedThreadPool}.
     *
     * @param backPressureStrategy strategy given to the replacement message source
     * @param processedAssertion expectation on the number of events that reached {@code doOnNext}
     * @param rejectedAssertion expectation on the number of events that reached {@code doOnError}
     * @param totalAssertion expectation on the sum of both counters
     * @throws MuleException if the flow cannot be initialised or started
     */
    protected void testBackPressure(MessageSource.BackPressureStrategy backPressureStrategy, Matcher<Integer> processedAssertion, Matcher<Integer> rejectedAssertion, Matcher<Integer> totalAssertion) throws MuleException {
        if (!this.mode.equals((Object)Mode.SOURCE)) {
            return;
        }

        this.triggerableMessageSource = new TriggerableMessageSource(backPressureStrategy);

        Processor slowBlockingProcessor = new ThreadTrackingProcessor(){

            @Override
            public CoreEvent process(CoreEvent event) throws MuleException {
                try {
                    Thread.sleep(3L);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return super.process(event);
            }

            public ReactiveProcessor.ProcessingType getProcessingType() {
                return ReactiveProcessor.ProcessingType.BLOCKING;
            }
        };

        this.flow = this.flowBuilder.get()
            .source((MessageSource)this.triggerableMessageSource)
            .processors(Arrays.asList(this.cpuLightProcessor, slowBlockingProcessor))
            .maxConcurrency(2)
            .build();
        this.flow.initialise();
        this.flow.start();

        AtomicInteger rejected = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();
        for (int sent = 0; sent < STREAM_ITERATIONS; sent++) {
            this.cachedThreadPool.submit(() -> Flux.just((Object)this.newEvent()).cast(CoreEvent.class).transform((Function)this.triggerableMessageSource.getListener()).doOnNext(event -> processed.getAndIncrement()).doOnError(e -> rejected.getAndIncrement()).subscribe());
        }

        new PollingProber(10000L, 100L).check((Probe)new JUnitLambdaProbe(() -> {
            LOGGER.info("DONE " + processed.get() + " , REJECTED " + rejected.get() + ", ");
            // The counters are re-read for every matcher, in the same order as a short-circuit && chain.
            if (!totalAssertion.matches((Object)(rejected.get() + processed.get()))) {
                return false;
            }
            if (!processedAssertion.matches((Object)processed.get())) {
                return false;
            }
            return rejectedAssertion.matches((Object)rejected.get());
        }));
    }

    /**
     * Builds a matcher for integers in the closed range {@code [min, max]}.
     *
     * @param min lowest accepted value (inclusive)
     * @param max highest accepted value (inclusive)
     * @return a matcher that accepts a value when it is at least {@code min} and at most {@code max}
     */
    public static Matcher<Integer> between(int min, int max) {
        Matcher<Integer> atLeastMin = Matchers.greaterThanOrEqualTo(Integer.valueOf(min));
        Matcher<Integer> atMostMax = Matchers.lessThanOrEqualTo(Integer.valueOf(max));
        return CoreMatchers.allOf(atLeastMin, atMostMax);
    }

    /**
     * Builds a matcher for longs in the closed range {@code [min, max]}.
     *
     * @param min lowest accepted value (inclusive)
     * @param max highest accepted value (inclusive)
     * @return a matcher that accepts a value when it is at least {@code min} and at most {@code max}
     */
    public static Matcher<Long> between(long min, long max) {
        Matcher<Long> atLeastMin = Matchers.greaterThanOrEqualTo(Long.valueOf(min));
        Matcher<Long> atMostMax = Matchers.lessThanOrEqualTo(Long.valueOf(max));
        return CoreMatchers.allOf(atLeastMin, atMostMax);
    }

    /**
     * Configures the {@code expectedException} rule so that the test must fail with a
     * {@link MessagingException} whose error type identifier is {@code FLOW_BACK_PRESSURE} and whose
     * cause is a {@link FlowBackPressureException}.
     */
    protected void expectRejected() {
        final ExpectedException rule = this.expectedException;
        rule.expect(MessagingException.class);
        rule.expect(this.overloadErrorTypeMatcher());
        rule.expectCause(CoreMatchers.instanceOf(FlowBackPressureException.class));
    }

    /**
     * Creates a matcher that accepts a {@link MessagingException} whose event carries an error with the
     * type identifier {@code FLOW_BACK_PRESSURE}. The identifier seen by the most recent match attempt
     * is what {@code describeTo} appends to the description.
     *
     * <p>NOTE(review): {@code getError().get()} is called unconditionally -- this assumes the event
     * always carries an error; confirm.
     *
     * @return the stateful matcher; a new instance on every call
     */
    private TypeSafeMatcher<MessagingException> overloadErrorTypeMatcher() {
        return new TypeSafeMatcher<MessagingException>(){
            private String lastSeenErrorTypeId;

            public void describeTo(Description description) {
                description.appendValue((Object)this.lastSeenErrorTypeId);
            }

            protected boolean matchesSafely(MessagingException item) {
                Error error = (Error)item.getEvent().getError().get();
                this.lastSeenErrorTypeId = error.getErrorType().getIdentifier();
                return "FLOW_BACK_PRESSURE".equals(this.lastSeenErrorTypeId);
            }
        };
    }

    /**
     * Entry point used to push events into the flow; every test runs once per constant.
     */
    public static enum Mode {
        /** Events are handed to the flow itself ({@code flow.process(...)} or a transform by the flow). */
        FLOW,
        /** Events are handed to the listener of the triggerable message source. */
        SOURCE;

    }

    /**
     * Component that forwards processing, publisher transformation and processing type to the
     * enclosing test's {@code asyncProcessor}, and reports {@code TEST_CONNECTOR_LOCATION} as its
     * location.
     */
    class AnnotatedAsyncProcessor
    extends AbstractComponent
    implements AnnotatedProcessor {

        /** Reads the enclosing field on every call so a reassigned {@code asyncProcessor} is honoured. */
        private Processor target() {
            return AbstractProcessingStrategyTestCase.this.asyncProcessor;
        }

        public CoreEvent process(CoreEvent event) throws MuleException {
            return this.target().process(event);
        }

        public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
            return this.target().apply(publisher);
        }

        public ComponentLocation getLocation() {
            return AbstractMuleTestCase.TEST_CONNECTOR_LOCATION;
        }

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return this.target().getProcessingType();
        }
    }

    class ThreadTrackingProcessor
    implements Processor,
    InternalProcessor {
        ThreadTrackingProcessor() {
        }

        public CoreEvent process(CoreEvent event) throws MuleException {
            AbstractProcessingStrategyTestCase.this.threads.add(Thread.currentThread().getName());
            return event;
        }

        public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
            if (this.getProcessingType() == ReactiveProcessor.ProcessingType.CPU_LITE_ASYNC) {
                return Flux.from(publisher).transform(processorPublisher -> super.apply(publisher)).publishOn(Schedulers.fromExecutorService((ExecutorService)AbstractProcessingStrategyTestCase.this.custom)).errorStrategyStop();
            }
            return super.apply(publisher);
        }
    }

    /**
     * Scheduler that rejects the first {@link #REJECTION_COUNT} submissions (counted across both
     * {@code submit} overloads) with a {@link RejectedExecutionException} and forwards every later
     * submission to the wrapped delegate.
     */
    static class RejectingScheduler
    extends TestScheduler {
        static int REJECTION_COUNT = 10;
        // Atomic because submit(...) may be called from several threads at once; a plain int++ could
        // lose updates and let more than REJECTION_COUNT submissions be rejected.
        private final AtomicInteger rejections = new AtomicInteger();
        private final Scheduler delegate;

        /**
         * @param delegate scheduler that receives the submissions once the rejection quota is used up
         */
        public RejectingScheduler(Scheduler delegate) {
            super(1, "prefix", true);
            this.delegate = delegate;
        }

        @Override
        public Future<?> submit(Runnable task) {
            if (this.rejections.getAndIncrement() < REJECTION_COUNT) {
                throw new RejectedExecutionException();
            }
            return this.delegate.submit(task);
        }

        @Override
        public Future<?> submit(Callable task) {
            if (this.rejections.getAndIncrement() < REJECTION_COUNT) {
                throw new RejectedExecutionException();
            }
            return this.delegate.submit(task);
        }
    }

    /**
     * Minimal {@link Scheduler} for tests. {@code submit} calls run on an internal fixed-size pool; the
     * inherited {@link ScheduledThreadPoolExecutor} (core size 1, threads named with the
     * {@code ".tasks"} suffix) serves every method that is not overridden here. Cron scheduling is
     * unsupported.
     */
    static class TestScheduler
    extends ScheduledThreadPoolExecutor
    implements Scheduler {
        private static final String CRON_NOT_SUPPORTED = "Cron expression scheduling is not supported in unit tests. You need the productive service implementation.";

        private final ExecutorService executor;

        /**
         * @param threads size of the internal pool; also its queue capacity when {@code reject} is true
         * @param threadNamePrefix prefix for the internal pool's thread names
         * @param reject {@code true} to bound the work queue to {@code threads} entries; {@code false}
         *        for a queue of capacity {@code Integer.MAX_VALUE}
         */
        public TestScheduler(int threads, String threadNamePrefix, boolean reject) {
            super(1, (ThreadFactory)new NamedThreadFactory(threadNamePrefix + ".tasks"));
            int queueCapacity = reject ? threads : Integer.MAX_VALUE;
            LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
            ThreadFactory workerFactory = (ThreadFactory)new NamedThreadFactory(threadNamePrefix);
            this.executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, workQueue, workerFactory);
        }

        @Override
        public Future<?> submit(Runnable task) {
            Future<?> result = this.executor.submit(task);
            return result;
        }

        public Future<?> submit(Callable task) {
            Future<?> result = this.executor.submit(task);
            return result;
        }

        /** Initiates an orderly shutdown of both the inherited executor and the internal pool. */
        public void stop() {
            this.shutdown();
            this.executor.shutdown();
        }

        public ScheduledFuture<?> scheduleWithCronExpression(Runnable command, String cronExpression) {
            throw new UnsupportedOperationException(CRON_NOT_SUPPORTED);
        }

        public ScheduledFuture<?> scheduleWithCronExpression(Runnable command, String cronExpression, TimeZone timeZone) {
            throw new UnsupportedOperationException(CRON_NOT_SUPPORTED);
        }

        /** @return the simple name of {@code TestScheduler}, also for subclasses */
        public String getName() {
            return TestScheduler.class.getSimpleName();
        }
    }

    /**
     * Processor that parks its first {@code latchedInvocations} calls on a latch until
     * {@link #release()} is called. Each later call counts down the "unlatched invocation" latch and
     * returns immediately. Every call records the calling thread's name in the enclosing test's
     * {@code threads} set.
     */
    class MultipleInvocationLatchedProcessor
    extends AbstractComponent
    implements Processor {
        private final ReactiveProcessor.ProcessingType type;
        private final Latch latch = new Latch();
        private final CountDownLatch allLatchedLatch;
        private final Latch unlatchedInvocationLatch;
        // Remaining number of invocations that must still be parked; may go negative afterwards.
        private final AtomicInteger invocations;

        /**
         * @param type processing type reported by {@link #getProcessingType()}
         * @param latchedInvocations how many invocations to park before letting calls through
         */
        public MultipleInvocationLatchedProcessor(ReactiveProcessor.ProcessingType type, int latchedInvocations) {
            this.type = type;
            this.allLatchedLatch = new CountDownLatch(latchedInvocations);
            this.unlatchedInvocationLatch = new Latch();
            this.invocations = new AtomicInteger(latchedInvocations);
        }

        public CoreEvent process(CoreEvent event) throws MuleException {
            AbstractProcessingStrategyTestCase.this.threads.add(Thread.currentThread().getName());
            if (this.invocations.getAndDecrement() > 0) {
                this.allLatchedLatch.countDown();
                try {
                    this.latch.await();
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can still observe it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            } else {
                this.unlatchedInvocationLatch.countDown();
            }
            return event;
        }

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return this.type;
        }

        /** Lets every parked invocation continue. */
        public void release() {
            this.latch.release();
        }

        /** @return latch that reaches zero once the configured number of invocations are parked */
        public CountDownLatch getAllLatchedLatch() throws InterruptedException {
            return this.allLatchedLatch;
        }

        /** @return latch counted down by invocations that arrive after the parked ones */
        public Latch getUnlatchedInvocationLatch() throws InterruptedException {
            return this.unlatchedInvocationLatch;
        }
    }
}

