/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.jms;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Topic;
import org.smallmind.instrument.ChronometerInstrumentAndReturn;
import org.smallmind.instrument.InstrumentAndReturn;
import org.smallmind.instrument.InstrumentationManager;
import org.smallmind.instrument.MetricProperty;
import org.smallmind.instrument.config.MetricConfiguration;
import org.smallmind.instrument.config.MetricConfigurationProvider;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.phalanx.wire.MetricInteraction;
import org.smallmind.phalanx.wire.ResponseTransport;
import org.smallmind.phalanx.wire.ResultSignal;
import org.smallmind.phalanx.wire.ServiceDefinitionException;
import org.smallmind.phalanx.wire.SignalCodec;
import org.smallmind.phalanx.wire.TransportException;
import org.smallmind.phalanx.wire.TransportState;
import org.smallmind.phalanx.wire.WireInvocationCircuit;
import org.smallmind.phalanx.wire.WireProperty;
import org.smallmind.phalanx.wire.WiredService;
import org.smallmind.phalanx.wire.jms.ConnectionManager;
import org.smallmind.phalanx.wire.jms.InvocationWorker;
import org.smallmind.phalanx.wire.jms.MessagePolicy;
import org.smallmind.phalanx.wire.jms.ReconnectionPolicy;
import org.smallmind.phalanx.wire.jms.RequestListener;
import org.smallmind.phalanx.wire.jms.RoutingFactories;
import org.smallmind.phalanx.wire.jms.TopicOperator;
import org.smallmind.phalanx.worker.WorkManager;
import org.smallmind.phalanx.worker.WorkQueue;
import org.smallmind.phalanx.worker.WorkerFactory;

/**
 * JMS implementation of {@link ResponseTransport}.
 *
 * <p>On construction the transport creates three groups of {@link RequestListener}s, each of
 * {@code clusterSize} entries:
 * <ul>
 *   <li>"shout" listeners, built on the request topic factory and its destination;</li>
 *   <li>"talk" listeners, built on the request queue factory and its destination;</li>
 *   <li>"whisper" listeners, built on the request topic factory and its destination, and
 *       additionally given this transport's instance id.</li>
 * </ul>
 * Results are sent through a pool of {@link TopicOperator}s that share {@code clusterSize}
 * response {@link ConnectionManager}s. The class is also the {@link WorkerFactory} for the
 * {@link InvocationWorker}s managed by the {@link WorkManager} superclass.
 */
