/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.quorum.transport.message;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import org.smallmind.instrument.ChronometerInstrumentAndReturn;
import org.smallmind.instrument.InstrumentAndReturn;
import org.smallmind.instrument.InstrumentationManager;
import org.smallmind.instrument.MetricProperty;
import org.smallmind.instrument.config.MetricConfigurationProvider;
import org.smallmind.nutsnbolts.ntp.NTPTime;
import org.smallmind.nutsnbolts.util.SelfDestructive;
import org.smallmind.nutsnbolts.util.SelfDestructiveMap;
import org.smallmind.quorum.transport.InvocationSignal;
import org.smallmind.quorum.transport.TransportException;
import org.smallmind.quorum.transport.TransportManager;
import org.smallmind.quorum.transport.instrument.MetricEvent;
import org.smallmind.quorum.transport.message.AsynchronousTransmissionCallback;
import org.smallmind.quorum.transport.message.ConnectionFactor;
import org.smallmind.quorum.transport.message.MessagePolicy;
import org.smallmind.quorum.transport.message.MessageProperty;
import org.smallmind.quorum.transport.message.MessageStrategy;
import org.smallmind.quorum.transport.message.QueueOperator;
import org.smallmind.quorum.transport.message.ReconnectionPolicy;
import org.smallmind.quorum.transport.message.SynchronousTransmissionCallback;
import org.smallmind.quorum.transport.message.TransmissionCallback;
import org.smallmind.quorum.transport.message.TransmissionListener;
import org.smallmind.quorum.transport.message.TransportManagedObjects;
import org.smallmind.scribe.pen.LoggerManager;

/**
 * Sends invocation requests through a pool of {@link QueueOperator}s and pairs every request
 * with its response through a shared callback map.
 *
 * <p>Requests are keyed in the callback map by the JMS message id of the sent request; responses
 * are looked up by their JMS correlation id (see {@link #completeCallback(Message)}). Either side
 * may arrive first, so both use {@code putIfAbsent} to register their half of the exchange.
 *
 * <p>NOTE(review): the {@link TransmissionListener}s created here receive a reference to this
 * transmitter and are presumably what invokes {@link #completeCallback(Message)} - confirm
 * against {@code TransmissionListener}.
 */
public class MessageTransmitter {
    /** Flipped exactly once by {@link #close()}; also ends the wait loop of blocked senders. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final MessageStrategy messageStrategy;
    /** Pool of operators; one is borrowed for the duration of each {@code sendMessage} call. */
    private final LinkedBlockingQueue<QueueOperator> operatorQueue;
    /** Pending exchanges, keyed by request message id / response correlation id. */
    private final SelfDestructiveMap<String, TransmissionCallback> callbackMap;
    private final TransmissionListener[] transmissionListeners;
    private final ConnectionFactor[] requestConnectionFactors;
    /** Random identifier stamped on every outgoing request as the INSTANCE property. */
    private final String instanceId = UUID.randomUUID().toString();
    /** Added to {@code System.currentTimeMillis()} when time-stamping requests; 0 without NTP. */
    private final long ntpOffset;
    private final long timeoutSeconds;

