/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.jms;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import org.smallmind.instrument.ChronometerInstrumentAndReturn;
import org.smallmind.instrument.InstrumentAndReturn;
import org.smallmind.instrument.InstrumentationManager;
import org.smallmind.instrument.MetricProperty;
import org.smallmind.instrument.config.MetricConfiguration;
import org.smallmind.instrument.config.MetricConfigurationProvider;
import org.smallmind.nutsnbolts.time.Duration;
import org.smallmind.nutsnbolts.util.SelfDestructive;
import org.smallmind.nutsnbolts.util.SelfDestructiveMap;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.phalanx.wire.Address;
import org.smallmind.phalanx.wire.AsynchronousTransmissionCallback;
import org.smallmind.phalanx.wire.InvocationSignal;
import org.smallmind.phalanx.wire.MetricType;
import org.smallmind.phalanx.wire.RequestTransport;
import org.smallmind.phalanx.wire.ResultSignal;
import org.smallmind.phalanx.wire.SignalCodec;
import org.smallmind.phalanx.wire.SynchronousTransmissionCallback;
import org.smallmind.phalanx.wire.TransmissionCallback;
import org.smallmind.phalanx.wire.TransportException;
import org.smallmind.phalanx.wire.WireContext;
import org.smallmind.phalanx.wire.WireProperty;
import org.smallmind.phalanx.wire.jms.ConnectionManager;
import org.smallmind.phalanx.wire.jms.MessageHandler;
import org.smallmind.phalanx.wire.jms.MessagePolicy;
import org.smallmind.phalanx.wire.jms.QueueOperator;
import org.smallmind.phalanx.wire.jms.ReconnectionPolicy;
import org.smallmind.phalanx.wire.jms.ResponseListener;
import org.smallmind.phalanx.wire.jms.RoutingFactories;
import org.smallmind.phalanx.wire.jms.TopicOperator;