public class JmsResponseTransport
extends WorkManager<InvocationWorker, Message>
implements MetricConfigurationProvider,
WorkerFactory<InvocationWorker, Message>,
ResponseTransport {
    // Guards close() so that the shutdown sequence runs at most once.
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Current life-cycle state; also used as the monitor for play(), pause() and close().
    private final AtomicReference<TransportState> transportStateRef = new AtomicReference<TransportState>(TransportState.PLAYING);
    private final WireInvocationCircuit invocationCircuit = new WireInvocationCircuit();
    private final SignalCodec signalCodec;
    // Pool of operators used by transmit(); an operator is polled for a send and re-added afterwards.
    private final ConcurrentLinkedQueue<TopicOperator> responseQueue;
    private final RequestListener[] shoutRequestListeners;
    private final RequestListener[] talkRequestListeners;
    private final RequestListener[] whisperRequestListeners;
    private final ConnectionManager[] responseConnectionManagers;
    private final String instanceId = SnowflakeId.newInstance().generateDottedString();
    private final int maximumMessageLength;

    /**
     * Builds the request listeners, the response connection managers and the pool of response
     * operators, then starts the worker machinery via {@code startUp(this)}.
     *
     * @param metricConfiguration  metric configuration handed to the {@link WorkManager} superclass
     * @param routingFactories     source of the request topic, request queue and response topic factories
     * @param messagePolicy        passed to every {@link ConnectionManager} created here
     * @param reconnectionPolicy   passed to every {@link ConnectionManager} created here
     * @param signalCodec          codec used to encode result signals and handed to created workers
     * @param serviceGroup         service group passed to every {@link RequestListener}
     * @param clusterSize          number of listeners per group and number of response connection managers
     * @param concurrencyLimit     concurrency limit handed to the {@link WorkManager} superclass; the
     *                             operator pool holds {@code max(clusterSize, concurrencyLimit)} entries
     * @param maximumMessageLength passed to every {@link InvocationWorker} created by this factory
     * @throws InterruptedException as declared by the underlying start-up calls
     * @throws JMSException         as declared by the underlying JMS set-up calls
     * @throws TransportException   as declared by the underlying transport set-up calls
     */
    public JmsResponseTransport(MetricConfiguration metricConfiguration, RoutingFactories routingFactories, MessagePolicy messagePolicy, ReconnectionPolicy reconnectionPolicy, SignalCodec signalCodec, String serviceGroup, int clusterSize, int concurrencyLimit, int maximumMessageLength) throws InterruptedException, JMSException, TransportException {
        super(metricConfiguration, InvocationWorker.class, concurrencyLimit);

        this.signalCodec = signalCodec;
        this.maximumMessageLength = maximumMessageLength;

        this.shoutRequestListeners = new RequestListener[clusterSize];
        for (int index = 0; index < this.shoutRequestListeners.length; ++index) {
            this.shoutRequestListeners[index] = new RequestListener(this, new ConnectionManager(routingFactories.getRequestTopicFactory(), messagePolicy, reconnectionPolicy), routingFactories.getRequestTopicFactory().getDestination(), serviceGroup, null);
        }

        this.talkRequestListeners = new RequestListener[clusterSize];
        for (int index = 0; index < this.talkRequestListeners.length; ++index) {
            this.talkRequestListeners[index] = new RequestListener(this, new ConnectionManager(routingFactories.getRequestQueueFactory(), messagePolicy, reconnectionPolicy), routingFactories.getRequestQueueFactory().getDestination(), serviceGroup, null);
        }

        // Unlike the shout listeners, the whisper listeners are also given this transport's instance id.
        this.whisperRequestListeners = new RequestListener[clusterSize];
        for (int index = 0; index < this.whisperRequestListeners.length; ++index) {
            this.whisperRequestListeners[index] = new RequestListener(this, new ConnectionManager(routingFactories.getRequestTopicFactory(), messagePolicy, reconnectionPolicy), routingFactories.getRequestTopicFactory().getDestination(), serviceGroup, this.instanceId);
        }

        this.responseConnectionManagers = new ConnectionManager[clusterSize];
        for (int index = 0; index < this.responseConnectionManagers.length; ++index) {
            this.responseConnectionManagers[index] = new ConnectionManager(routingFactories.getResponseTopicFactory(), messagePolicy, reconnectionPolicy);
        }

        // Fill the operator pool, assigning the response connection managers round-robin.
        this.responseQueue = new ConcurrentLinkedQueue<TopicOperator>();
        int operatorCount = Math.max(clusterSize, concurrencyLimit);
        int topicIndex = 0;
        for (int index = 0; index < operatorCount; ++index) {
            this.responseQueue.add(new TopicOperator(this.responseConnectionManagers[topicIndex], (Topic)routingFactories.getResponseTopicFactory().getDestination()));
            if (++topicIndex == this.responseConnectionManagers.length) {
                topicIndex = 0;
            }
        }

        this.startUp(this);
    }

    /**
     * @return the id generated for this transport instance at construction time
     */
    @Override
    public String getInstanceId() {
        return this.instanceId;
    }

    /**
     * Registers a service with this transport's invocation circuit.
     *
     * @param serviceInterface the interface under which the service is registered
     * @param targetService    the service implementation
     * @return this transport's instance id
     * @throws NoSuchMethodException      propagated from the invocation circuit
     * @throws ServiceDefinitionException propagated from the invocation circuit
     */
    @Override
    public String register(Class<?> serviceInterface, WiredService targetService) throws NoSuchMethodException, ServiceDefinitionException {
        this.invocationCircuit.register(serviceInterface, targetService);
        return this.instanceId;
    }

    /**
     * Creates a worker bound to the given queue, sharing this transport's invocation circuit,
     * signal codec and maximum message length.
     *
     * @param metricConfiguration metric configuration for the worker
     * @param workQueue           queue the worker takes messages from
     * @return a new {@link InvocationWorker}
     */
    @Override
    public InvocationWorker createWorker(MetricConfiguration metricConfiguration, WorkQueue<Message> workQueue) {
        return new InvocationWorker(metricConfiguration, workQueue, this, this.invocationCircuit, this.signalCodec, this.maximumMessageLength);
    }

    /**
     * @return the current transport state
     */
    @Override
    public TransportState getState() {
        return this.transportStateRef.get();
    }

    /**
     * Moves the transport from {@code PAUSED} to {@code PLAYING} and calls {@code play()} on
     * every shout, talk and whisper listener, in that order. Does nothing in any other state.
     *
     * @throws JMSException if a listener fails to resume
     */
    @Override
    public void play() throws JMSException {
        synchronized (this.transportStateRef) {
            if (this.transportStateRef.compareAndSet(TransportState.PAUSED, TransportState.PLAYING)) {
                for (RequestListener shoutListener : this.shoutRequestListeners) {
                    shoutListener.play();
                }
                for (RequestListener talkListener : this.talkRequestListeners) {
                    talkListener.play();
                }
                for (RequestListener whisperListener : this.whisperRequestListeners) {
                    whisperListener.play();
                }
            }
        }
    }

    /**
     * Moves the transport from {@code PLAYING} to {@code PAUSED} and calls {@code pause()} on
     * every shout, talk and whisper listener, in that order. Does nothing in any other state.
     *
     * @throws JMSException if a listener fails to pause
     */
    @Override
    public void pause() throws JMSException {
        synchronized (this.transportStateRef) {
            if (this.transportStateRef.compareAndSet(TransportState.PLAYING, TransportState.PAUSED)) {
                for (RequestListener shoutListener : this.shoutRequestListeners) {
                    shoutListener.pause();
                }
                for (RequestListener talkListener : this.talkRequestListeners) {
                    talkListener.pause();
                }
                for (RequestListener whisperListener : this.whisperRequestListeners) {
                    whisperListener.pause();
                }
            }
        }
    }

    /**
     * Wraps the result in a {@link ResultSignal} and sends it using an operator taken from the pool.
     *
     * <p>The operator is put back into the pool whether or not building or sending the message
     * succeeds, so that a failed transmission does not permanently shrink the pool.
     *
     * @param callerId      id of the caller, set as a string property on the message
     * @param correlationId value used as the JMS correlation id of the message
     * @param error         error flag passed to the {@link ResultSignal}
     * @param nativeType    native type passed to the {@link ResultSignal}
     * @param result        result object passed to the {@link ResultSignal}
     * @throws TransportException if no operator is available in the pool
     * @throws Throwable          anything raised while constructing or sending the message
     */
    @Override
    public void transmit(String callerId, String correlationId, boolean error, String nativeType, Object result) throws Throwable {
        TopicOperator topicOperator = this.responseQueue.poll();
        if (topicOperator == null) {
            throw new TransportException("Unable to take a TopicOperator, which should never happen - please contact your system administrator", new Object[0]);
        }

        try {
            topicOperator.send(this.constructMessage(callerId, correlationId, topicOperator, new ResultSignal(error, nativeType, result)));
        } finally {
            // Always hand the operator back; otherwise every failure would leak one pool entry.
            this.responseQueue.add(topicOperator);
        }
    }

    /**
     * Builds the response message inside a chronometer instrument tagged with the
     * {@code CONSTRUCT_MESSAGE} event.
     *
     * <p>The message body is the encoded result signal. The JMS correlation id is set to
     * {@code correlationId}, and the caller id, the codec's content type and the current
     * {@link System#currentTimeMillis()} are stored as message properties.
     *
     * @param callerId      value for the caller id property
     * @param correlationId value for the JMS correlation id
     * @param topicOperator operator used to create the empty message
     * @param resultSignal  signal to encode into the message body
     * @return the populated message
     * @throws Throwable anything raised by the instrumentation wrapper or the message construction
     */
    private Message constructMessage(final String callerId, final String correlationId, final TopicOperator topicOperator, final ResultSignal resultSignal) throws Throwable {
        return (Message)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<Message>((MetricConfigurationProvider)this, new MetricProperty[]{new MetricProperty("event", MetricInteraction.CONSTRUCT_MESSAGE.getDisplay())}){

            @Override
            public Message withChronometer() throws Exception {
                BytesMessage responseMessage = topicOperator.createMessage();

                responseMessage.writeBytes(JmsResponseTransport.this.signalCodec.encode(resultSignal));
                responseMessage.setJMSCorrelationID(correlationId);
                responseMessage.setStringProperty(WireProperty.CALLER_ID.getKey(), callerId);
                responseMessage.setStringProperty(WireProperty.CONTENT_TYPE.getKey(), JmsResponseTransport.this.signalCodec.getContentType());
                responseMessage.setLongProperty(WireProperty.CLOCK.getKey(), System.currentTimeMillis());

                return responseMessage;
            }
        });
    }

    /**
     * Shuts the transport down. Only the first call has any effect.
     *
     * <p>The state is set to {@code CLOSED}, then the shout, talk and whisper listeners are
     * closed in that order, every response connection manager is stopped, every response
     * connection manager is closed, and finally {@code shutDown()} is called.
     *
     * @throws JMSException         if a listener or connection manager fails to close
     * @throws InterruptedException as declared by the underlying shutdown calls
     */
    @Override
    public void close() throws JMSException, InterruptedException {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }

        synchronized (this.transportStateRef) {
            this.transportStateRef.set(TransportState.CLOSED);

            for (RequestListener shoutListener : this.shoutRequestListeners) {
                shoutListener.close();
            }
            for (RequestListener talkListener : this.talkRequestListeners) {
                talkListener.close();
            }
            for (RequestListener whisperListener : this.whisperRequestListeners) {
                whisperListener.close();
            }

            // Stop all response connections before closing any of them.
            for (ConnectionManager responseConnectionManager : this.responseConnectionManagers) {
                responseConnectionManager.stop();
            }
            for (ConnectionManager responseConnectionManager : this.responseConnectionManagers) {
                responseConnectionManager.close();
            }

            this.shutDown();
        }
    }
}

