/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing.requestreply;

import java.beans.ExceptionListener;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyCollection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.store.SimpleMemoryObjectStore;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.message.GroupCorrelation;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.internal.processor.AsyncDelegateMessageProcessor;
import org.mule.runtime.core.internal.routing.correlation.EventCorrelatorTestCase;
import org.mule.runtime.core.internal.routing.requestreply.AbstractAsyncRequestReplyRequester;
import org.mule.runtime.core.internal.util.store.MuleObjectStoreManager;
import org.mule.runtime.core.privileged.event.PrivilegedEvent;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.routing.ResponseTimeoutException;
import org.mule.tck.MuleTestUtils;
import org.mule.tck.SensingNullMessageProcessor;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import org.mule.tck.util.MuleContextUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncRequestReplyRequesterTestCase
extends AbstractMuleContextTestCase
implements ExceptionListener {
    /**
     * Logger for this test case. It is named after this class (rather than an unrelated test case)
     * so that its output is attributed to the right category.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncRequestReplyRequesterTestCase.class);
    /** Scheduler obtained in {@link #doSetUp()} and stopped in {@link #doTearDown()}. */
    private Scheduler scheduler;
    /** Requester under test; when non-null it is stopped and disposed in {@link #doTearDown()}. */
    private TestAsyncRequestReplyRequester asyncReplyMP;
    /** Async delegate built by {@code createAsyncMessageProcessor}; stopped and disposed in {@link #after()}. */
    private AsyncDelegateMessageProcessor asyncMP;

    /**
     * Builds the map of objects this test case contributes under registry keys: a new
     * {@link MuleObjectStoreManager} and this test's component locator.
     * NOTE(review): presumably this overrides a hook of the base test case - confirm.
     *
     * @return a freshly created, mutable map from registry key to object
     */
    protected Map<String, Object> getStartUpRegistryObjects() {
        final Map<String, Object> registryEntries = new HashMap<>();
        registryEntries.put("_muleConfigurationComponentLocator", componentLocator);
        registryEntries.put("_muleObjectStoreManager", new MuleObjectStoreManager());
        return registryEntries;
    }

    /**
     * Runs the parent set-up, then fetches the CPU-intensive scheduler from the context's scheduler
     * service and registers a flow named {@code appleFlow}.
     *
     * @throws Exception if the parent set-up or the flow registration fails
     */
    @Override
    protected void doSetUp() throws Exception {
        super.doSetUp();
        scheduler = muleContext.getSchedulerService().cpuIntensiveScheduler();
        MuleTestUtils.createAndRegisterFlow(muleContext, "appleFlow", componentLocator);
    }

    /**
     * Releases what the test created: stops the scheduler, stops and disposes the requester under
     * test, and finally runs the parent tear-down.
     *
     * <p>Each step is guarded so that a failure in one does not prevent the following ones. The
     * scheduler is null-checked because it is only assigned after {@code super.doSetUp()} returns in
     * {@link #doSetUp()}, so it can still be {@code null} if set-up failed.
     *
     * @throws Exception if any of the clean-up steps fails
     */
    @Override
    protected void doTearDown() throws Exception {
        try {
            if (this.scheduler != null) {
                this.scheduler.stop();
            }
        } finally {
            try {
                if (this.asyncReplyMP != null) {
                    this.asyncReplyMP.stop();
                    this.asyncReplyMP.dispose();
                }
            } finally {
                // Always give the parent a chance to clean up, even if our own clean-up threw.
                super.doTearDown();
            }
        }
    }

    /**
     * Processes the test event through a requester whose listener is a sensing processor and whose
     * reply source is that processor's message source, then expects the returned event to render
     * to the same message string as the test event.
     */
    @Test
    public void testSingleEventNoTimeout() throws Exception {
        asyncReplyMP = new TestAsyncRequestReplyRequester(muleContext);
        final SensingNullMessageProcessor sensor = getSensingNullMessageProcessor();
        asyncReplyMP.setListener(sensor);
        asyncReplyMP.setReplySource(sensor.getMessageSource());
        asyncReplyMP.setMuleContext(muleContext);

        final CoreEvent reply = asyncReplyMP.process(testEvent());

        final Object expectedText = ((PrivilegedEvent) testEvent()).getMessageAsString(muleContext);
        final Object actualText = ((PrivilegedEvent) reply).getMessageAsString(muleContext);
        Assert.assertEquals(expectedText, actualText);
    }

    /**
     * Same expectation as the synchronous variant, but the requester's listener is an
     * {@link AsyncDelegateMessageProcessor} wrapping the sensing processor; the reply source is
     * still the sensing processor's message source.
     */
    @Test
    public void testSingleEventNoTimeoutAsync() throws Exception {
        asyncReplyMP = new TestAsyncRequestReplyRequester(muleContext);
        final SensingNullMessageProcessor sensor = getSensingNullMessageProcessor();

        // Build the async delegate around the sensor and bring it to the started state.
        final AsyncDelegateMessageProcessor asyncDelegate = createAsyncMessageProcessor(sensor);
        LifecycleUtils.initialiseIfNeeded(asyncDelegate, true, muleContext);
        asyncDelegate.start();

        asyncReplyMP.setListener(asyncDelegate);
        asyncReplyMP.setReplySource(sensor.getMessageSource());
        asyncReplyMP.setMuleContext(muleContext);

        final CoreEvent reply = asyncReplyMP.process(testEvent());

        final Object expectedText = ((PrivilegedEvent) testEvent()).getMessageAsString(muleContext);
        final Object actualText = ((PrivilegedEvent) reply).getMessageAsString(muleContext);
        Assert.assertEquals(expectedText, actualText);
    }

    /**
     * Configures the requester with a timeout of {@code 1} and the sensing processor with a wait
     * time of {@code 30000}, then expects {@code process} to fail with a
     * {@link ResponseTimeoutException}.
     */
    @Test
    public void testSingleEventTimeout() throws Exception {
        asyncReplyMP = new TestAsyncRequestReplyRequester(muleContext);
        asyncReplyMP.setTimeout(1L);

        final SensingNullMessageProcessor sensor = getSensingNullMessageProcessor();
        sensor.setWaitTime(30000L);

        final AsyncDelegateMessageProcessor asyncDelegate = createAsyncMessageProcessor(sensor);
        LifecycleUtils.initialiseIfNeeded(asyncDelegate, true, muleContext);
        asyncDelegate.start();

        asyncReplyMP.setListener(asyncDelegate);
        asyncReplyMP.setReplySource(sensor.getMessageSource());
        asyncReplyMP.setMuleContext(muleContext);

        final CoreEvent request = MuleContextUtils.eventBuilder(muleContext)
            .message(Message.of("Test Message"))
            .build();

        // Capture whatever process() throws so that the "nothing thrown" case and the
        // "wrong exception type" case are both reported as assertion failures.
        Exception failure = null;
        try {
            asyncReplyMP.process(request);
        } catch (Exception e) {
            failure = e;
        }
        if (failure == null) {
            Assert.fail("ResponseTimeoutException expected");
        }
        Assert.assertThat(failure, Matchers.instanceOf(ResponseTimeoutException.class));
    }

    /**
     * Expects {@code process} to return {@code null}, without throwing, when waiting on the event
     * lock is interrupted. The interruption is simulated with a {@link Latch} whose {@code await()}
     * always throws {@link InterruptedException}.
     *
     * <p>Processing runs on a separate thread; the test waits up to 5000 ms for it to finish.
     * Currently ignored, see MULE-8830.
     */
    @Test
    @Ignore(value="See MULE-8830")
    public void returnsNullWhenInterruptedWhileWaitingForReply() throws Exception {
        final Latch fakeLatch = new Latch(){

            public void await() throws InterruptedException {
                throw new InterruptedException();
            }
        };
        this.asyncReplyMP = new TestAsyncRequestReplyRequester(muleContext){

            protected Latch createEventLock() {
                return fakeLatch;
            }
        };
        final CountDownLatch processingLatch = new CountDownLatch(1);
        Processor target = Mockito.mock(Processor.class);
        this.asyncReplyMP.setListener(target);
        MessageSource messageSource = Mockito.mock(MessageSource.class);
        this.asyncReplyMP.setReplySource(messageSource);
        this.asyncReplyMP.setMuleContext(muleContext);
        // Single-element arrays let the worker thread hand its outcome back to the test thread;
        // the latch count-down/await pair orders the writes before the reads below.
        final boolean[] exceptionThrown = new boolean[1];
        final Object[] responseEvent = new Object[1];
        Thread thread = new Thread(() -> {
            try {
                responseEvent[0] = this.asyncReplyMP.process(this.testEvent());
            } catch (Exception e) {
                // Record any failure, not just MuleException: an unchecked exception would
                // otherwise leave both slots at their defaults and let the test pass falsely.
                exceptionThrown[0] = true;
            } finally {
                processingLatch.countDown();
            }
        });
        thread.start();
        Assert.assertTrue(processingLatch.await(5000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse(exceptionThrown[0]);
        Assert.assertNull(responseEvent[0]);
    }

    /**
     * Submits 500 concurrent request-reply round trips to the scheduler and polls until all of
     * them have completed with a reply whose message string matches the test event's.
     * Currently ignored, see MULE-8830.
     */
    @Test
    @Ignore(value="See MULE-8830")
    public void testMultiple() throws Exception {
        // Single source of truth for both the number of submitted tasks and the expected count.
        final int requestCount = 500;

        this.asyncReplyMP = new TestAsyncRequestReplyRequester(muleContext);
        SensingNullMessageProcessor target = this.getSensingNullMessageProcessor();
        target.setWaitTime(50L);
        AsyncDelegateMessageProcessor asyncDelegate = this.createAsyncMessageProcessor(target);
        // NOTE(review): unlike the other tests here, the delegate is initialised but never
        // started - confirm whether that is intended before re-enabling this test.
        asyncDelegate.initialise();
        this.asyncReplyMP.setListener(asyncDelegate);
        this.asyncReplyMP.setReplySource(target.getMessageSource());

        final AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < requestCount; ++i) {
            this.scheduler.execute(() -> {
                try {
                    CoreEvent resultEvent = this.asyncReplyMP.process(this.testEvent());
                    Assert.assertEquals(((PrivilegedEvent) this.testEvent()).getMessageAsString(muleContext),
                                        ((PrivilegedEvent) resultEvent).getMessageAsString(muleContext));
                    // Log the value produced by this task's own increment; re-reading the
                    // counter could report a number already bumped by another task.
                    final int finished = count.incrementAndGet();
                    LOGGER.debug("Finished {}", finished);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        new PollingProber().check(new JUnitLambdaProbe(() -> {
            Assert.assertThat(count.get(), Matchers.greaterThanOrEqualTo(requestCount));
            return true;
        }));
    }

    /**
     * Processes one event, built with {@code GroupCorrelation.of(0, 3)}, through a
     * {@link RelaxedAsyncReplyMP} and expects the requester's response-events map to be empty once
     * {@code process} has returned. The requester is stopped whether or not the assertion passes.
     */
    @Test
    public void testResponseEventsCleanedUp() throws Exception {
        final RelaxedAsyncReplyMP requester = new RelaxedAsyncReplyMP(muleContext);
        try {
            final CoreEvent correlatedEvent = MuleContextUtils.eventBuilder(muleContext)
                .message(Message.of("message1"))
                .groupCorrelation(Optional.of(GroupCorrelation.of(0, 3)))
                .build();
            final SensingNullMessageProcessor sensor = getSensingNullMessageProcessor();
            requester.setListener(sensor);
            requester.setReplySource(sensor.getMessageSource());

            requester.process(correlatedEvent);

            Assert.assertThat(requester.getResponseEvents().entrySet(), IsEmptyCollection.empty());
        } finally {
            requester.stop();
        }
    }

    /**
     * Stops and then disposes the async delegate held in {@code asyncMP}, passing this test's
     * logger to the dispose helper.
     *
     * @throws MuleException if stopping the delegate fails
     */
    @After
    public void after() throws MuleException {
        final AsyncDelegateMessageProcessor delegate = asyncMP;
        LifecycleUtils.stopIfNeeded(delegate);
        LifecycleUtils.disposeIfNeeded(delegate, LOGGER);
    }

    /**
     * Creates an {@link AsyncDelegateMessageProcessor} around a chain containing only
     * {@code target}, stores it in the {@code asyncMP} field, gives it the apple-flow component
     * location annotations and initialises it.
     *
     * @param target the processor the async delegate's chain consists of
     * @return the initialised delegate, which is also kept in {@code asyncMP}
     * @throws InitialisationException if initialising the delegate fails
     */
    protected AsyncDelegateMessageProcessor createAsyncMessageProcessor(SensingNullMessageProcessor target) throws InitialisationException {
        // The field is assigned first so the delegate is tracked even if a later step throws.
        asyncMP = new AsyncDelegateMessageProcessor(MessageProcessors.newChain(Optional.empty(), target));
        asyncMP.setAnnotations(getAppleFlowComponentLocationAnnotations());
        LifecycleUtils.initialiseIfNeeded(asyncMP, true, muleContext);
        return asyncMP;
    }

    /**
     * {@link ExceptionListener} callback: any reported exception fails the test.
     *
     * <p>The exception is logged through this test's logger and rethrown wrapped in an
     * {@link AssertionError} that keeps the original message and carries the exception as its
     * cause, so the full stack trace shows up in the test report.
     *
     * @param e the exception that was reported
     */
    @Override
    public void exceptionThrown(Exception e) {
        LOGGER.error("Exception reported to test exception listener", e);
        throw new AssertionError(e.getMessage(), e);
    }

    /**
     * Requester used by {@code testResponseEventsCleanedUp}: it installs an in-memory object
     * store, names itself {@code asyncReply}, starts itself on construction and exposes the
     * inherited response-events map for inspection.
     */
    private static final class RelaxedAsyncReplyMP extends AbstractAsyncRequestReplyRequester {

        /**
         * Configures the store, name and Mule context, then starts the requester.
         *
         * @param muleContext the context handed to {@code setMuleContext}
         * @throws MuleException if starting fails
         */
        private RelaxedAsyncReplyMP(MuleContext muleContext) throws MuleException {
            store = new SimpleMemoryObjectStore();
            name = "asyncReply";
            setMuleContext(muleContext);
            start();
        }

        /**
         * @return the inherited {@code responseEvents} map itself (not a copy)
         */
        public Map<String, PrivilegedEvent> getResponseEvents() {
            return responseEvents;
        }
    }

    /**
     * Requester used by most tests in this class. On construction it annotates itself with the
     * test connector location, receives the Mule context, and is initialised and started.
     */
    class TestAsyncRequestReplyRequester extends AbstractAsyncRequestReplyRequester {

        /**
         * @param muleContext the context handed to {@code setMuleContext}
         * @throws MuleException if initialising or starting fails
         */
        TestAsyncRequestReplyRequester(MuleContext muleContext) throws MuleException {
            // Order matters: annotations and context are set before the lifecycle phases run.
            setAnnotations(Collections.singletonMap(LOCATION_KEY, AbstractMuleTestCase.TEST_CONNECTOR_LOCATION));
            setMuleContext(muleContext);
            initialise();
            start();
        }
    }
}

