/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.jms;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import org.smallmind.instrument.ChronometerInstrumentAndReturn;
import org.smallmind.instrument.InstrumentAndReturn;
import org.smallmind.instrument.InstrumentationManager;
import org.smallmind.instrument.MetricProperty;
import org.smallmind.instrument.config.MetricConfiguration;
import org.smallmind.instrument.config.MetricConfigurationProvider;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.phalanx.wire.AbstractRequestTransport;
import org.smallmind.phalanx.wire.Address;
import org.smallmind.phalanx.wire.ConversationType;
import org.smallmind.phalanx.wire.InvocationSignal;
import org.smallmind.phalanx.wire.MetricType;
import org.smallmind.phalanx.wire.SignalCodec;
import org.smallmind.phalanx.wire.TransportException;
import org.smallmind.phalanx.wire.VocalMode;
import org.smallmind.phalanx.wire.Voice;
import org.smallmind.phalanx.wire.WireContext;
import org.smallmind.phalanx.wire.WireProperty;
import org.smallmind.phalanx.wire.jms.ConnectionManager;
import org.smallmind.phalanx.wire.jms.MessageHandler;
import org.smallmind.phalanx.wire.jms.MessagePolicy;
import org.smallmind.phalanx.wire.jms.QueueOperator;
import org.smallmind.phalanx.wire.jms.ReconnectionPolicy;
import org.smallmind.phalanx.wire.jms.ResponseListener;
import org.smallmind.phalanx.wire.jms.RoutingFactories;
import org.smallmind.phalanx.wire.jms.TopicOperator;

/**
 * Request transport that sends invocation requests over JMS.
 *
 * <p>Requests whose voice mode is {@link VocalMode#TALK} are sent through pooled
 * {@link QueueOperator} handlers; every other mode is sent through pooled
 * {@link TopicOperator} handlers. A handler is borrowed from the matching pool for
 * the duration of a single {@link #transmit} call and is handed back afterwards.
 * One {@link ResponseListener} per cluster slot is created on the response topic.
 */