    /**
     * Creates the request connections, the operator pool and the response listeners.
     *
     * @param requestManagedObjects  managed objects for the request side; its destination is used as a {@link Queue}
     * @param responseManagedObjects managed objects for the response side; its destination is used as a {@link Topic}
     * @param messagePolicy          passed through to every {@link ConnectionFactor}
     * @param reconnectionPolicy     passed through to every {@link ConnectionFactor}
     * @param messageStrategy        used to wrap invocation signals into messages and handed to the callbacks
     * @param ntpTime                optional time source; when {@code null} the offset is 0
     * @param clusterSize            number of request connections and of response listeners; must be
     *                               at least 1 whenever any operator is created, because operators
     *                               are assigned to the request connections round-robin
     * @param concurrencyLimit       the pool holds {@code max(clusterSize, concurrencyLimit)} operators
     * @param timeoutSeconds         handed to the callback map and to each asynchronous callback
     */
    public MessageTransmitter(TransportManagedObjects requestManagedObjects, TransportManagedObjects responseManagedObjects, MessagePolicy messagePolicy, ReconnectionPolicy reconnectionPolicy, MessageStrategy messageStrategy, NTPTime ntpTime, int clusterSize, int concurrencyLimit, int timeoutSeconds) throws IOException, JMSException, TransportException {
        this.messageStrategy = messageStrategy;
        this.timeoutSeconds = timeoutSeconds;
        this.ntpOffset = ntpTime == null ? 0L : ntpTime.getOffset(10000);
        this.callbackMap = new SelfDestructiveMap<>(timeoutSeconds);

        this.requestConnectionFactors = new ConnectionFactor[clusterSize];
        for (int index = 0; index < this.requestConnectionFactors.length; index++) {
            this.requestConnectionFactors[index] = new ConnectionFactor(requestManagedObjects, messagePolicy, reconnectionPolicy);
        }

        this.operatorQueue = new LinkedBlockingQueue<>();
        int operatorCount = Math.max(clusterSize, concurrencyLimit);
        int requestIndex = 0;
        for (int index = 0; index < operatorCount; index++) {
            this.operatorQueue.add(new QueueOperator(this.requestConnectionFactors[requestIndex], (Queue)requestManagedObjects.getDestination()));
            // Round-robin over the request connections.
            if (++requestIndex == this.requestConnectionFactors.length) {
                requestIndex = 0;
            }
        }

        this.transmissionListeners = new TransmissionListener[clusterSize];
        for (int index = 0; index < this.transmissionListeners.length; index++) {
            this.transmissionListeners[index] = new TransmissionListener(this, new ConnectionFactor(responseManagedObjects, messagePolicy, reconnectionPolicy), (Topic)responseManagedObjects.getDestination(), this.ntpOffset);
        }
    }

    /**
     * @return the random identifier of this transmitter instance
     */
    public String getInstanceId() {
        return this.instanceId;
    }

    /**
     * Borrows an operator, sends the wrapped invocation signal and registers a callback for the
     * response under the request's JMS message id.
     *
     * @param invocationSignal the invocation to transmit
     * @param serviceSelector  value of the SERVICE message property
     * @return the callback already registered for this message id if the response was recorded
     *         first, otherwise the newly registered asynchronous callback
     * @throws TransportException if the transmitter was closed before an operator became available
     * @throws Exception          anything raised while acquiring the operator, building or sending the message
     */
    public TransmissionCallback sendMessage(final InvocationSignal invocationSignal, final String serviceSelector) throws Exception {
        final QueueOperator queueOperator = acquireQueueOperator();
        try {
            Message requestMessage = constructRequestMessage(queueOperator, invocationSignal, serviceSelector);
            queueOperator.send(requestMessage);

            AsynchronousTransmissionCallback asynchronousCallback = new AsynchronousTransmissionCallback(this.messageStrategy, this.timeoutSeconds);
            // completeCallback() may already have stored the response for this id; in that case
            // hand back what it stored instead of the fresh, still-empty callback.
            TransmissionCallback previousCallback = this.callbackMap.putIfAbsent(requestMessage.getJMSMessageID(), asynchronousCallback);
            if (previousCallback != null) {
                return previousCallback;
            }
            return asynchronousCallback;
        }
        finally {
            // offer() rather than put(): the queue is unbounded so this always succeeds, and unlike
            // put() it cannot throw InterruptedException, which would drop the operator from the
            // pool and mask whatever exception is already propagating.
            this.operatorQueue.offer(queueOperator);
        }
    }

