/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.quorum.transport.message;

import java.security.SecureRandom;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import org.smallmind.quorum.transport.InvocationSignal;
import org.smallmind.quorum.transport.TransportException;
import org.smallmind.quorum.transport.message.AsynchronousTransmissionCallback;
import org.smallmind.quorum.transport.message.MessagePolicy;
import org.smallmind.quorum.transport.message.MessageProperty;
import org.smallmind.quorum.transport.message.MessageStrategy;
import org.smallmind.quorum.transport.message.QueueOperator;
import org.smallmind.quorum.transport.message.SelfDestructiveMap;
import org.smallmind.quorum.transport.message.SynchronousTransmissionCallback;
import org.smallmind.quorum.transport.message.TransmissionCallback;
import org.smallmind.quorum.transport.message.TransmissionListener;
import org.smallmind.quorum.transport.message.TransportManagedObjects;
import org.smallmind.scribe.pen.LoggerManager;

public class MessageTransmitter {
    private static final Random RANDOM = new SecureRandom();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final MessageStrategy messageStrategy;
    private final LinkedBlockingQueue<QueueOperator> operatorQueue;
    private final SelfDestructiveMap<String, TransmissionCallback> callbackMap;
    private final TransmissionListener[] transmissionListeners;
    private final QueueConnection[] requestConnections;
    private final String instanceId = UUID.randomUUID().toString();

    public MessageTransmitter(TransportManagedObjects requestManagedObjects, TransportManagedObjects responseManagedObjects, MessagePolicy messagePolicy, MessageStrategy messageStrategy, int clusterSize, int concurrencyLimit, int timeoutSeconds) throws JMSException, TransportException {
        int index;
        this.messageStrategy = messageStrategy;
        this.callbackMap = new SelfDestructiveMap(timeoutSeconds);
        this.requestConnections = new QueueConnection[clusterSize];
        for (index = 0; index < this.requestConnections.length; ++index) {
            this.requestConnections[index] = (QueueConnection)requestManagedObjects.createConnection();
        }
        int requestIndex = RANDOM.nextInt(this.requestConnections.length);
        this.operatorQueue = new LinkedBlockingQueue();
        for (index = 0; index < Math.max(clusterSize, concurrencyLimit); ++index) {
            this.operatorQueue.add(new QueueOperator(this.requestConnections[requestIndex], (Queue)requestManagedObjects.getDestination(), messagePolicy));
            if (++requestIndex != this.requestConnections.length) continue;
            requestIndex = 0;
        }
        this.transmissionListeners = new TransmissionListener[clusterSize];
        for (index = 0; index < this.transmissionListeners.length; ++index) {
            this.transmissionListeners[index] = new TransmissionListener(this, (TopicConnection)responseManagedObjects.createConnection(), (Topic)responseManagedObjects.getDestination(), messagePolicy.getAcknowledgeMode());
        }
    }

    public String getInstanceId() {
        return this.instanceId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TransmissionCallback sendMessage(InvocationSignal invocationSignal, String serviceSelector) throws Exception {
        QueueOperator queueOperator;
        do {
            queueOperator = this.operatorQueue.poll(1L, TimeUnit.SECONDS);
        } while (!this.closed.get() && queueOperator == null);
        if (queueOperator == null) {
            throw new TransportException("Message transmission has been closed", new Object[0]);
        }
        try {
            Message requestMessage = this.messageStrategy.wrapInMessage((Session)queueOperator.getRequestSession(), invocationSignal);
            requestMessage.setStringProperty(MessageProperty.INSTANCE.getKey(), this.instanceId);
            requestMessage.setStringProperty(MessageProperty.SERVICE.getKey(), serviceSelector);
            queueOperator.send(requestMessage);
            AsynchronousTransmissionCallback asynchronousCallback = new AsynchronousTransmissionCallback(this.messageStrategy);
            SynchronousTransmissionCallback previousCallback = (SynchronousTransmissionCallback)this.callbackMap.putIfAbsent(requestMessage.getJMSMessageID(), asynchronousCallback);
            if (previousCallback != null) {
                SynchronousTransmissionCallback synchronousTransmissionCallback = previousCallback;
                return synchronousTransmissionCallback;
            }
            AsynchronousTransmissionCallback asynchronousTransmissionCallback = asynchronousCallback;
            return asynchronousTransmissionCallback;
        }
        finally {
            this.operatorQueue.put(queueOperator);
        }
    }

    public void completeCallback(Message responseMessage) {
        try {
            String correlationId = responseMessage.getJMSCorrelationID();
            AsynchronousTransmissionCallback previousCallback = (AsynchronousTransmissionCallback)this.callbackMap.get(correlationId);
            if (previousCallback == null) {
                previousCallback = (AsynchronousTransmissionCallback)this.callbackMap.putIfAbsent(correlationId, new SynchronousTransmissionCallback(this.messageStrategy, responseMessage));
                if (previousCallback != null) {
                    previousCallback.setResponseMessage(responseMessage);
                }
            } else {
                previousCallback.setResponseMessage(responseMessage);
            }
        }
        catch (JMSException jmsException) {
            LoggerManager.getLogger(MessageTransmitter.class).error((Throwable)jmsException);
        }
    }

    public void close() throws JMSException, InterruptedException {
        if (this.closed.compareAndSet(false, true)) {
            for (QueueConnection queueConnection : this.requestConnections) {
                queueConnection.stop();
            }
            for (QueueOperator queueOperator : this.operatorQueue) {
                queueOperator.close();
            }
            for (QueueConnection queueConnection : this.requestConnections) {
                queueConnection.close();
            }
            for (TransmissionListener transmissionListener : this.transmissionListeners) {
                transmissionListener.close();
            }
            this.callbackMap.shutdown();
        }
    }
}