public class JmsRequestTransport
implements MetricConfigurationProvider,
RequestTransport {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final MetricConfiguration metricConfiguration;
    private final SignalCodec signalCodec;
    private final SelfDestructiveMap<String, TransmissionCallback> callbackMap;
    private final LinkedBlockingQueue<MessageHandler> talkQueue;
    private final LinkedBlockingQueue<MessageHandler> whisperQueue;
    private final ConnectionManager[] talkRequestConnectionManagers;
    private final ConnectionManager[] whisperRequestConnectionManagers;
    private final ResponseListener[] responseListeners;
    private final String callerId = SnowflakeId.newInstance().generateDottedString();

    public JmsRequestTransport(MetricConfiguration metricConfiguration, RoutingFactories routingFactories, MessagePolicy messagePolicy, ReconnectionPolicy reconnectionPolicy, SignalCodec signalCodec, int clusterSize, int concurrencyLimit, int maximumMessageLength, int timeoutSeconds) throws IOException, JMSException, TransportException {
        int index;
        int talkIndex = 0;
        int whisperIndex = 0;
        this.metricConfiguration = metricConfiguration;
        this.signalCodec = signalCodec;
        this.callbackMap = new SelfDestructiveMap(new Duration((long)timeoutSeconds, TimeUnit.SECONDS));
        this.talkRequestConnectionManagers = new ConnectionManager[clusterSize];
        for (index = 0; index < this.talkRequestConnectionManagers.length; ++index) {
            this.talkRequestConnectionManagers[index] = new ConnectionManager(routingFactories.getRequestQueueFactory(), messagePolicy, reconnectionPolicy);
        }
        this.whisperRequestConnectionManagers = new ConnectionManager[clusterSize];
        for (index = 0; index < this.whisperRequestConnectionManagers.length; ++index) {
            this.whisperRequestConnectionManagers[index] = new ConnectionManager(routingFactories.getRequestTopicFactory(), messagePolicy, reconnectionPolicy);
        }
        this.talkQueue = new LinkedBlockingQueue();
        for (index = 0; index < Math.max(clusterSize, concurrencyLimit); ++index) {
            this.talkQueue.add(new QueueOperator(this.talkRequestConnectionManagers[talkIndex], (Queue)routingFactories.getRequestQueueFactory().getDestination()));
            if (++talkIndex != this.talkRequestConnectionManagers.length) continue;
            talkIndex = 0;
        }
        this.whisperQueue = new LinkedBlockingQueue();
        for (index = 0; index < Math.max(clusterSize, concurrencyLimit); ++index) {
            this.whisperQueue.add(new TopicOperator(this.whisperRequestConnectionManagers[whisperIndex], (Topic)routingFactories.getRequestTopicFactory().getDestination()));
            if (++whisperIndex != this.whisperRequestConnectionManagers.length) continue;
            whisperIndex = 0;
        }
        this.responseListeners = new ResponseListener[clusterSize];
        for (index = 0; index < this.responseListeners.length; ++index) {
            this.responseListeners[index] = new ResponseListener(this, new ConnectionManager(routingFactories.getResponseTopicFactory(), messagePolicy, reconnectionPolicy), (Topic)routingFactories.getResponseTopicFactory().getDestination(), signalCodec, this.callerId, maximumMessageLength);
        }
    }

    @Override
    public String getCallerId() {
        return this.callerId;
    }

    public MetricConfiguration getMetricConfiguration() {
        return this.metricConfiguration;
    }

    @Override
    public void transmitInOnly(String serviceGroup, String instanceId, Address address, Map<String, Object> arguments, WireContext ... contexts) throws Exception {
        this.transmit(true, serviceGroup, instanceId, address, arguments, contexts);
    }

    @Override
    public Object transmitInOut(String serviceGroup, String instanceId, Address address, Map<String, Object> arguments, WireContext ... contexts) throws Throwable {
        TransmissionCallback transmissionCallback = this.transmit(false, serviceGroup, instanceId, address, arguments, contexts);
        if (transmissionCallback != null) {
            return transmissionCallback.getResult(this.signalCodec);
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TransmissionCallback transmit(boolean inOnly, String serviceGroup, String instanceId, Address address, Map<String, Object> arguments, WireContext ... contexts) throws Exception {
        LinkedBlockingQueue<MessageHandler> messageQueue = instanceId == null ? this.talkQueue : this.whisperQueue;
        MessageHandler messageHandler = this.acquireMessageHandler(messageQueue);
        try {
            Message requestMessage = this.constructMessage(messageHandler, inOnly, serviceGroup, instanceId, address, arguments, contexts);
            messageHandler.send(requestMessage);
            if (!inOnly) {
                AsynchronousTransmissionCallback asynchronousCallback = new AsynchronousTransmissionCallback(address.getService(), address.getFunction().getName());
                SynchronousTransmissionCallback previousCallback = (SynchronousTransmissionCallback)this.callbackMap.putIfAbsent((Comparable)((Object)requestMessage.getJMSMessageID()), (SelfDestructive)asynchronousCallback);
                if (previousCallback != null) {
                    SynchronousTransmissionCallback synchronousTransmissionCallback = previousCallback;
                    return synchronousTransmissionCallback;
                }
                AsynchronousTransmissionCallback asynchronousTransmissionCallback = asynchronousCallback;
                return asynchronousTransmissionCallback;
            }
            TransmissionCallback transmissionCallback = null;
            return transmissionCallback;
        }
        finally {
            messageQueue.put(messageHandler);
        }
    }

    private MessageHandler acquireMessageHandler(final LinkedBlockingQueue<MessageHandler> messageHandlerQueue) throws Exception {
        return (MessageHandler)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<MessageHandler>((MetricConfigurationProvider)this, new MetricProperty[]{new MetricProperty("event", MetricType.ACQUIRE_REQUEST_DESTINATION.getDisplay())}){

            public MessageHandler withChronometer() throws TransportException, InterruptedException {
                MessageHandler messageHandler;
                do {
                    messageHandler = (MessageHandler)messageHandlerQueue.poll(1L, TimeUnit.SECONDS);
                } while (!JmsRequestTransport.this.closed.get() && messageHandler == null);
                if (messageHandler == null) {
                    throw new TransportException("Message transmission has been closed", new Object[0]);
                }
                return messageHandler;
            }
        });
    }

    private Message constructMessage(final MessageHandler messageHandler, final boolean inOnly, final String serviceGroup, final String instanceId, final Address address, final Map<String, Object> arguments, final WireContext ... contexts) throws Exception {
        return (Message)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<Message>((MetricConfigurationProvider)this, new MetricProperty[]{new MetricProperty("event", MetricType.CONSTRUCT_MESSAGE.getDisplay())}){

            public Message withChronometer() throws Exception {
                BytesMessage requestMessage = messageHandler.createMessage();
                requestMessage.writeBytes(JmsRequestTransport.this.signalCodec.encode(new InvocationSignal(inOnly, address, arguments, contexts)));
                if (!inOnly) {
                    requestMessage.setStringProperty(WireProperty.CALLER_ID.getKey(), JmsRequestTransport.this.callerId);
                }
                requestMessage.setStringProperty(WireProperty.CONTENT_TYPE.getKey(), JmsRequestTransport.this.signalCodec.getContentType());
                requestMessage.setLongProperty(WireProperty.CLOCK.getKey(), System.currentTimeMillis());
                requestMessage.setStringProperty(WireProperty.SERVICE_GROUP.getKey(), serviceGroup);
                if (instanceId != null) {
                    requestMessage.setStringProperty(WireProperty.INSTANCE_ID.getKey(), instanceId);
                }
                return requestMessage;
            }
        });
    }

    public void completeCallback(String correlationId, ResultSignal resultSignal) {
        TransmissionCallback previousCallback = (TransmissionCallback)this.callbackMap.get((Comparable)((Object)correlationId));
        if (previousCallback == null) {
            previousCallback = (TransmissionCallback)this.callbackMap.putIfAbsent((Comparable)((Object)correlationId), (SelfDestructive)new SynchronousTransmissionCallback(resultSignal));
            if (previousCallback != null && previousCallback instanceof AsynchronousTransmissionCallback) {
                ((AsynchronousTransmissionCallback)previousCallback).setResultSignal(resultSignal);
            }
        } else if (previousCallback instanceof AsynchronousTransmissionCallback) {
            ((AsynchronousTransmissionCallback)previousCallback).setResultSignal(resultSignal);
        }
    }

    @Override
    public void close() throws JMSException, InterruptedException {
        if (this.closed.compareAndSet(false, true)) {
            for (ConnectionManager requestConnectionManager : this.whisperRequestConnectionManagers) {
                requestConnectionManager.stop();
            }
            for (ConnectionManager requestConnectionManager : this.talkRequestConnectionManagers) {
                requestConnectionManager.stop();
            }
            for (ConnectionManager requestConnectionManager : this.whisperRequestConnectionManagers) {
                requestConnectionManager.close();
            }
            for (ConnectionManager requestConnectionManager : this.talkRequestConnectionManagers) {
                requestConnectionManager.close();
            }
            for (ResponseListener responseListener : this.responseListeners) {
                responseListener.close();
            }
            this.callbackMap.shutdown();
        }
    }
}

