/*
 * Decompiled with CFR 0.152.
 */
package org.marketcetera.modules.async;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.ReflectionException;
import org.marketcetera.module.DataEmitter;
import org.marketcetera.module.DataEmitterSupport;
import org.marketcetera.module.DataFlowID;
import org.marketcetera.module.DataReceiver;
import org.marketcetera.module.DataRequest;
import org.marketcetera.module.IllegalRequestParameterValue;
import org.marketcetera.module.Module;
import org.marketcetera.module.ModuleURN;
import org.marketcetera.module.RequestID;
import org.marketcetera.modules.async.Messages;
import org.marketcetera.util.log.SLF4JLoggerProxy;
import org.marketcetera.util.misc.ClassVersion;
import org.marketcetera.util.misc.NamedThreadFactory;

/**
 * A module that decouples data receipt from data emission: every object
 * received on a data flow is placed on a per-flow queue and re-emitted from
 * a task running on an executor thread.
 *
 * <p>The module also implements {@link DynamicMBean}, exposing one read-only
 * integer attribute per active data flow. The attribute name is
 * {@link #ATTRIB_PREFIX} followed by the flow ID, and its value is the
 * number of items currently waiting in that flow's queue.
 */
@ClassVersion(value="$Id$")
public class SimpleAsyncProcessor
extends Module
implements DataEmitter,
DataReceiver,
DynamicMBean {
    /** Prefix of every JMX attribute name exposed by this module. */
    static final String ATTRIB_PREFIX = "Flow";
    /** Prefix of the names given to the threads that deliver queued data. */
    static final String ASYNC_THREAD_NAME_PREFIX = "SimpleAsyncProc";
    /** Executor running the per-flow delivery tasks; created in {@link #preStart()}. */
    private ExecutorService mService;
    /** Active data flows, keyed by flow ID. */
    private final Map<DataFlowID, DataFlowHandler> mFlows = new ConcurrentHashMap<DataFlowID, DataFlowHandler>();

    /**
     * Starts asynchronous delivery for a new data flow.
     *
     * <p>A handler is created for the flow, submitted to the executor and
     * then registered under the flow ID obtained from {@code inSupport}.
     *
     * @param inRequest the data request; its payload must be {@code null}
     * @param inSupport the handle used to emit data into the flow
     * @throws IllegalRequestParameterValue if the request carries a non-null payload
     */
    public void requestData(DataRequest inRequest, DataEmitterSupport inSupport) throws IllegalRequestParameterValue {
        // This module accepts no request parameters.
        Object obj = inRequest.getData();
        if (obj != null) {
            throw new IllegalRequestParameterValue(this.getURN(), obj);
        }
        DataFlowHandler handler = new DataFlowHandler(inSupport);
        handler.setFuture(this.mService.submit(handler));
        this.addFlow(inSupport, handler);
    }

    /**
     * Stops delivery for a data flow by unregistering it and cancelling its
     * delivery task with interruption. Does nothing when the flow is not
     * registered.
     *
     * @param inFlowID the ID of the flow to cancel
     * @param inRequestID the ID of the request being cancelled (unused)
     */
    public void cancel(DataFlowID inFlowID, RequestID inRequestID) {
        Future<?> future = this.removeFlow(inFlowID);
        if (future != null) {
            future.cancel(true);
        }
    }

    /**
     * Queues a received object for asynchronous delivery on the given flow.
     * A warning is logged, and the data dropped, when the flow is unknown.
     *
     * @param inFlowID the ID of the flow the data arrived on
     * @param inData the received data
     */
    public void receiveData(DataFlowID inFlowID, Object inData) {
        DataFlowHandler handler = this.getHandler(inFlowID);
        if (handler != null) {
            handler.receiveData(inData);
        } else {
            Messages.DATA_RECVD_UNKNOWN_FLOW.warn((Object)this, (Object)inFlowID);
        }
    }

    /**
     * Returns the current queue size of the flow named by the attribute.
     *
     * @param attribute the attribute name: {@link #ATTRIB_PREFIX} followed by a flow ID
     * @return the queue size of the matching flow, as an {@link Integer}
     * @throws AttributeNotFoundException if the name lacks the prefix, has an
     *         empty flow ID, or does not match a registered flow
     */
    @Override
    public Object getAttribute(String attribute) throws AttributeNotFoundException {
        if (attribute.startsWith(ATTRIB_PREFIX)) {
            String flowID = attribute.substring(ATTRIB_PREFIX.length());
            if (!flowID.isEmpty()) {
                DataFlowHandler handler = this.mFlows.get(new DataFlowID(flowID));
                if (handler != null) {
                    return Integer.valueOf(handler.getQueueSize());
                }
            }
        }
        throw new AttributeNotFoundException(attribute);
    }

    /**
     * Always fails: no attribute of this MBean is writable.
     *
     * @param attribute the attribute the caller attempted to set
     * @throws AttributeNotFoundException always
     */
    @Override
    public void setAttribute(Attribute attribute) throws AttributeNotFoundException {
        throw new AttributeNotFoundException(Messages.MXBEAN_ATTRIB_NOT_WRITABLE.getText((Object)attribute.getName()));
    }

    /**
     * Returns the queue sizes for those of the requested attributes that map
     * to a registered flow. Names that lack the prefix, have an empty flow
     * ID, or match no flow are silently omitted from the result.
     *
     * @param attributes the attribute names to look up
     * @return the attributes that could be resolved, in request order
     */
    @Override
    public AttributeList getAttributes(String[] attributes) {
        // Take one snapshot so that every value in the reply comes from the same pass.
        Map<DataFlowID, Integer> sizes = this.getQueueSizes();
        AttributeList list = new AttributeList();
        for (String attribute : attributes) {
            if (!attribute.startsWith(ATTRIB_PREFIX)) {
                continue;
            }
            String flowID = attribute.substring(ATTRIB_PREFIX.length());
            if (flowID.isEmpty()) {
                continue;
            }
            Integer value = sizes.get(new DataFlowID(flowID));
            if (value != null) {
                list.add(new Attribute(attribute, value));
            }
        }
        return list;
    }

    /**
     * Sets nothing, since no attribute is writable.
     *
     * @param attributes the attributes the caller attempted to set
     * @return an empty list, indicating that no attribute was set
     */
    @Override
    public AttributeList setAttributes(AttributeList attributes) {
        return new AttributeList();
    }

    /**
     * Always fails: this MBean exposes no operations.
     *
     * @param actionName the name of the requested operation
     * @param params the operation arguments (unused)
     * @param signature the operation signature (unused)
     * @return never returns normally
     * @throws ReflectionException always, wrapping a {@link NoSuchMethodException}
     */
    @Override
    public Object invoke(String actionName, Object[] params, String[] signature) throws ReflectionException {
        throw new ReflectionException(new NoSuchMethodException(actionName));
    }

    /**
     * Builds the MBean description from the currently registered flows: one
     * readable, non-writable {@code Integer} attribute per flow, and no
     * constructors, operations or notifications.
     *
     * @return the MBean metadata reflecting the flows registered at call time
     */
    @Override
    public MBeanInfo getMBeanInfo() {
        Map<DataFlowID, Integer> queueSizes = this.getQueueSizes();
        ArrayList<MBeanAttributeInfo> attribInfo = new ArrayList<MBeanAttributeInfo>(queueSizes.size());
        for (Map.Entry<DataFlowID, Integer> entry : queueSizes.entrySet()) {
            attribInfo.add(new MBeanAttributeInfo(ATTRIB_PREFIX + entry.getKey(), Integer.class.getName(), Messages.JMX_ATTRIBUTE_FLOW_CNT_DESCRIPTION.getText((Object)entry.getKey()), true, false, false));
        }
        return new MBeanInfo(this.getClass().getName(), Messages.JMX_MXBEAN_DESCRIPTION.getText(), attribInfo.toArray(new MBeanAttributeInfo[attribInfo.size()]), null, null, null);
    }

    /**
     * Creates an instance.
     *
     * @param inURN the URN of this module instance
     */
    protected SimpleAsyncProcessor(ModuleURN inURN) {
        super(inURN, true);
    }

    /**
     * Creates the executor that runs the delivery tasks. Its threads are
     * named after {@link #ASYNC_THREAD_NAME_PREFIX} and this module's
     * instance name.
     */
    protected void preStart() {
        this.mService = Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory(ASYNC_THREAD_NAME_PREFIX + "-" + this.getURN().instanceName()));
    }

    /**
     * Shuts the executor down immediately, interrupting any running delivery tasks.
     */
    protected void preStop() {
        this.mService.shutdownNow();
    }

    /**
     * Registers a handler under the flow ID carried by the emitter support.
     *
     * @param inSupport the emitter support identifying the flow
     * @param inFlowHandler the handler serving that flow
     */
    private void addFlow(DataEmitterSupport inSupport, DataFlowHandler inFlowHandler) {
        this.mFlows.put(inSupport.getFlowID(), inFlowHandler);
    }

    /**
     * Unregisters a flow and returns the future of its delivery task.
     *
     * @param inFlowID the ID of the flow to remove
     * @return the future of the removed flow's task, or {@code null} if no
     *         flow was registered under that ID
     */
    private Future<?> removeFlow(DataFlowID inFlowID) {
        DataFlowHandler handler = this.mFlows.remove(inFlowID);
        // An unknown or already-removed flow yields null; the caller treats
        // a null future as "nothing to cancel".
        return handler == null ? null : handler.getFuture();
    }

    /**
     * Looks up the handler for a flow.
     *
     * @param inFlowID the flow ID
     * @return the registered handler, or {@code null} if there is none
     */
    private DataFlowHandler getHandler(DataFlowID inFlowID) {
        return this.mFlows.get(inFlowID);
    }

    /**
     * Captures the current queue size of every registered flow.
     *
     * @return a new map from flow ID to queue size
     */
    private Map<DataFlowID, Integer> getQueueSizes() {
        HashMap<DataFlowID, Integer> sizes = new HashMap<DataFlowID, Integer>();
        for (Map.Entry<DataFlowID, DataFlowHandler> entry : this.mFlows.entrySet()) {
            sizes.put(entry.getKey(), entry.getValue().getQueueSize());
        }
        return sizes;
    }

    /**
     * Per-flow worker: buffers received objects in an unbounded queue and
     * emits them one at a time until its thread is interrupted.
     */
    private static class DataFlowHandler
    implements Runnable {
        /** Handle used to emit data into the flow. */
        private final DataEmitterSupport mEmitterSupport;
        /** Items received but not yet emitted. */
        private final BlockingQueue<Object> mDataQueue = new LinkedBlockingQueue<Object>();
        /** Future of the executor task running this handler. */
        private Future<?> mFuture;

        /**
         * Creates a handler.
         *
         * @param inEmitterSupport the handle used to emit data into the flow
         */
        DataFlowHandler(DataEmitterSupport inEmitterSupport) {
            this.mEmitterSupport = inEmitterSupport;
        }

        /**
         * Repeatedly takes the next queued item, blocking while the queue is
         * empty, and emits it. The loop ends only when the blocking take is
         * interrupted; items still queued at that point are not delivered.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    this.mEmitterSupport.send(this.mDataQueue.take());
                }
            }
            catch (InterruptedException e) {
                SLF4JLoggerProxy.debug((Object)this, (Throwable)e, (String)"Data publishing interrupted. Discarding {} undelivered items", (Object[])new Object[]{this.mDataQueue.size()});
                return;
            }
        }

        /**
         * @return the number of items waiting to be emitted
         */
        int getQueueSize() {
            return this.mDataQueue.size();
        }

        /**
         * Appends an item to the delivery queue.
         *
         * @param inData the item to enqueue
         */
        void receiveData(Object inData) {
            this.mDataQueue.add(inData);
        }

        /**
         * @return the future of the task running this handler, or
         *         {@code null} if it has not been set
         */
        Future<?> getFuture() {
            return this.mFuture;
        }

        /**
         * Records the future of the task running this handler.
         *
         * @param inFuture the future returned when this handler was submitted
         */
        void setFuture(Future<?> inFuture) {
            this.mFuture = inFuture;
        }
    }
}