    /** Waits (timed under the ACQUIRE_QUEUE metric event) for a free operator, polling once per second until one appears or the transmitter is closed. */
    private QueueOperator acquireQueueOperator() throws Exception {
        return (QueueOperator)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<QueueOperator>((MetricConfigurationProvider)TransportManager.getTransport(), new MetricProperty[]{new MetricProperty("event", MetricEvent.ACQUIRE_QUEUE.getDisplay())}){

            public QueueOperator withChronometer() throws TransportException, InterruptedException {
                QueueOperator available;
                do {
                    available = MessageTransmitter.this.operatorQueue.poll(1L, TimeUnit.SECONDS);
                } while (available == null && !MessageTransmitter.this.closed.get());
                if (available == null) {
                    throw new TransportException("Message transmission has been closed", new Object[0]);
                }
                return available;
            }
        });
    }

    /** Wraps the signal in a message (timed under the CONSTRUCT_MESSAGE metric event) and stamps the INSTANCE, SERVICE and TIME properties. */
    private Message constructRequestMessage(final QueueOperator queueOperator, final InvocationSignal invocationSignal, final String serviceSelector) throws Exception {
        return (Message)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<Message>((MetricConfigurationProvider)TransportManager.getTransport(), new MetricProperty[]{new MetricProperty("event", MetricEvent.CONSTRUCT_MESSAGE.getDisplay())}){

            public Message withChronometer() throws Exception {
                Message wrapped = MessageTransmitter.this.messageStrategy.wrapInMessage(queueOperator.getRequestSession(), invocationSignal);
                wrapped.setStringProperty(MessageProperty.INSTANCE.getKey(), MessageTransmitter.this.instanceId);
                wrapped.setStringProperty(MessageProperty.SERVICE.getKey(), serviceSelector);
                wrapped.setLongProperty(MessageProperty.TIME.getKey(), System.currentTimeMillis() + MessageTransmitter.this.ntpOffset);
                return wrapped;
            }
        });
    }

    /**
     * Delivers a response to the callback registered under its JMS correlation id.
     *
     * <p>If no callback exists yet, a {@link SynchronousTransmissionCallback} carrying the response
     * is stored so that a later {@code sendMessage} registration finds it. If an
     * {@link AsynchronousTransmissionCallback} is already registered (found directly, or because
     * it was registered between the lookup and the {@code putIfAbsent}), the response is set on it.
     * A {@link JMSException} is logged and not rethrown.
     *
     * @param responseMessage the received response
     */
    public void completeCallback(Message responseMessage) {
        try {
            String correlationId = responseMessage.getJMSCorrelationID();
            TransmissionCallback registeredCallback = this.callbackMap.get(correlationId);
            if (registeredCallback == null) {
                // Nothing registered yet: try to park the response; a non-null result means the
                // sender registered its callback in the meantime.
                registeredCallback = this.callbackMap.putIfAbsent(correlationId, new SynchronousTransmissionCallback(this.messageStrategy, responseMessage));
            }
            if (registeredCallback instanceof AsynchronousTransmissionCallback) {
                ((AsynchronousTransmissionCallback)registeredCallback).setResponseMessage(responseMessage);
            }
        }
        catch (JMSException jmsException) {
            LoggerManager.getLogger(MessageTransmitter.class).error((Throwable)jmsException);
        }
    }

    /**
     * Stops and then closes every request connection, closes every response listener and shuts
     * down the callback map. Only the first call has any effect.
     *
     * <p>The listeners and the callback map are released even when stopping or closing a request
     * connection fails; an exception thrown during that later cleanup replaces the earlier one.
     */
    public void close() throws JMSException, InterruptedException {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        try {
            // Stop all request connections first, then close them.
            for (ConnectionFactor requestConnectionFactor : this.requestConnectionFactors) {
                requestConnectionFactor.stop();
            }
            for (ConnectionFactor requestConnectionFactor : this.requestConnectionFactors) {
                requestConnectionFactor.close();
            }
        }
        finally {
            try {
                for (TransmissionListener transmissionListener : this.transmissionListeners) {
                    transmissionListener.close();
                }
            }
            finally {
                this.callbackMap.shutdown();
            }
        }
    }
}

