/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.routing.requestreply;

import java.beans.ExceptionListener;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.routing.ResponseTimeoutException;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.concurrent.Latch;
import org.mule.runtime.core.processor.AsyncDelegateMessageProcessor;
import org.mule.runtime.core.routing.correlation.EventCorrelatorTestCase;
import org.mule.runtime.core.routing.requestreply.AbstractAsyncRequestReplyRequester;
import org.mule.runtime.core.util.store.MuleObjectStoreManager;
import org.mule.tck.SensingNullMessageProcessor;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link AbstractAsyncRequestReplyRequester}, exercised through the minimal
 * {@link TestAsyncRequestReplyRequester} subclass declared at the bottom of this class.
 *
 * <p>Each test wires a requester to a listener (the request side) and to a reply source, sends an
 * event through {@code process(...)} and checks either the returned event or the raised exception.
 *
 * <p>The class also implements {@link ExceptionListener}; any exception reported through
 * {@link #exceptionThrown(Exception)} is turned into a test failure.
 */
public class AsyncRequestReplyRequesterTestCase extends AbstractMuleContextTestCase
        implements ExceptionListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncRequestReplyRequesterTestCase.class);

    /** Scheduler obtained in {@link #doSetUp()}; used by {@link #testMultiple()} to submit concurrent requests. */
    private Scheduler scheduler;

    /** Requester under test; created by each test and stopped/disposed in {@link #doTearDown()}. */
    private TestAsyncRequestReplyRequester asyncReplyMP;

    /** Flow built, initialised and started in {@link #doSetUp()}; set as flow construct of the async processors. */
    private Flow flow;

    /** Last processor built by {@link #createAsyncMessageProcessor}; stopped/disposed in {@link #after()}. */
    private AsyncDelegateMessageProcessor asyncMP;

    /**
     * Registers a {@link MuleObjectStoreManager} in the registry, obtains the CPU-intensive
     * scheduler and builds, initialises and starts the flow used by the tests.
     *
     * @throws Exception if the parent set-up or any of the steps above fails
     */
    protected void doSetUp() throws Exception {
        super.doSetUp();
        muleContext.getRegistry().registerObject("_muleObjectStoreManager", new MuleObjectStoreManager());
        scheduler = muleContext.getSchedulerService().cpuIntensiveScheduler();
        flow = Flow.builder("flowName", muleContext).build();
        flow.initialise();
        flow.start();
    }

    /**
     * Stops the scheduler, the requester under test and the flow, then delegates to the parent
     * tear-down.
     *
     * <p>Every resource is null-checked because {@link #doSetUp()} may have failed before assigning
     * it, and the parent tear-down runs in a {@code finally} block so that a failure while
     * releasing one of the test's own resources does not skip it.
     *
     * @throws Exception if releasing any resource fails
     */
    protected void doTearDown() throws Exception {
        try {
            if (scheduler != null) {
                scheduler.stop();
            }
            if (asyncReplyMP != null) {
                asyncReplyMP.stop();
                asyncReplyMP.dispose();
            }
            if (flow != null) {
                flow.stop();
                flow.dispose();
            }
        } finally {
            super.doTearDown();
        }
    }

    /**
     * Uses the same sensing processor as both listener and reply source and expects the event
     * returned by the requester to carry the same message (as a string) as the test event.
     */
    @Test
    public void testSingleEventNoTimeout() throws Exception {
        asyncReplyMP = new TestAsyncRequestReplyRequester(muleContext);
        SensingNullMessageProcessor target = getSensingNullMessageProcessor();
        asyncReplyMP.setListener(target);
        asyncReplyMP.setReplySource(target.getMessageSource());
        asyncReplyMP.setMuleContext(muleContext);

        Event resultEvent = asyncReplyMP.process(testEvent());

        Assert.assertEquals(testEvent().getMessageAsString(muleContext), resultEvent.getMessageAsString(muleContext));
    }

    /**
     * Same expectation as {@link #testSingleEventNoTimeout()}, but the listener is an
     * {@link AsyncDelegateMessageProcessor} wrapping the sensing processor.
     */
    @Test
    public void testSingleEventNoTimeoutAsync() throws Exception {
        asyncReplyMP = new TestAsyncRequestReplyRequester(muleContext);
        SensingNullMessageProcessor target = getSensingNullMessageProcessor();
        AsyncDelegateMessageProcessor asyncListener = createAsyncMessageProcessor(target);
        asyncListener.setFlowConstruct(flow);
        LifecycleUtils.initialiseIfNeeded(asyncListener, true, muleContext);
        asyncListener.start();
        asyncReplyMP.setListener(asyncListener);
        asyncReplyMP.setReplySource(target.getMessageSource());
        asyncReplyMP.setMuleContext(muleContext);

        Event resultEvent = asyncReplyMP.process(testEvent());

        Assert.assertEquals(testEvent().getMessageAsString(muleContext), resultEvent.getMessageAsString(muleContext));
    }

    /**
     * Configures a requester timeout of 1 and a target wait time of 30000 and expects
     * {@code process(...)} to fail with exactly a {@link ResponseTimeoutException}.
     */
    @Test
    public void testSingleEventTimeout() throws Exception {
        asyncReplyMP = new TestAsyncRequestReplyRequester(muleContext);
        asyncReplyMP.setTimeout(1L);
        SensingNullMessageProcessor target = getSensingNullMessageProcessor();
        target.setWaitTime(30000L);
        AsyncDelegateMessageProcessor asyncListener = createAsyncMessageProcessor(target);
        asyncListener.setFlowConstruct(flow);
        LifecycleUtils.initialiseIfNeeded(asyncListener, true, muleContext);
        asyncListener.start();
        asyncReplyMP.setListener(asyncListener);
        asyncReplyMP.setReplySource(target.getMessageSource());
        asyncReplyMP.setMuleContext(muleContext);
        Event event = eventBuilder().message(Message.of("Test Message")).build();

        try {
            asyncReplyMP.process(event);
            // fail() raises an AssertionError, which is an Error and therefore not caught below.
            Assert.fail("ResponseTimeoutException expected");
        } catch (Exception e) {
            // The exact class is compared, so subclasses or wrapping exceptions fail the test.
            Assert.assertEquals(ResponseTimeoutException.class, e.getClass());
        }
    }

    /**
     * Replaces the requester's event lock with a latch whose {@code await()} always throws
     * {@link InterruptedException}, runs {@code process(...)} on a separate thread and expects it
     * to finish within 5 seconds without a {@link MuleException} and with a {@code null} result.
     */
    @Test
    @Ignore(value = "See MULE-8830")
    public void returnsNullWhenInterruptedWhileWaitingForReply() throws Exception {
        final Latch fakeLatch = new Latch() {

            public void await() throws InterruptedException {
                throw new InterruptedException();
            }
        };
        asyncReplyMP = new TestAsyncRequestReplyRequester(muleContext) {

            protected Latch createEventLock() {
                return fakeLatch;
            }
        };
        CountDownLatch processingLatch = new CountDownLatch(1);
        Processor target = Mockito.mock(Processor.class);
        asyncReplyMP.setListener(target);
        MessageSource messageSource = Mockito.mock(MessageSource.class);
        asyncReplyMP.setReplySource(messageSource);
        asyncReplyMP.setMuleContext(muleContext);

        // Single-element arrays let the lambda hand its outcome back to the test thread.
        boolean[] exceptionThrown = new boolean[1];
        Object[] responseEvent = new Object[1];
        Thread thread = new Thread(() -> {
            try {
                responseEvent[0] = asyncReplyMP.process(testEvent());
            } catch (MuleException e) {
                exceptionThrown[0] = true;
            } finally {
                processingLatch.countDown();
            }
        });
        thread.start();

        Assert.assertTrue(processingLatch.await(5000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse(exceptionThrown[0]);
        Assert.assertNull(responseEvent[0]);
    }

    /**
     * Submits 500 concurrent requests through the scheduler and polls until at least 500 of them
     * have returned an event whose message matches the test event.
     */
    @Test
    @Ignore(value = "See MULE-8830")
    public void testMultiple() throws Exception {
        final int requests = 500;
        asyncReplyMP = new TestAsyncRequestReplyRequester(muleContext);
        SensingNullMessageProcessor target = getSensingNullMessageProcessor();
        target.setWaitTime(50L);
        AsyncDelegateMessageProcessor asyncListener = createAsyncMessageProcessor(target);
        asyncListener.initialise();
        asyncReplyMP.setListener(asyncListener);
        asyncReplyMP.setReplySource(target.getMessageSource());

        AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < requests; ++i) {
            scheduler.execute(() -> {
                try {
                    Event resultEvent = asyncReplyMP.process(testEvent());
                    Assert.assertEquals(testEvent().getMessageAsString(muleContext),
                                        resultEvent.getMessageAsString(muleContext));
                    // Log the value produced by this task's own increment rather than re-reading
                    // the counter, which other tasks may have advanced in the meantime.
                    int finished = count.incrementAndGet();
                    LOGGER.debug("Finished {}", finished);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }

        new PollingProber().check(new JUnitLambdaProbe(() -> {
            Assert.assertThat(count.get(), Matchers.greaterThanOrEqualTo(requests));
            return true;
        }));
    }

    /**
     * Stops and disposes the async processor created by the test, via the "if needed" lifecycle
     * helpers.
     *
     * @throws MuleException if stopping the processor fails
     */
    @After
    public void after() throws MuleException {
        LifecycleUtils.stopIfNeeded(asyncMP);
        LifecycleUtils.disposeIfNeeded(asyncMP, LOGGER);
    }

    /**
     * Wraps {@code target} in a single-processor chain, builds an
     * {@link AsyncDelegateMessageProcessor} around it, initialises it and remembers it in
     * {@link #asyncMP} so that {@link #after()} can release it.
     *
     * @param target processor that the async processor delegates to
     * @return the newly created, initialised async processor
     * @throws InitialisationException if initialisation fails
     */
    protected AsyncDelegateMessageProcessor createAsyncMessageProcessor(SensingNullMessageProcessor target) throws InitialisationException {
        asyncMP = new AsyncDelegateMessageProcessor(MessageProcessors.newChain(target));
        LifecycleUtils.initialiseIfNeeded(asyncMP, true, muleContext);
        return asyncMP;
    }

    /**
     * Fails the test for any exception reported to this listener.
     *
     * <p>The exception is logged and attached as the cause of the raised {@link AssertionError}
     * so that the original stack trace is kept in the test report.
     *
     * @param e the reported exception
     */
    @Override
    public void exceptionThrown(Exception e) {
        LOGGER.error("Unexpected exception reported to the test's ExceptionListener", e);
        throw new AssertionError(e.getMessage(), e);
    }

    /**
     * Concrete {@link AbstractAsyncRequestReplyRequester} for the tests: it sets the Mule context,
     * then initialises and starts itself on construction.
     */
    class TestAsyncRequestReplyRequester extends AbstractAsyncRequestReplyRequester {

        /**
         * @param muleContext context handed to {@code setMuleContext} before initialising
         * @throws MuleException if initialising or starting the requester fails
         */
        TestAsyncRequestReplyRequester(MuleContext muleContext) throws MuleException {
            setMuleContext(muleContext);
            initialise();
            start();
        }
    }
}

