/*
 * Decompiled with CFR 0.152.
 */
package org.marketcetera.modules.async;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.marketcetera.core.LoggerConfiguration;
import org.marketcetera.module.BlockingSinkDataListener;
import org.marketcetera.module.CopierModule;
import org.marketcetera.module.CopierModuleFactory;
import org.marketcetera.module.DataFlowID;
import org.marketcetera.module.DataFlowInfo;
import org.marketcetera.module.DataFlowStep;
import org.marketcetera.module.DataRequest;
import org.marketcetera.module.ExpectedFailure;
import org.marketcetera.module.IllegalRequestParameterValue;
import org.marketcetera.module.Messages;
import org.marketcetera.module.ModuleManager;
import org.marketcetera.module.ModuleState;
import org.marketcetera.module.ModuleTestBase;
import org.marketcetera.module.ModuleURN;
import org.marketcetera.module.SinkDataListener;
import org.marketcetera.module.SinkModuleFactory;
import org.marketcetera.modules.async.BlockingModuleFactory;
import org.marketcetera.modules.async.SimpleAsyncProcessor;
import org.marketcetera.modules.async.SimpleAsyncProcessorFactory;
import org.marketcetera.util.log.I18NMessage;
import org.marketcetera.util.misc.ClassVersion;

@ClassVersion(value="$Id: AsyncModuleTest.java 16154 2012-07-14 16:34:05Z colin $")
public class AsyncModuleTest
extends ModuleTestBase {
    private ModuleManager mManager;

    @BeforeClass
    public static void logSetup() {
        LoggerConfiguration.logSetup();
    }

    @Test
    public void info() throws Exception {
        AsyncModuleTest.assertProviderInfo((ModuleManager)this.mManager, (ModuleURN)SimpleAsyncProcessorFactory.PROVIDER_URN, (String[])new String[]{ModuleURN.class.getName()}, (Class[])new Class[]{ModuleURN.class}, (String)org.marketcetera.modules.async.Messages.PROVIDER_DESCRIPTION.getText(), (boolean)true, (boolean)true);
        ModuleURN instanceURN = new ModuleURN(SimpleAsyncProcessorFactory.PROVIDER_URN, "mymodule");
        Assert.assertEquals((Object)instanceURN, (Object)this.mManager.createModule(SimpleAsyncProcessorFactory.PROVIDER_URN, new Object[]{instanceURN}));
        AsyncModuleTest.assertModuleInfo((ModuleManager)this.mManager, (ModuleURN)instanceURN, (ModuleState)ModuleState.STARTED, null, null, (boolean)false, (boolean)true, (boolean)true, (boolean)true, (boolean)false);
        Assert.assertTrue((boolean)this.getAttributes(instanceURN).isEmpty());
        this.mManager.stop(instanceURN);
        this.mManager.deleteModule(instanceURN);
    }

    @Test
    public void requestParameters() throws Exception {
        final ModuleURN instanceURN = new ModuleURN(SimpleAsyncProcessorFactory.PROVIDER_URN, "mymodule");
        String requestParm = "not null value";
        new ExpectedFailure<IllegalRequestParameterValue>((I18NMessage)Messages.ILLEGAL_REQ_PARM_VALUE, new Object[]{instanceURN.getValue(), "not null value"}){

            public void run() throws Exception {
                AsyncModuleTest.this.mManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)"doesnt matter"), new DataRequest(instanceURN, (Object)"not null value")});
            }
        };
        List instances = this.mManager.getModuleInstances(SimpleAsyncProcessorFactory.PROVIDER_URN);
        Assert.assertTrue((String)instances.toString(), (boolean)instances.isEmpty());
    }

    @Test
    public void simpleFlowAndJMX() throws Exception {
        ModuleURN instanceURN = new ModuleURN(SimpleAsyncProcessorFactory.PROVIDER_URN, "mymodule");
        Object[] requestParm = new Object[]{BigDecimal.ONE, 2, "three"};
        CopierModule.SynchronousRequest req = new CopierModule.SynchronousRequest((Object)requestParm);
        req.semaphore.acquire();
        BlockingSinkDataListener listener = new BlockingSinkDataListener();
        this.mManager.addSinkListener((SinkDataListener)listener);
        DataFlowID flowID = this.mManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)req), new DataRequest(instanceURN, null)});
        req.semaphore.acquire();
        for (Object o : requestParm) {
            Assert.assertEquals((Object)o, (Object)listener.getNextData());
        }
        DataFlowInfo flowInfo = this.mManager.getDataFlowInfo(flowID);
        AsyncModuleTest.assertFlowInfo((DataFlowInfo)flowInfo, (DataFlowID)flowID, (int)3, (boolean)true, (boolean)false, null, null);
        AsyncModuleTest.assertFlowStep((DataFlowStep)flowInfo.getFlowSteps()[1], (ModuleURN)instanceURN, (boolean)true, (int)3, (int)0, null, (boolean)true, (int)3, (int)0, null, (ModuleURN)instanceURN, null);
        AsyncModuleTest.assertFlowStep((DataFlowStep)flowInfo.getFlowSteps()[2], (ModuleURN)SinkModuleFactory.INSTANCE_URN, (boolean)false, (int)0, (int)0, null, (boolean)true, (int)3, (int)0, null, (ModuleURN)SinkModuleFactory.INSTANCE_URN, null);
        AsyncModuleTest.assertModuleInfo((ModuleManager)this.mManager, (ModuleURN)instanceURN, (ModuleState)ModuleState.STARTED, null, (DataFlowID[])new DataFlowID[]{flowID}, (boolean)true, (boolean)true, (boolean)true, (boolean)true, (boolean)false);
        final ObjectName on = instanceURN.toObjectName();
        final MBeanServer beanServer = AsyncModuleTest.getMBeanServer();
        Assert.assertTrue((boolean)beanServer.isRegistered(on));
        MBeanInfo beanInfo = beanServer.getMBeanInfo(on);
        Assert.assertEquals((Object)SimpleAsyncProcessor.class.getName(), (Object)beanInfo.getClassName());
        Assert.assertEquals((Object)org.marketcetera.modules.async.Messages.JMX_MXBEAN_DESCRIPTION.getText(), (Object)beanInfo.getDescription());
        Assert.assertEquals((long)0L, (long)beanInfo.getOperations().length);
        Assert.assertEquals((long)0L, (long)beanInfo.getConstructors().length);
        Assert.assertEquals((long)0L, (long)beanInfo.getNotifications().length);
        Assert.assertEquals((long)0L, (long)beanInfo.getDescriptor().getFieldNames().length);
        MBeanAttributeInfo[] attributeInfos = beanInfo.getAttributes();
        Assert.assertEquals((long)1L, (long)attributeInfos.length);
        final String validAttribute = "Flow" + flowID;
        Assert.assertEquals((Object)validAttribute, (Object)attributeInfos[0].getName());
        Assert.assertEquals((Object)Integer.class.getName(), (Object)attributeInfos[0].getType());
        Assert.assertEquals((Object)org.marketcetera.modules.async.Messages.JMX_ATTRIBUTE_FLOW_CNT_DESCRIPTION.getText((Object)flowID), (Object)attributeInfos[0].getDescription());
        Assert.assertEquals((long)0L, (long)attributeInfos[0].getDescriptor().getFieldNames().length);
        Assert.assertFalse((boolean)attributeInfos[0].isIs());
        Assert.assertFalse((boolean)attributeInfos[0].isWritable());
        Assert.assertTrue((boolean)attributeInfos[0].isReadable());
        Object value = beanServer.getAttribute(on, "Flow" + flowID);
        Assert.assertEquals((Object)0, (Object)((Integer)value));
        String invalidAttribute = "Flow1";
        new ExpectedFailure<AttributeNotFoundException>("Flow1"){

            protected void run() throws Exception {
                beanServer.getAttribute(on, "Flow1");
            }
        };
        new ExpectedFailure<AttributeNotFoundException>("blah"){

            protected void run() throws Exception {
                beanServer.getAttribute(on, "blah");
            }
        };
        AttributeList attribList = beanServer.getAttributes(on, new String[]{validAttribute, "Flow1"});
        Assert.assertEquals((long)1L, (long)attribList.size());
        Assert.assertEquals((Object)new Attribute(validAttribute, 0), attribList.get(0));
        new ExpectedFailure<AttributeNotFoundException>(){

            protected void run() throws Exception {
                beanServer.setAttribute(on, new Attribute(validAttribute, 34));
            }
        };
        new ExpectedFailure<AttributeNotFoundException>(){

            protected void run() throws Exception {
                beanServer.setAttribute(on, new Attribute("Flow1", 34));
            }
        };
        AttributeList list = new AttributeList(Arrays.asList(new Attribute(validAttribute, 12), new Attribute("Flow1", 13)));
        Assert.assertEquals((long)0L, (long)beanServer.setAttributes(on, list).size());
        ReflectionException excpt = (ReflectionException)new ExpectedFailure<ReflectionException>(){

            protected void run() throws Exception {
                beanServer.invoke(on, "getQueueSizes", null, null);
            }
        }.getException();
        Assert.assertTrue((String)excpt.toString(), (boolean)(excpt.getCause() instanceof NoSuchMethodException));
        this.mManager.cancel(flowID);
        List instances = this.mManager.getModuleInstances(SimpleAsyncProcessorFactory.PROVIDER_URN);
        Assert.assertTrue((String)instances.toString(), (boolean)instances.isEmpty());
        this.mManager.removeSinkListener((SinkDataListener)listener);
    }

    @Test
    public void noEmitFromOtherFlows() throws Exception {
        ModuleURN instanceURN = new ModuleURN(SimpleAsyncProcessorFactory.PROVIDER_URN, "mymodule");
        FlowSpecificListener listener = new FlowSpecificListener();
        this.mManager.addSinkListener((SinkDataListener)listener);
        DataFlowID emitOnlyflowID = this.mManager.createDataFlow(new DataRequest[]{new DataRequest(instanceURN)});
        Object[] reqParms1 = new Object[]{"firstOne", BigDecimal.TEN, "uno"};
        Object[] reqParms2 = new Object[]{"secondOne", BigDecimal.ZERO, "dos"};
        DataFlowID flowID1 = this.mManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)reqParms1), new DataRequest(instanceURN)});
        DataFlowID flowID2 = this.mManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)reqParms2), new DataRequest(instanceURN)});
        while (!listener.getFlows().contains(flowID2)) {
            Thread.sleep(100L);
        }
        for (Object o : reqParms2) {
            Assert.assertEquals((Object)o, (Object)listener.getNextDataFor(flowID2));
        }
        while (!listener.getFlows().contains(flowID1)) {
            Thread.sleep(100L);
        }
        for (Object o : reqParms1) {
            Assert.assertEquals((Object)o, (Object)listener.getNextDataFor(flowID1));
        }
        Assert.assertThat(listener.getFlows(), (Matcher)Matchers.not((Matcher)Matchers.hasItem((Object)emitOnlyflowID)));
        Assert.assertEquals((Object)0, (Object)AsyncModuleTest.getMBeanServer().getAttribute(instanceURN.toObjectName(), "Flow" + emitOnlyflowID));
        Assert.assertEquals((Object)0, (Object)AsyncModuleTest.getMBeanServer().getAttribute(instanceURN.toObjectName(), "Flow" + flowID1));
        Assert.assertEquals((Object)0, (Object)AsyncModuleTest.getMBeanServer().getAttribute(instanceURN.toObjectName(), "Flow" + flowID2));
        Assert.assertEquals(null, (Object)listener.getThreadNameFor(emitOnlyflowID));
        Assert.assertThat((Object)listener.getThreadNameFor(flowID1), (Matcher)Matchers.startsWith((String)("SimpleAsyncProc-" + instanceURN.instanceName())));
        Assert.assertThat((Object)listener.getThreadNameFor(flowID2), (Matcher)Matchers.startsWith((String)("SimpleAsyncProc-" + instanceURN.instanceName())));
        Assert.assertThat((Object)listener.getThreadNameFor(flowID1), (Matcher)Matchers.not((Matcher)Matchers.equalTo((Object)listener.getThreadNameFor(flowID2))));
        Assert.assertThat((Object)listener.getThreadNameFor(flowID1), (Matcher)Matchers.not((Matcher)Matchers.equalTo((Object)Thread.currentThread().getName())));
        this.mManager.cancel(emitOnlyflowID);
        this.mManager.cancel(flowID1);
        List<String> list = this.getAttributes(instanceURN);
        Assert.assertEquals((long)1L, (long)list.size());
        Assert.assertThat(list, (Matcher)Matchers.hasItem((Object)("Flow" + flowID2)));
        this.mManager.cancel(flowID2);
        List instances = this.mManager.getModuleInstances(SimpleAsyncProcessorFactory.PROVIDER_URN);
        Assert.assertTrue((String)instances.toString(), (boolean)instances.isEmpty());
        this.mManager.removeSinkListener((SinkDataListener)listener);
    }

    @Test(timeout=10000L)
    public void slowConsumer() throws Exception {
        DataFlowInfo flowInfo;
        ModuleURN instanceURN = new ModuleURN(SimpleAsyncProcessorFactory.PROVIDER_URN, "mymodule");
        Object[] data = new Object[]{"item1", "item2", "item3", "item4"};
        DataFlowID flowID = this.mManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)data), new DataRequest(instanceURN), new DataRequest(BlockingModuleFactory.INSTANCE_URN)});
        do {
            Thread.sleep(100L);
        } while ((flowInfo = this.mManager.getDataFlowInfo(flowID)).getFlowSteps()[0].getNumEmitted() < 4L);
        for (int i = 0; i < data.length; ++i) {
            BlockingModuleFactory.getLastInstance().getSemaphore().acquire();
            flowInfo = this.mManager.getDataFlowInfo(flowID);
            AsyncModuleTest.assertFlowStep((DataFlowStep)flowInfo.getFlowSteps()[2], (ModuleURN)BlockingModuleFactory.INSTANCE_URN, (boolean)false, (int)0, (int)0, null, (boolean)true, (int)(i + 1), (int)0, null, (ModuleURN)BlockingModuleFactory.INSTANCE_URN, null);
            if (i < data.length - 1) {
                Assert.assertEquals((Object)(data.length - 1 - i), (Object)AsyncModuleTest.getMBeanServer().getAttribute(instanceURN.toObjectName(), "Flow" + flowID));
            }
            Assert.assertEquals((Object)data[i], (Object)BlockingModuleFactory.getLastInstance().getNextData());
        }
        Assert.assertEquals((Object)0, (Object)AsyncModuleTest.getMBeanServer().getAttribute(instanceURN.toObjectName(), "Flow" + flowID));
        this.mManager.cancel(flowID);
    }

    private List<String> getAttributes(ModuleURN inURN) throws Exception {
        MBeanInfo beanInfo = AsyncModuleTest.getMBeanServer().getMBeanInfo(inURN.toObjectName());
        ArrayList<String> attribs = new ArrayList<String>();
        for (MBeanAttributeInfo info : beanInfo.getAttributes()) {
            attribs.add(info.getName());
        }
        return attribs;
    }

    @Before
    public void setup() throws Exception {
        this.mManager = new ModuleManager();
        this.mManager.init();
    }

    @After
    public void cleanup() throws Exception {
        this.mManager.stop();
        this.mManager = null;
    }

    private static class FlowSpecificListener
    implements SinkDataListener {
        private final Map<DataFlowID, BlockingQueue<Object>> mFlowData = new ConcurrentHashMap<DataFlowID, BlockingQueue<Object>>();
        private final Map<DataFlowID, String> mThreadNames = new HashMap<DataFlowID, String>();

        private FlowSpecificListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void receivedData(DataFlowID inFlowID, Object inData) {
            BlockingQueue<Object> queue = this.mFlowData.get(inFlowID);
            if (queue == null) {
                FlowSpecificListener flowSpecificListener = this;
                synchronized (flowSpecificListener) {
                    queue = this.mFlowData.get(inFlowID);
                    if (queue == null) {
                        queue = new LinkedBlockingQueue<Object>();
                        this.mFlowData.put(inFlowID, queue);
                        this.mThreadNames.put(inFlowID, Thread.currentThread().getName());
                    }
                }
            }
            queue.add(inData);
        }

        public Object getNextDataFor(DataFlowID inFlowID) throws InterruptedException {
            BlockingQueue<Object> queue = this.mFlowData.get(inFlowID);
            return queue == null ? null : queue.take();
        }

        public int sizeFor(DataFlowID inFlowID) {
            BlockingQueue<Object> queue = this.mFlowData.get(inFlowID);
            return queue == null ? -1 : queue.size();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public String getThreadNameFor(DataFlowID inFlowID) {
            FlowSpecificListener flowSpecificListener = this;
            synchronized (flowSpecificListener) {
                return this.mThreadNames.get(inFlowID);
            }
        }

        public Set<DataFlowID> getFlows() {
            return this.mFlowData.keySet();
        }
    }
}

