/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.construct;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.i18n.I18nMessage;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.construct.AbstractFlowConstructTestCase;
import org.mule.runtime.core.internal.construct.AbstractFlowConstruct;
import org.mule.runtime.core.internal.construct.DefaultFlowBuilder;
import org.mule.runtime.core.processor.ResponseMessageProcessorAdapter;
import org.mule.runtime.core.processor.strategy.BlockingProcessingStrategyFactory;
import org.mule.runtime.core.transformer.simple.StringAppendTransformer;
import org.mule.tck.SensingNullMessageProcessor;
import org.mule.tck.core.lifecycle.LifecycleTrackerProcessor;
import reactor.core.publisher.Mono;

@RunWith(value=Parameterized.class)
public class DefaultFlowTestCase
extends AbstractFlowConstructTestCase {
    private static final String FLOW_NAME = "test-flow";
    private DefaultFlowBuilder.DefaultFlow flow;
    private SensingNullMessageProcessor sensingMessageProcessor;
    private BiFunction<Processor, Event, Event> triggerFunction;

    public DefaultFlowTestCase(BiFunction<Processor, Event, Event> triggerFunction) {
        this.triggerFunction = triggerFunction;
    }

    @Parameterized.Parameters
    public static Collection<Object[]> parameters() {
        BiFunction<Processor, Event, Event> blocking = (listener, event) -> (Event)Mono.just((Object)event).transform((Function)listener).block();
        BiFunction<Processor, Event, Event> async = (listener, event) -> {
            try {
                return listener.process(event);
            }
            catch (MuleException e) {
                throw new RuntimeException(e);
            }
        };
        return Arrays.asList({blocking}, {async});
    }

    @Override
    protected void doSetUp() throws Exception {
        super.doSetUp();
        this.sensingMessageProcessor = this.getSensingNullMessageProcessor();
        this.flow = new DefaultFlowBuilder.DefaultFlow(FLOW_NAME, muleContext);
        this.flow.setMessageSource((MessageSource)this.directInboundMessageSource);
        ArrayList<Object> processors = new ArrayList<Object>();
        processors.add(new ResponseMessageProcessorAdapter((Processor)new StringAppendTransformer("f")));
        processors.add(new ResponseMessageProcessorAdapter((Processor)new StringAppendTransformer("e")));
        processors.add(new ResponseMessageProcessorAdapter((Processor)new StringAppendTransformer("d")));
        processors.add(new StringAppendTransformer("a"));
        processors.add(new StringAppendTransformer("b"));
        processors.add(new StringAppendTransformer("c"));
        processors.add(event -> Event.builder((Event)event).addVariable("thread", (Object)Thread.currentThread()).build());
        processors.add(this.sensingMessageProcessor);
        this.flow.setMessageProcessors(processors);
    }

    @Override
    protected AbstractFlowConstruct getFlowConstruct() throws Exception {
        return this.flow;
    }

    @After
    public void after() throws MuleException {
        if (this.flow.isStarted()) {
            this.flow.stop();
        }
        if (this.flow.getLifecycleState().isInitialised()) {
            this.flow.dispose();
        }
    }

    @Test
    public void testProcessOneWayEndpoint() throws Exception {
        this.flow.initialise();
        this.flow.start();
        Event event = DefaultFlowTestCase.eventBuilder().message(Message.of((Object)"test")).build();
        Event response = this.triggerFunction.apply(this.directInboundMessageSource.getListener(), event);
        this.assertSucessfulProcessing(response);
    }

    @Test
    public void testProcessRequestResponseEndpoint() throws Exception {
        this.flow.initialise();
        this.flow.start();
        Event response = this.triggerFunction.apply(this.directInboundMessageSource.getListener(), this.testEvent());
        this.assertSucessfulProcessing(response);
    }

    private void assertSucessfulProcessing(Event response) throws MuleException {
        Assert.assertThat((Object)response.getMessageAsString(muleContext), (Matcher)CoreMatchers.equalTo((Object)"testabcdef"));
        Assert.assertThat((Object)response.getVariable("thread").getValue(), (Matcher)CoreMatchers.not((Matcher)CoreMatchers.sameInstance((Object)Thread.currentThread())));
        Assert.assertThat((Object)this.sensingMessageProcessor.event.getMessageAsString(muleContext), (Matcher)CoreMatchers.equalTo((Object)"testabc"));
        Assert.assertThat((Object)this.sensingMessageProcessor.event.getVariable("thread").getValue(), (Matcher)CoreMatchers.not((Matcher)CoreMatchers.sameInstance((Object)Thread.currentThread())));
    }

    @Test
    public void testProcessStopped() throws Exception {
        this.flow.initialise();
        try {
            this.triggerFunction.apply(this.directInboundMessageSource.getListener(), this.testEvent());
            Assert.fail((String)"exception expected");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void restartWithBlockingProcessingStrategy() throws Exception {
        this.flow.setProcessingStrategyFactory((ProcessingStrategyFactory)new BlockingProcessingStrategyFactory());
        this.flow.initialise();
        this.flow.start();
        this.flow.stop();
        this.flow.start();
        Event response = this.triggerFunction.apply(this.directInboundMessageSource.getListener(), this.testEvent());
        Assert.assertThat((Object)response, (Matcher)CoreMatchers.not((Matcher)CoreMatchers.nullValue()));
    }

    @Test
    public void testFailStartingMessageSourceOnLifecycleShouldStopStartedPipelineProcesses() throws Exception {
        muleContext.start();
        MessageSource mockMessageSource = (MessageSource)Mockito.mock(MessageSource.class, (MockSettings)Mockito.withSettings().extraInterfaces(new Class[]{Startable.class, Stoppable.class}));
        ((Startable)Mockito.doThrow((Throwable)new LifecycleException((I18nMessage)Mockito.mock(I18nMessage.class), (Object)"Error starting component")).when((Object)((Startable)mockMessageSource))).start();
        this.flow.setMessageSource(mockMessageSource);
        Processor mockMessageProcessor = (Processor)Mockito.spy((Object)new LifecycleTrackerProcessor());
        this.flow.getMessageProcessors().add(mockMessageProcessor);
        this.flow.initialise();
        try {
            this.flow.start();
            Assert.fail();
        }
        catch (LifecycleException lifecycleException) {
            // empty catch block
        }
        ((Startable)Mockito.verify((Object)((Startable)mockMessageProcessor), (VerificationMode)Mockito.times((int)1))).start();
        ((Stoppable)Mockito.verify((Object)((Stoppable)mockMessageProcessor), (VerificationMode)Mockito.times((int)1))).stop();
        ((Startable)Mockito.verify((Object)((Startable)mockMessageSource), (VerificationMode)Mockito.times((int)1))).start();
        ((Stoppable)Mockito.verify((Object)((Stoppable)mockMessageSource), (VerificationMode)Mockito.times((int)1))).stop();
    }
}