public class JmsRequestTransport
extends AbstractRequestTransport
implements MetricConfigurationProvider {
    /** Set once by {@link #close()}; lets threads waiting for a handler give up. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final MetricConfiguration metricConfiguration;
    private final SignalCodec signalCodec;
    /** Pool of handlers used for {@link VocalMode#TALK} requests. */
    private final LinkedBlockingQueue<MessageHandler> talkQueue;
    /** Pool of handlers used for every non-TALK request. */
    private final LinkedBlockingQueue<MessageHandler> whisperAndShoutQueue;
    private final ConnectionManager[] talkRequestConnectionManagers;
    private final ConnectionManager[] whisperAndShoutRequestConnectionManagers;
    private final ResponseListener[] responseListeners;
    /** Identifier written to outgoing two-way requests and handed to each response listener. */
    private final String callerId = SnowflakeId.newInstance().generateDottedString();

    /**
     * Builds the connection managers, the two handler pools and the response listeners.
     *
     * @param metricConfiguration   configuration returned from {@link #getMetricConfiguration()}
     * @param routingFactories      source of the request queue, request topic and response topic factories
     * @param messagePolicy         policy passed to every {@link ConnectionManager}
     * @param reconnectionPolicy    policy passed to every {@link ConnectionManager}
     * @param signalCodec           codec used to encode requests; also passed to the response listeners
     * @param clusterSize           number of connection managers per destination and number of response
     *                              listeners; must be at least 1
     * @param concurrencyLimit      each pool holds {@code max(clusterSize, concurrencyLimit)} handlers
     * @param maximumMessageLength  passed through to each {@link ResponseListener}
     * @param defaultTimeoutSeconds passed to the superclass constructor
     * @throws IllegalArgumentException if {@code clusterSize} is less than 1
     */
    public JmsRequestTransport(MetricConfiguration metricConfiguration, RoutingFactories routingFactories, MessagePolicy messagePolicy, ReconnectionPolicy reconnectionPolicy, SignalCodec signalCodec, int clusterSize, int concurrencyLimit, int maximumMessageLength, int defaultTimeoutSeconds) throws IOException, JMSException, TransportException {
        super(defaultTimeoutSeconds);
        // Without at least one connection manager the pools below would either index an
        // empty array or stay empty forever, leaving every transmit() call waiting.
        if (clusterSize < 1) {
            throw new IllegalArgumentException("clusterSize must be at least 1, but was " + clusterSize);
        }
        this.metricConfiguration = metricConfiguration;
        this.signalCodec = signalCodec;

        int handlerCount = Math.max(clusterSize, concurrencyLimit);

        this.talkRequestConnectionManagers = new ConnectionManager[clusterSize];
        for (int index = 0; index < clusterSize; ++index) {
            this.talkRequestConnectionManagers[index] = new ConnectionManager(routingFactories.getRequestQueueFactory(), messagePolicy, reconnectionPolicy);
        }
        this.whisperAndShoutRequestConnectionManagers = new ConnectionManager[clusterSize];
        for (int index = 0; index < clusterSize; ++index) {
            this.whisperAndShoutRequestConnectionManagers[index] = new ConnectionManager(routingFactories.getRequestTopicFactory(), messagePolicy, reconnectionPolicy);
        }

        // Handlers are spread over the connection managers round-robin.
        this.talkQueue = new LinkedBlockingQueue<MessageHandler>();
        for (int index = 0; index < handlerCount; ++index) {
            this.talkQueue.add(new QueueOperator(this.talkRequestConnectionManagers[index % clusterSize], (Queue)routingFactories.getRequestQueueFactory().getDestination()));
        }
        this.whisperAndShoutQueue = new LinkedBlockingQueue<MessageHandler>();
        for (int index = 0; index < handlerCount; ++index) {
            this.whisperAndShoutQueue.add(new TopicOperator(this.whisperAndShoutRequestConnectionManagers[index % clusterSize], (Topic)routingFactories.getRequestTopicFactory().getDestination()));
        }

        this.responseListeners = new ResponseListener[clusterSize];
        for (int index = 0; index < clusterSize; ++index) {
            this.responseListeners[index] = new ResponseListener(this, new ConnectionManager(routingFactories.getResponseTopicFactory(), messagePolicy, reconnectionPolicy), (Topic)routingFactories.getResponseTopicFactory().getDestination(), signalCodec, this.callerId, maximumMessageLength);
        }
    }

    /**
     * @return the identifier generated for this transport instance
     */
    @Override
    public String getCallerId() {
        return this.callerId;
    }

    /**
     * @return the metric configuration supplied at construction
     */
    public MetricConfiguration getMetricConfiguration() {
        return this.metricConfiguration;
    }

    /**
     * Sends one request and returns whatever {@code acquireResult} yields for it.
     *
     * <p>A handler is borrowed from the pool matching the voice mode, the request is
     * built and sent, and the handler is always handed back to the pool, whether or not
     * sending or result acquisition fails.
     *
     * <p>NOTE: this class was recovered by a decompiler, which reported removing a
     * self-referential try/catch from this method; the original source may differ.
     *
     * @param voice     supplies the mode, conversation type, service group and instance id
     * @param address   target address encoded into the request
     * @param arguments invocation arguments encoded into the request
     * @param contexts  wire contexts encoded into the request
     * @return the value returned by {@code acquireResult}
     * @throws Throwable any failure raised while acquiring a handler, building or sending
     *                   the message, or acquiring the result
     */
    @Override
    public Object transmit(Voice voice, Address address, Map<String, Object> arguments, WireContext ... contexts) throws Throwable {
        // Everything derived from the voice is computed before a handler is borrowed, so a
        // failure here (e.g. a null conversation) cannot strand a handler outside the pool.
        LinkedBlockingQueue<MessageHandler> messageQueue = voice.getMode().equals(VocalMode.TALK) ? this.talkQueue : this.whisperAndShoutQueue;
        boolean inOnly = voice.getConversation().getConversationType().equals(ConversationType.IN_ONLY);
        String serviceGroup = (String)voice.getServiceGroup();
        // Only whispers address a specific instance.
        String instanceId = voice.getMode().equals(VocalMode.WHISPER) ? (String)voice.getInstanceId() : null;

        MessageHandler messageHandler = this.acquireMessageHandler(messageQueue);
        try {
            Message requestMessage = this.constructMessage(messageHandler, inOnly, serviceGroup, instanceId, address, arguments, contexts);
            messageHandler.send(requestMessage);
            return this.acquireResult(this.signalCodec, address, voice, requestMessage.getJMSMessageID(), inOnly);
        }
        finally {
            // add() rather than put(): the pool is unbounded so this never blocks, and unlike
            // put() it cannot throw InterruptedException, which would lose the handler and
            // hide the outcome of the try block.
            messageQueue.add(messageHandler);
        }
    }

    /**
     * Borrows a handler from the given pool, timing the wait under the
     * {@code ACQUIRE_REQUEST_DESTINATION} metric event.
     *
     * <p>The pool is polled in one-second slices until a handler is obtained or this
     * transport has been closed.
     *
     * @param messageHandlerQueue pool to borrow from
     * @return a handler taken from the pool
     * @throws Exception a {@link TransportException} if the transport was closed before a
     *                   handler became available, or whatever the instrumentation call raises
     */
    private MessageHandler acquireMessageHandler(final LinkedBlockingQueue<MessageHandler> messageHandlerQueue) throws Exception {
        return (MessageHandler)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<MessageHandler>((MetricConfigurationProvider)this, new MetricProperty[]{new MetricProperty("event", MetricType.ACQUIRE_REQUEST_DESTINATION.getDisplay())}){

            public MessageHandler withChronometer() throws TransportException, InterruptedException {
                MessageHandler messageHandler;
                // Poll in short slices so a close() is noticed within about a second.
                do {
                    messageHandler = messageHandlerQueue.poll(1L, TimeUnit.SECONDS);
                } while (!JmsRequestTransport.this.closed.get() && messageHandler == null);
                if (messageHandler == null) {
                    throw new TransportException("Message transmission has been closed", new Object[0]);
                }
                return messageHandler;
            }
        });
    }

    /**
     * Builds the JMS request message, timing the work under the {@code CONSTRUCT_MESSAGE}
     * metric event.
     *
     * <p>The body is the codec-encoded {@link InvocationSignal}. Content type, clock
     * (current time in milliseconds) and service group properties are always set; the
     * caller id is set only for two-way requests and the instance id only when given.
     *
     * @param messageHandler handler used to create the empty message
     * @param inOnly         {@code true} when no response is expected
     * @param serviceGroup   value for the service group property
     * @param instanceId     value for the instance id property, or {@code null} to omit it
     * @param address        target address placed in the signal
     * @param arguments      invocation arguments placed in the signal
     * @param contexts       wire contexts placed in the signal
     * @return the populated message
     * @throws Exception if message creation, encoding or property assignment fails
     */
    private Message constructMessage(final MessageHandler messageHandler, final boolean inOnly, final String serviceGroup, final String instanceId, final Address address, final Map<String, Object> arguments, final WireContext ... contexts) throws Exception {
        return (Message)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<Message>((MetricConfigurationProvider)this, new MetricProperty[]{new MetricProperty("event", MetricType.CONSTRUCT_MESSAGE.getDisplay())}){

            public Message withChronometer() throws Exception {
                BytesMessage requestMessage = messageHandler.createMessage();
                requestMessage.writeBytes(JmsRequestTransport.this.signalCodec.encode(new InvocationSignal(inOnly, address, arguments, contexts)));
                // The caller id is only attached when a response is expected.
                if (!inOnly) {
                    requestMessage.setStringProperty(WireProperty.CALLER_ID.getKey(), JmsRequestTransport.this.callerId);
                }
                requestMessage.setStringProperty(WireProperty.CONTENT_TYPE.getKey(), JmsRequestTransport.this.signalCodec.getContentType());
                requestMessage.setLongProperty(WireProperty.CLOCK.getKey(), System.currentTimeMillis());
                requestMessage.setStringProperty(WireProperty.SERVICE_GROUP.getKey(), serviceGroup);
                if (instanceId != null) {
                    requestMessage.setStringProperty(WireProperty.INSTANCE_ID.getKey(), instanceId);
                }
                return requestMessage;
            }
        });
    }

    /**
     * Shuts the transport down; only the first call has any effect.
     *
     * <p>All request connection managers are stopped first, then closed, then the response
     * listeners are closed and finally the callback map is shut down. There is no
     * per-resource error handling: the first exception aborts the sequence and the
     * remaining resources are left untouched.
     *
     * @throws JMSException         if stopping or closing a JMS resource fails
     * @throws InterruptedException if the calling thread is interrupted during shutdown
     */
    @Override
    public void close() throws JMSException, InterruptedException {
        if (this.closed.compareAndSet(false, true)) {
            for (ConnectionManager requestConnectionManager : this.whisperAndShoutRequestConnectionManagers) {
                requestConnectionManager.stop();
            }
            for (ConnectionManager requestConnectionManager : this.talkRequestConnectionManagers) {
                requestConnectionManager.stop();
            }
            for (ConnectionManager requestConnectionManager : this.whisperAndShoutRequestConnectionManagers) {
                requestConnectionManager.close();
            }
            for (ConnectionManager requestConnectionManager : this.talkRequestConnectionManagers) {
                requestConnectionManager.close();
            }
            for (ResponseListener responseListener : this.responseListeners) {
                responseListener.close();
            }
            this.getCallbackMap().shutdown();
        }
    }
}

