/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.construct;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.InOrder;
import org.mockito.Matchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.i18n.I18nMessage;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.AsyncProcessingStrategyFactory;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.internal.construct.AbstractFlowConstruct;
import org.mule.runtime.core.internal.construct.AbstractFlowConstructTestCase;
import org.mule.runtime.core.internal.construct.DefaultFlowBuilder;
import org.mule.runtime.core.internal.processor.ResponseMessageProcessorAdapter;
import org.mule.runtime.core.internal.processor.strategy.BlockingProcessingStrategyFactory;
import org.mule.runtime.core.internal.transformer.simple.StringAppendTransformer;
import org.mule.runtime.core.privileged.event.PrivilegedEvent;
import org.mule.tck.SensingNullMessageProcessor;
import org.mule.tck.core.lifecycle.LifecycleTrackerProcessor;
import org.mule.tck.util.MuleContextUtils;
import reactor.core.publisher.Mono;

@RunWith(value=Parameterized.class)
public class DefaultFlowTestCase
extends AbstractFlowConstructTestCase {
    private static final String FLOW_NAME = "test-flow";
    private DefaultFlowBuilder.DefaultFlow flow;
    private DefaultFlowBuilder.DefaultFlow stoppedFlow;
    private SensingNullMessageProcessor sensingMessageProcessor;
    private BiFunction<Processor, CoreEvent, CoreEvent> triggerFunction;
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    public DefaultFlowTestCase(BiFunction<Processor, CoreEvent, CoreEvent> triggerFunction) {
        this.triggerFunction = triggerFunction;
    }

    @Parameterized.Parameters
    public static Collection<Object[]> parameters() {
        BiFunction<Processor, CoreEvent, CoreEvent> blocking = (listener, event) -> (CoreEvent)Mono.just((Object)event).transform((Function)listener).block();
        BiFunction<Processor, CoreEvent, CoreEvent> async = (listener, event) -> {
            try {
                return listener.process(event);
            }
            catch (MuleException e) {
                throw new RuntimeException(e);
            }
        };
        return Arrays.asList({blocking}, {async});
    }

    @Override
    protected void doSetUp() throws Exception {
        super.doSetUp();
        this.sensingMessageProcessor = this.getSensingNullMessageProcessor();
        ArrayList<Object> processors = new ArrayList<Object>();
        processors.add(new ResponseMessageProcessorAdapter((Processor)new StringAppendTransformer("f")));
        processors.add(new ResponseMessageProcessorAdapter((Processor)new StringAppendTransformer("e")));
        processors.add(new ResponseMessageProcessorAdapter((Processor)new StringAppendTransformer("d")));
        processors.add(new StringAppendTransformer("a"));
        processors.add(new StringAppendTransformer("b"));
        processors.add(new StringAppendTransformer("c"));
        processors.add(event -> CoreEvent.builder((CoreEvent)event).addVariable("thread", (Object)Thread.currentThread()).build());
        processors.add(this.sensingMessageProcessor);
        this.flow = (DefaultFlowBuilder.DefaultFlow)Flow.builder((String)FLOW_NAME, (MuleContext)muleContext).source((MessageSource)this.directInboundMessageSource).processors(processors).build();
        this.stoppedFlow = (DefaultFlowBuilder.DefaultFlow)Flow.builder((String)FLOW_NAME, (MuleContext)muleContext).source((MessageSource)this.directInboundMessageSource).processors(processors).initialState("stopped").build();
    }

    @Override
    protected AbstractFlowConstruct getFlowConstruct() throws Exception {
        return this.flow;
    }

    @Override
    protected AbstractFlowConstruct getStoppedFlowConstruct() throws Exception {
        return this.stoppedFlow;
    }

    @After
    public void after() throws MuleException {
        if (this.flow.isStarted()) {
            this.flow.stop();
        }
        if (this.flow.getLifecycleState().isInitialised()) {
            this.flow.dispose();
        }
        if (this.stoppedFlow.getLifecycleState().isInitialised()) {
            this.stoppedFlow.dispose();
        }
    }

    @Test
    public void testProcessOneWayEndpoint() throws Exception {
        this.flow.initialise();
        this.flow.start();
        CoreEvent event = MuleContextUtils.eventBuilder((MuleContext)muleContext).message(Message.of((Object)"test")).build();
        CoreEvent response = this.triggerFunction.apply(this.directInboundMessageSource.getListener(), event);
        this.assertSucessfulProcessing((PrivilegedEvent)response);
    }

    @Test
    public void testProcessRequestResponseEndpoint() throws Exception {
        this.flow.initialise();
        this.flow.start();
        CoreEvent response = this.triggerFunction.apply(this.directInboundMessageSource.getListener(), this.testEvent());
        this.assertSucessfulProcessing((PrivilegedEvent)response);
    }

    private void assertSucessfulProcessing(PrivilegedEvent response) throws MuleException {
        Assert.assertThat((Object)response.getMessageAsString(muleContext), (Matcher)CoreMatchers.equalTo((Object)"testabcdef"));
        Assert.assertThat((Object)((TypedValue)response.getVariables().get("thread")).getValue(), (Matcher)CoreMatchers.not((Matcher)CoreMatchers.sameInstance((Object)Thread.currentThread())));
        Assert.assertThat((Object)((PrivilegedEvent)this.sensingMessageProcessor.event).getMessageAsString(muleContext), (Matcher)CoreMatchers.equalTo((Object)"testabc"));
        Assert.assertThat((Object)((TypedValue)this.sensingMessageProcessor.event.getVariables().get("thread")).getValue(), (Matcher)CoreMatchers.not((Matcher)CoreMatchers.sameInstance((Object)Thread.currentThread())));
    }

    @Test
    public void testProcessStopped() throws Exception {
        this.flow.initialise();
        try {
            this.triggerFunction.apply(this.directInboundMessageSource.getListener(), this.testEvent());
            Assert.fail((String)"exception expected");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void restartWithBlockingProcessingStrategy() throws Exception {
        this.after();
        this.flow = (DefaultFlowBuilder.DefaultFlow)Flow.builder((String)FLOW_NAME, (MuleContext)muleContext).source(this.flow.getSource()).processors(this.flow.getProcessors()).processingStrategyFactory((ProcessingStrategyFactory)new BlockingProcessingStrategyFactory()).build();
        this.flow.initialise();
        this.flow.start();
        this.flow.stop();
        this.flow.start();
        CoreEvent response = this.triggerFunction.apply(this.directInboundMessageSource.getListener(), this.testEvent());
        Assert.assertThat((Object)response, (Matcher)CoreMatchers.not((Matcher)CoreMatchers.nullValue()));
    }

    @Test
    public void testFailStartingMessageSourceOnLifecycleShouldStopStartedPipelineProcesses() throws Exception {
        muleContext.start();
        MessageSource mockMessageSource = (MessageSource)Mockito.mock(MessageSource.class, (MockSettings)Mockito.withSettings().extraInterfaces(new Class[]{Startable.class, Stoppable.class}));
        ((Startable)Mockito.doThrow((Throwable)new LifecycleException((I18nMessage)Mockito.mock(I18nMessage.class), (Object)mockMessageSource)).when((Object)((Startable)mockMessageSource))).start();
        ArrayList<Processor> processors = new ArrayList<Processor>(this.flow.getProcessors());
        Processor mockMessageProcessor = (Processor)Mockito.spy((Object)new LifecycleTrackerProcessor());
        processors.add(mockMessageProcessor);
        this.after();
        this.flow = (DefaultFlowBuilder.DefaultFlow)Flow.builder((String)FLOW_NAME, (MuleContext)muleContext).source(mockMessageSource).processors(processors).build();
        this.flow.initialise();
        try {
            this.flow.start();
            Assert.fail();
        }
        catch (LifecycleException lifecycleException) {
            // empty catch block
        }
        ((Startable)Mockito.verify((Object)((Startable)mockMessageProcessor), (VerificationMode)Mockito.times((int)1))).start();
        ((Stoppable)Mockito.verify((Object)((Stoppable)mockMessageProcessor), (VerificationMode)Mockito.times((int)1))).stop();
        ((Startable)Mockito.verify((Object)((Startable)mockMessageSource), (VerificationMode)Mockito.times((int)1))).start();
        ((Stoppable)Mockito.verify((Object)((Stoppable)mockMessageSource), (VerificationMode)Mockito.times((int)1))).stop();
    }

    @Test
    public void defaultMaxConcurrency() throws Exception {
        this.flow.initialise();
        this.flow.start();
        Assert.assertThat((Object)this.flow.getMaxConcurrency(), (Matcher)CoreMatchers.equalTo((Object)AsyncProcessingStrategyFactory.DEFAULT_MAX_CONCURRENCY));
        ((SchedulerService)Mockito.verify((Object)muleContext.getSchedulerService())).ioScheduler((SchedulerConfig)Matchers.eq((Object)muleContext.getSchedulerBaseConfig().withName(this.flow.getName() + "." + ReactiveProcessor.ProcessingType.BLOCKING.name())));
    }

    @Test
    public void customMaxConcurrency() throws Exception {
        int customMaxConcurrency = 1;
        Flow customFlow = Flow.builder((String)FLOW_NAME, (MuleContext)muleContext).source((MessageSource)this.directInboundMessageSource).processors(new Processor[]{this.getSensingNullMessageProcessor()}).maxConcurrency(customMaxConcurrency).build();
        try {
            customFlow.initialise();
            customFlow.start();
            Assert.assertThat((Object)customFlow.getMaxConcurrency(), (Matcher)CoreMatchers.equalTo((Object)customMaxConcurrency));
            ((SchedulerService)Mockito.verify((Object)muleContext.getSchedulerService())).ioScheduler((SchedulerConfig)Matchers.eq((Object)muleContext.getSchedulerBaseConfig().withName(this.flow.getName() + "." + ReactiveProcessor.ProcessingType.BLOCKING.name())));
            customFlow.stop();
        }
        finally {
            customFlow.dispose();
        }
    }

    @Test
    public void illegalCustomMaxConcurrency() {
        this.expectedException.expect(IllegalArgumentException.class);
        Flow.builder((String)FLOW_NAME, (MuleContext)muleContext).source((MessageSource)this.directInboundMessageSource).processors(new Processor[]{this.getSensingNullMessageProcessor()}).maxConcurrency(0).build();
    }

    @Test
    public void lifecycleOrder() throws MuleException {
        Sink sink = (Sink)Mockito.mock(Sink.class, (MockSettings)Mockito.withSettings().extraInterfaces(new Class[]{Disposable.class}));
        Processor processor = (Processor)Mockito.mock(Processor.class, (MockSettings)Mockito.withSettings().extraInterfaces(new Class[]{Startable.class, Stoppable.class}));
        ProcessingStrategy processingStrategy = (ProcessingStrategy)Mockito.mock(ProcessingStrategy.class, (MockSettings)Mockito.withSettings().extraInterfaces(new Class[]{Startable.class, Stoppable.class}));
        Mockito.when((Object)processingStrategy.createSink((FlowConstruct)Matchers.any(FlowConstruct.class), (ReactiveProcessor)Matchers.any(ReactiveProcessor.class))).thenReturn((Object)sink);
        this.flow = (DefaultFlowBuilder.DefaultFlow)Flow.builder((String)FLOW_NAME, (MuleContext)DefaultFlowTestCase.muleContext).source((MessageSource)this.directInboundMessageSource).processors(Collections.singletonList(processor)).processingStrategyFactory((muleContext, s) -> processingStrategy).build();
        this.flow.initialise();
        this.flow.start();
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{sink, processor, processingStrategy});
        ((Startable)inOrder.verify((Object)((Startable)processingStrategy))).start();
        ((ProcessingStrategy)inOrder.verify((Object)processingStrategy)).createSink((FlowConstruct)Matchers.any(FlowConstruct.class), (ReactiveProcessor)Matchers.any(ReactiveProcessor.class));
        ((Startable)inOrder.verify((Object)((Startable)processor))).start();
        this.flow.stop();
        ((Disposable)inOrder.verify((Object)((Disposable)sink))).dispose();
        ((Stoppable)inOrder.verify((Object)((Stoppable)processor))).stop();
        ((Stoppable)inOrder.verify((Object)((Stoppable)processingStrategy))).stop();
    }
}

