/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.meta.AbstractAnnotatedObject;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.context.notification.MessageProcessorNotificationListener;
import org.mule.runtime.core.api.context.notification.ServerNotificationListener;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.registry.RegistrationException;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.util.concurrent.Latch;
import org.mule.runtime.core.api.util.concurrent.NamedThreadFactory;
import org.mule.runtime.core.context.notification.MessageProcessorNotification;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.internal.util.rx.Operators;
import org.mule.runtime.core.processor.AnnotatedProcessor;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.junit4.AbstractReactiveProcessorTestCase;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public abstract class AbstractProcessingStrategyTestCase
extends AbstractReactiveProcessorTestCase {
    /** Thread-name prefix passed to the {@code cpuLight} test scheduler. */
    protected static final String CPU_LIGHT = "cpuLight";
    /** Thread-name prefix passed to the {@code blocking} test scheduler. */
    protected static final String IO = "I/O";
    /** Thread-name prefix passed to the {@code cpuIntensive} test scheduler. */
    protected static final String CPU_INTENSIVE = "cpuIntensive";
    /** Thread-name prefix passed to the {@code custom} test scheduler. */
    protected static final String CUSTOM = "custom";
    /** Thread-name prefix passed to the {@code ringBuffer} test scheduler. */
    protected static final String RING_BUFFER = "ringBuffer";
    /** Number of events pushed through the flow by the stream tests. */
    private static final int STREAM_ITERATIONS = 2000;
    /** Number of concurrent producer tasks used by the concurrent stream test. */
    private static final int CONCURRENT_TEST_CONCURRENCY = 8;
    /** Flow under test; rebuilt before every test method. */
    protected Flow flow;
    /**
     * Names of the threads on which the tracking processors were invoked. Wrapped in a synchronized set because
     * processors add to it from scheduler threads.
     */
    protected Set<String> threads = Collections.synchronizedSet(new HashSet<String>());
    /** Thread-tracking processor that reports {@code CPU_LITE} as its processing type. */
    protected Processor cpuLightProcessor = new ThreadTrackingProcessor() {

        @Override
        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.CPU_LITE;
        }
    };

    /** Thread-tracking processor that reports {@code CPU_INTENSIVE} as its processing type. */
    protected Processor cpuIntensiveProcessor = new ThreadTrackingProcessor() {

        @Override
        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.CPU_INTENSIVE;
        }
    };

    /** Thread-tracking processor that reports {@code BLOCKING} as its processing type. */
    protected Processor blockingProcessor = new ThreadTrackingProcessor() {

        @Override
        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.BLOCKING;
        }
    };

    /**
     * Thread-tracking processor that reports {@code CPU_LITE_ASYNC}; for this type {@link ThreadTrackingProcessor}
     * publishes its output on the {@code custom} scheduler.
     */
    protected Processor asyncProcessor = new ThreadTrackingProcessor() {

        @Override
        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.CPU_LITE_ASYNC;
        }
    };

    /** Annotated wrapper that delegates every call to {@link #asyncProcessor}. */
    protected Processor annotatedAsyncProcessor = new AnnotatedAsyncProcessor();
    /** Processor that always fails with a {@link RuntimeException} carrying the message {@code "FAILURE"}. */
    protected Processor failingProcessor = new ThreadTrackingProcessor() {

        @Override
        public Event process(Event event) {
            throw new RuntimeException("FAILURE");
        }
    };

    /**
     * Processor that alternates in groups of five: within every run of ten invocations the first five are handled
     * by the thread-tracking implementation and the remaining five are delegated to {@link #failingProcessor}.
     */
    protected Processor errorSuccessProcessor = new ThreadTrackingProcessor() {
        private final AtomicInteger count = new AtomicInteger();

        @Override
        public Event process(Event event) throws MuleException {
            int invocation = count.getAndIncrement();
            if (invocation % 10 >= 5) {
                return failingProcessor.process(event);
            }
            return super.process(event);
        }
    };
    // Test schedulers; each one is instantiated in before() with a fixed pool size and a thread-name prefix.
    protected Scheduler cpuLight;
    protected Scheduler blocking;
    protected Scheduler cpuIntensive;
    protected Scheduler custom;
    protected Scheduler ringBuffer;
    // IO scheduler obtained from the registry's SchedulerService; used to submit events concurrently.
    protected Scheduler asyncExecutor;
    // JUnit rule used by expectRejected() to declare the exception a test is expected to throw.
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /**
     * Creates the test case for the given execution mode.
     *
     * @param mode mode forwarded to the {@link AbstractReactiveProcessorTestCase} constructor; the stream tests in
     *        this class switch on it to choose between blocking and non-blocking processing
     */
    public AbstractProcessingStrategyTestCase(AbstractReactiveProcessorTestCase.Mode mode) {
        super(mode);
    }

    /**
     * Creates the test schedulers, looks up the IO scheduler used for concurrent submissions and builds the flow
     * under test with the processing strategy supplied by {@link #createProcessingStrategy(MuleContext, String)}.
     *
     * @throws RegistrationException if the {@link SchedulerService} lookup fails
     */
    @Before
    public void before() throws RegistrationException {
        cpuLight = new TestScheduler(2, CPU_LIGHT);
        blocking = new TestScheduler(4, IO);
        cpuIntensive = new TestScheduler(2, CPU_INTENSIVE);
        custom = new TestScheduler(1, CUSTOM);
        ringBuffer = new TestScheduler(1, RING_BUFFER);

        SchedulerService schedulerService =
            (SchedulerService) muleContext.getRegistry().lookupObject(SchedulerService.class);
        asyncExecutor = schedulerService.ioScheduler();

        // The exception handler simply hands the event back, so failures are not otherwise processed by the flow.
        flow = Flow.builder("test", muleContext)
            .processingStrategyFactory(this::createProcessingStrategy)
            .messagingExceptionHandler((exception, event) -> event)
            .build();
    }

    /**
     * Supplies the processing strategy under test. Invoked through the processing strategy factory registered on
     * the flow builder in {@link #before()}.
     *
     * @param var1 the Mule context handed over by the flow's processing strategy factory
     * @param var2 the second factory argument (named {@code prefix} at the call site) — presumably the scheduler
     *        name prefix; confirm against the factory contract
     * @return the processing strategy the flow should use
     */
    protected abstract ProcessingStrategy createProcessingStrategy(MuleContext var1, String var2);

    /**
     * Disposes the flow and stops every scheduler created in {@link #before()}.
     *
     * <p>The schedulers are stopped in a {@code finally} block so their threads are released even when disposing
     * the flow throws. This includes {@code ringBuffer}, which is created in {@link #before()} as well.
     */
    @After
    public void after() {
        try {
            flow.dispose();
        } finally {
            ringBuffer.stop();
            cpuLight.stop();
            blocking.stop();
            cpuIntensive.stop();
            custom.stop();
            asyncExecutor.stop();
        }
    }

    /**
     * Sends one event through a flow whose only processor is {@link #cpuLightProcessor}. This base implementation
     * only requires processing to complete without throwing.
     */
    @Test
    public void singleCpuLight() throws Exception {
        flow.setMessageProcessors(Collections.singletonList(cpuLightProcessor));
        flow.initialise();
        flow.start();

        process(flow, testEvent());
    }

    /**
     * Runs the concurrency scenario with a single latched {@code CPU_LITE} invocation, expecting that a further
     * event is not blocked behind it.
     */
    @Test
    public void singleCpuLightConcurrent() throws Exception {
        internalConcurrent(false, ReactiveProcessor.ProcessingType.CPU_LITE, 1);
    }

    /**
     * Runs the concurrency scenario with a single latched {@code BLOCKING} invocation, expecting that a further
     * event is not blocked behind it.
     */
    @Test
    public void singleBlockingConcurrent() throws Exception {
        internalConcurrent(false, ReactiveProcessor.ProcessingType.BLOCKING, 1);
    }

    /**
     * Drives the flow with {@code invocations} events that park inside a latched processor, then submits one more
     * event and checks whether that extra event gets through while the others are still parked.
     *
     * <p>After the check, {@link #threads} is replaced with a snapshot taken before the latch was released, so
     * callers only see the threads used up to that point.
     *
     * @param blocks {@code true} if the extra event is expected to be held back until the latched invocations are
     *        released, {@code false} if it is expected to complete while they are still parked
     * @param processingType processing type reported by the latched processor
     * @param invocations number of events that are parked on the latch
     * @param processorsBeforeLatch processors placed in the flow ahead of the latched processor
     * @throws MuleException if the flow cannot be initialised or started
     * @throws InterruptedException if the test thread is interrupted while waiting on a latch
     */
    protected void internalConcurrent(boolean blocks, ReactiveProcessor.ProcessingType processingType, int invocations, Processor ... processorsBeforeLatch) throws MuleException, InterruptedException {
        MultipleInvocationLatchedProcessor latchedProcessor = new MultipleInvocationLatchedProcessor(processingType, invocations);
        ArrayList<Processor> processors = new ArrayList<Processor>(Arrays.asList(processorsBeforeLatch));
        processors.add(latchedProcessor);
        flow.setMessageProcessors(processors);
        flow.initialise();
        flow.start();

        for (int i = 0; i < invocations; ++i) {
            asyncExecutor.submit(() -> process(flow, newEvent()));
        }
        // Wait until every one of the initial invocations is parked inside the latched processor.
        latchedProcessor.getAllLatchedLatch().await();

        asyncExecutor.submit(() -> process(flow, newEvent()));
        // await(..) yields a boolean: it must be compared with a boolean expectation, not with 1/0.
        Assert.assertThat(latchedProcessor.getUnlatchedInvocationLatch().await(500L, TimeUnit.MILLISECONDS), CoreMatchers.is(!blocks));

        // Snapshot the threads seen so far. A synchronized set must be locked manually while it is iterated,
        // and the copy constructor iterates it.
        Set<String> threadsBeforeUnlock;
        synchronized (threads) {
            threadsBeforeUnlock = new HashSet<String>(threads);
        }

        latchedProcessor.release();
        if (blocks) {
            // Once unlatched, the held-back invocation must eventually complete.
            Assert.assertThat(latchedProcessor.getUnlatchedInvocationLatch().await(5000L, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
        }
        threads = threadsBeforeUnlock;
    }

    /**
     * Sends one event through a flow made of three {@link #cpuLightProcessor} instances in a row.
     */
    @Test
    public void multipleCpuLight() throws Exception {
        flow.setMessageProcessors(Arrays.asList(cpuLightProcessor, cpuLightProcessor, cpuLightProcessor));
        flow.initialise();
        flow.start();

        process(flow, testEvent());
    }

    /**
     * Sends one event through a flow whose only processor is {@link #blockingProcessor}.
     */
    @Test
    public void singleBlocking() throws Exception {
        flow.setMessageProcessors(Collections.singletonList(blockingProcessor));
        flow.initialise();
        flow.start();

        process(flow, testEvent());
    }

    /**
     * Sends one event through a flow made of three {@link #blockingProcessor} instances in a row.
     */
    @Test
    public void multipleBlocking() throws Exception {
        flow.setMessageProcessors(Arrays.asList(blockingProcessor, blockingProcessor, blockingProcessor));
        flow.initialise();
        flow.start();

        process(flow, testEvent());
    }

    /**
     * Sends one event through a flow whose only processor is {@link #cpuIntensiveProcessor}.
     */
    @Test
    public void singleCpuIntensive() throws Exception {
        flow.setMessageProcessors(Collections.singletonList(cpuIntensiveProcessor));
        flow.initialise();
        flow.start();

        process(flow, testEvent());
    }

    /**
     * Sends one event through a flow made of three {@link #cpuIntensiveProcessor} instances in a row.
     */
    @Test
    public void multipleCpuIntensive() throws Exception {
        flow.setMessageProcessors(Arrays.asList(cpuIntensiveProcessor, cpuIntensiveProcessor, cpuIntensiveProcessor));
        flow.initialise();
        flow.start();

        process(flow, testEvent());
    }

    /**
     * Sends one event through a flow that chains a CPU-light, a CPU-intensive and a blocking processor, in that
     * order.
     */
    @Test
    public void mix() throws Exception {
        flow.setMessageProcessors(Arrays.asList(cpuLightProcessor, cpuIntensiveProcessor, blockingProcessor));
        flow.initialise();
        flow.start();

        process(flow, testEvent());
    }

    /**
     * Sends one event through a longer chain that interleaves CPU-light, blocking and CPU-intensive processors,
     * with consecutive processors of the same type appearing back to back.
     */
    @Test
    public void mix2() throws Exception {
        flow.setMessageProcessors(Arrays.asList(
            cpuLightProcessor,
            cpuLightProcessor,
            blockingProcessor,
            blockingProcessor,
            cpuLightProcessor,
            cpuIntensiveProcessor,
            cpuIntensiveProcessor,
            cpuLightProcessor));
        flow.initialise();
        flow.start();

        process(flow, testEvent());
    }

    /**
     * Sends one event through a flow made of {@link #asyncProcessor} followed by {@link #cpuLightProcessor}.
     */
    @Test
    public void asyncCpuLight() throws Exception {
        flow.setMessageProcessors(Arrays.asList(asyncProcessor, cpuLightProcessor));
        flow.initialise();
        flow.start();

        process(flow, testEvent());
    }

    /**
     * Runs the concurrency scenario with {@link #asyncProcessor} placed ahead of a single latched {@code CPU_LITE}
     * invocation, expecting that a further event is not blocked behind it.
     */
    @Test
    public void asyncCpuLightConcurrent() throws Exception {
        internalConcurrent(false, ReactiveProcessor.ProcessingType.CPU_LITE, 1, asyncProcessor);
    }

    /**
     * Pushes {@code STREAM_ITERATIONS} events sequentially through a single CPU-light processor and requires all
     * of them to complete within five seconds.
     */
    @Test
    public void stream() throws Exception {
        flow.setMessageProcessors(Arrays.asList(cpuLightProcessor));
        flow.initialise();
        flow.start();

        final CountDownLatch completed = new CountDownLatch(STREAM_ITERATIONS);
        for (int sent = 0; sent < STREAM_ITERATIONS; sent++) {
            switch (mode) {
                case BLOCKING:
                    flow.process(newEvent());
                    completed.countDown();
                    break;
                case NON_BLOCKING:
                    processNonBlocking(flow, newEvent(),
                                       response -> completed.countDown(),
                                       error -> Exceptions.bubble(new AssertionError("Unexpected error")));
                    break;
                default:
                    break;
            }
        }

        Assert.assertThat(completed.await(5000L, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
    }

    /**
     * Pushes {@code STREAM_ITERATIONS} events through a single CPU-light processor from
     * {@code CONCURRENT_TEST_CONCURRENCY} tasks running on {@link #asyncExecutor}, each task sending an equal share,
     * and requires all of them to complete within five seconds.
     */
    @Test
    public void concurrentStream() throws Exception {
        flow.setMessageProcessors(Arrays.asList(cpuLightProcessor));
        flow.initialise();
        flow.start();

        final CountDownLatch completed = new CountDownLatch(STREAM_ITERATIONS);
        final int eventsPerTask = STREAM_ITERATIONS / CONCURRENT_TEST_CONCURRENCY;
        for (int task = 0; task < CONCURRENT_TEST_CONCURRENCY; task++) {
            asyncExecutor.submit(() -> {
                try {
                    for (int sent = 0; sent < eventsPerTask; sent++) {
                        switch (mode) {
                            case BLOCKING:
                                flow.process(newEvent());
                                completed.countDown();
                                break;
                            case NON_BLOCKING:
                                processNonBlocking(flow, newEvent(),
                                                   response -> completed.countDown(),
                                                   error -> Exceptions.bubble(new AssertionError("Unexpected error")));
                                break;
                            default:
                                break;
                        }
                    }
                } catch (MuleException e) {
                    // The first failure aborts this task's remaining iterations.
                    throw new RuntimeException(e);
                }
            });
        }

        Assert.assertThat(completed.await(5000L, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
    }

    /**
     * Pushes 2000 events through a flow whose only processor always fails and requires every one of them to end
     * in an error within five seconds.
     *
     * <p>In blocking mode the "unexpected success" check is performed outside the {@code try} block: an
     * {@link AssertionError} raised inside it would be caught by {@code catch (Throwable)} and wrongly counted as
     * the expected failure.
     */
    @Test
    public void errorsStream() throws Exception {
        flow.setMessageProcessors(Arrays.asList(failingProcessor));
        flow.initialise();
        flow.start();

        CountDownLatch latch = new CountDownLatch(2000);
        for (int i = 0; i < 2000; ++i) {
            switch (mode) {
                case BLOCKING: {
                    boolean failed = false;
                    try {
                        flow.process(newEvent());
                    } catch (Throwable expected) {
                        failed = true;
                        latch.countDown();
                    }
                    if (!failed) {
                        Assert.fail("Unexpected success");
                    }
                    break;
                }
                case NON_BLOCKING: {
                    // NOTE(review): Exceptions.bubble(..) appears to return the wrapping exception rather than
                    // throw it, so an unexpected success here only surfaces through the latch timeout — confirm.
                    processNonBlocking(flow, newEvent(),
                                       response -> Exceptions.bubble(new AssertionError("Unexpected success")),
                                       error -> latch.countDown());
                    break;
                }
                default:
                    break;
            }
        }

        Assert.assertThat(latch.await(5000L, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
    }

    /**
     * Pushes {@code STREAM_ITERATIONS} events through {@link #errorSuccessProcessor} and requires half of them to
     * succeed and half of them to fail, each half within five seconds.
     */
    @Test
    public void errorSuccessStream() throws Exception {
        flow.setMessageProcessors(Arrays.asList(errorSuccessProcessor));
        flow.initialise();
        flow.start();

        final int expectedPerOutcome = STREAM_ITERATIONS / 2;
        final CountDownLatch successes = new CountDownLatch(expectedPerOutcome);
        final CountDownLatch failures = new CountDownLatch(expectedPerOutcome);

        for (int sent = 0; sent < STREAM_ITERATIONS; sent++) {
            switch (mode) {
                case BLOCKING:
                    try {
                        flow.process(newEvent());
                        successes.countDown();
                    } catch (Throwable failure) {
                        failures.countDown();
                    }
                    break;
                case NON_BLOCKING:
                    processNonBlocking(flow, newEvent(),
                                       response -> successes.countDown(),
                                       failure -> failures.countDown());
                    break;
                default:
                    break;
            }
        }

        Assert.assertThat(successes.await(5000L, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
        Assert.assertThat(failures.await(5000L, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
    }

    /**
     * Pushes {@code event} through {@code flow} reactively and routes the outcome to the supplied callbacks.
     *
     * @param flow the flow applied to the single-element stream holding {@code event}
     * @param event the event to process; its context's response publisher is what the callbacks observe
     * @param onResponse invoked with each item emitted by the event context's response publisher
     * @param onError invoked if the event context's response publisher signals an error
     */
    protected void processNonBlocking(Flow flow, Event event, Consumer<Event> onResponse, Consumer<Throwable> onError) {
        // Start processing: the flow is used as the transformation applied to a Mono of the event.
        Mono.just((Object)event).transform((Function)flow).subscribe(Operators.requestUnbounded());
        // Observe the outcome through the event context rather than through the flow's own output.
        // NOTE(review): this subscription happens after processing has been triggered — presumably the response
        // publisher replays its result to late subscribers; confirm.
        Flux.from((Publisher)event.getContext().getResponsePublisher()).subscribe(onResponse, onError);
    }

    /**
     * Test that every concrete subclass must implement.
     * NOTE(review): the name suggests it covers processing inside a transaction — confirm against the subclasses.
     */
    @Test
    public abstract void tx() throws Exception;

    /**
     * Processes one event through {@link #annotatedAsyncProcessor} while recording the threads on which the
     * message-processor notifications with action codes 1601 and 1602 are delivered.
     *
     * @param beforeThread receives the thread that delivered the notification with action 1601 (presumably the
     *        pre-invoke notification — confirm against {@link MessageProcessorNotification})
     * @param afterThread receives the thread that delivered the notification with action 1602 (presumably the
     *        post-invoke notification — confirm against {@link MessageProcessorNotification})
     */
    protected void testAsyncCpuLightNotificationThreads(AtomicReference<Thread> beforeThread, AtomicReference<Thread> afterThread) throws Exception {
        muleContext.getNotificationManager()
            .addInterfaceToType(MessageProcessorNotificationListener.class, MessageProcessorNotification.class);

        MessageProcessorNotificationListener threadRecorder = notification -> {
            if (notification.getAction() == 1601) {
                beforeThread.set(Thread.currentThread());
            } else if (notification.getAction() == 1602) {
                afterThread.set(Thread.currentThread());
            }
        };
        muleContext.getNotificationManager().addListener((ServerNotificationListener) threadRecorder);

        flow.setMessageProcessors(Collections.singletonList(annotatedAsyncProcessor));
        flow.initialise();
        flow.start();

        process(flow, testEvent());
    }

    /**
     * Builds a matcher that accepts integers within the closed range {@code [min, max]}.
     *
     * @param min smallest accepted value, inclusive
     * @param max largest accepted value, inclusive
     * @return a matcher combining "greater than or equal to {@code min}" and "less than or equal to {@code max}"
     */
    public static Matcher<Integer> between(int min, int max) {
        Matcher<Integer> atLeastMin = Matchers.greaterThanOrEqualTo(min);
        Matcher<Integer> atMostMax = Matchers.lessThanOrEqualTo(max);
        return CoreMatchers.allOf(atLeastMin, atMostMax);
    }

    /**
     * Builds a matcher that accepts longs within the closed range {@code [min, max]}.
     *
     * @param min smallest accepted value, inclusive
     * @param max largest accepted value, inclusive
     * @return a matcher combining "greater than or equal to {@code min}" and "less than or equal to {@code max}"
     */
    public static Matcher<Long> between(long min, long max) {
        Matcher<Long> atLeastMin = Matchers.greaterThanOrEqualTo(min);
        Matcher<Long> atMostMax = Matchers.lessThanOrEqualTo(max);
        return CoreMatchers.allOf(atLeastMin, atMostMax);
    }

    /**
     * Declares, through the {@link #expectedException} rule, that the running test must fail with a
     * {@link MessagingException} whose error type identifier is {@code "OVERLOAD"} and whose cause is a
     * {@link RejectedExecutionException}.
     */
    protected void expectRejected() {
        expectedException.expect(MessagingException.class);
        expectedException.expect(overloadErrorTypeMatcher());
        expectedException.expectCause(CoreMatchers.instanceOf(RejectedExecutionException.class));
    }

    /**
     * Creates a matcher that accepts a {@link MessagingException} whose event carries an error with the error type
     * identifier {@code "OVERLOAD"}.
     *
     * <p>The expectation and the mismatch are described separately, so a failure message reports the expected
     * identifier next to the one actually found. An event without an error is a plain mismatch rather than an
     * exception thrown from inside the matcher.
     *
     * @return the matcher used by {@link #expectRejected()}
     */
    private TypeSafeMatcher<MessagingException> overloadErrorTypeMatcher() {
        final String expectedErrorTypeId = "OVERLOAD";
        return new TypeSafeMatcher<MessagingException>() {

            @Override
            public void describeTo(Description description) {
                description.appendText("a MessagingException with error type identifier ").appendValue(expectedErrorTypeId);
            }

            @Override
            protected void describeMismatchSafely(MessagingException item, Description mismatchDescription) {
                mismatchDescription.appendText("error type identifier was ").appendValue(this.errorTypeIdOf(item));
            }

            @Override
            protected boolean matchesSafely(MessagingException item) {
                return expectedErrorTypeId.equals(this.errorTypeIdOf(item));
            }

            /** Returns the error type identifier carried by the exception's event, or null when it has no error. */
            private String errorTypeIdOf(MessagingException item) {
                if (!item.getEvent().getError().isPresent()) {
                    return null;
                }
                return ((Error)item.getEvent().getError().get()).getErrorType().getIdentifier();
            }
        };
    }

    class AnnotatedAsyncProcessor
    extends AbstractAnnotatedObject
    implements AnnotatedProcessor {
        AnnotatedAsyncProcessor() {
        }

        public Event process(Event event) throws MuleException {
            return AbstractProcessingStrategyTestCase.this.asyncProcessor.process(event);
        }

        public Publisher<Event> apply(Publisher<Event> publisher) {
            return AbstractProcessingStrategyTestCase.this.asyncProcessor.apply(publisher);
        }

        public ComponentLocation getLocation() {
            return AbstractMuleTestCase.TEST_CONNECTOR_LOCATION;
        }

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return AbstractProcessingStrategyTestCase.this.asyncProcessor.getProcessingType();
        }
    }

    class ThreadTrackingProcessor
    implements Processor {
        ThreadTrackingProcessor() {
        }

        public Event process(Event event) throws MuleException {
            AbstractProcessingStrategyTestCase.this.threads.add(Thread.currentThread().getName());
            return event;
        }

        public Publisher<Event> apply(Publisher<Event> publisher) {
            if (this.getProcessingType() == ReactiveProcessor.ProcessingType.CPU_LITE_ASYNC) {
                return Flux.from(publisher).transform(processorPublisher -> super.apply(publisher)).publishOn(Schedulers.fromExecutorService((ExecutorService)AbstractProcessingStrategyTestCase.this.custom));
            }
            return super.apply(publisher);
        }
    }

    /**
     * Test scheduler whose {@link #submit(Runnable)} always fails with a {@link RejectedExecutionException}.
     * It is built with one worker thread and the thread-name prefix {@code "prefix"}.
     */
    static class RejectingScheduler
    extends TestScheduler {
        public RejectingScheduler() {
            super(1, "prefix");
        }

        /**
         * Rejects every task.
         *
         * @param task ignored
         * @return never returns normally
         * @throws RejectedExecutionException always
         */
        @Override
        public Future<?> submit(Runnable task) {
            throw new RejectedExecutionException();
        }
    }

    /**
     * Minimal {@link Scheduler} for tests. Tasks passed to {@link #submit(Runnable)} run on a fixed-size pool whose
     * threads are created by a factory using the given name prefix; everything else inherited from
     * {@link ScheduledThreadPoolExecutor} runs on a single-thread pool whose factory uses the prefix plus
     * {@code ".tasks"}.
     */
    static class TestScheduler extends ScheduledThreadPoolExecutor implements Scheduler {
        private static final String CRON_NOT_SUPPORTED =
            "Cron expression scheduling is not supported in unit tests. You need the productive service implementation.";

        private final ExecutorService executor;

        /**
         * @param threads fixed size of the pool that runs submitted tasks
         * @param threadNamePrefix prefix handed to the thread factories of both pools
         */
        public TestScheduler(int threads, String threadNamePrefix) {
            super(1, new NamedThreadFactory(threadNamePrefix + ".tasks"));
            ThreadFactory workerFactory = new NamedThreadFactory(threadNamePrefix);
            executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                                              new LinkedBlockingQueue<Runnable>(), workerFactory);
        }

        /** Runs the task on the fixed-size worker pool instead of the inherited scheduled pool. */
        @Override
        public Future<?> submit(Runnable task) {
            return executor.submit(task);
        }

        /** Shuts down both the inherited scheduled pool and the worker pool immediately. */
        public void stop() {
            shutdownNow();
            executor.shutdownNow();
        }

        /** Not supported by this test scheduler. */
        public ScheduledFuture<?> scheduleWithCronExpression(Runnable command, String cronExpression) {
            throw new UnsupportedOperationException(CRON_NOT_SUPPORTED);
        }

        /** Not supported by this test scheduler. */
        public ScheduledFuture<?> scheduleWithCronExpression(Runnable command, String cronExpression, TimeZone timeZone) {
            throw new UnsupportedOperationException(CRON_NOT_SUPPORTED);
        }

        /** Returns the simple class name of {@code TestScheduler}. */
        public String getName() {
            return TestScheduler.class.getSimpleName();
        }
    }

    class MultipleInvocationLatchedProcessor
    implements Processor {
        private ReactiveProcessor.ProcessingType type;
        private volatile Latch latch = new Latch();
        private volatile CountDownLatch allLatchedLatch;
        private volatile Latch unlatchedInvocationLatch;
        private AtomicInteger invocations;

        public MultipleInvocationLatchedProcessor(ReactiveProcessor.ProcessingType type, int latchedInvocations) {
            this.type = type;
            this.allLatchedLatch = new CountDownLatch(latchedInvocations);
            this.unlatchedInvocationLatch = new Latch();
            this.invocations = new AtomicInteger(latchedInvocations);
        }

        public Event process(Event event) throws MuleException {
            AbstractProcessingStrategyTestCase.this.threads.add(Thread.currentThread().getName());
            if (this.invocations.getAndDecrement() > 0) {
                this.allLatchedLatch.countDown();
                try {
                    this.latch.await();
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } else {
                this.unlatchedInvocationLatch.countDown();
            }
            return event;
        }

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return this.type;
        }

        public void release() {
            this.latch.release();
        }

        public CountDownLatch getAllLatchedLatch() throws InterruptedException {
            return this.allLatchedLatch;
        }

        public Latch getUnlatchedInvocationLatch() throws InterruptedException {
            return this.unlatchedInvocationLatch;
        }
    }
}

