/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.quorum.transport.message;

import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import org.smallmind.quorum.transport.TransportException;
import org.smallmind.quorum.transport.message.MessagePolicy;
import org.smallmind.quorum.transport.message.MessageStrategy;
import org.smallmind.quorum.transport.message.MessageTarget;
import org.smallmind.quorum.transport.message.ReceptionListener;
import org.smallmind.quorum.transport.message.ReceptionWorker;
import org.smallmind.quorum.transport.message.TopicOperator;
import org.smallmind.quorum.transport.message.TransportManagedObjects;

public class MessageReceiver {
    private static final Random RANDOM = new SecureRandom();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final ConcurrentLinkedQueue<TopicOperator> operatorQueue;
    private final ReceptionListener[] receptionListeners;
    private final ReceptionWorker[] receptionWorkers;
    private final TopicConnection[] responseConnections;

    public MessageReceiver(TransportManagedObjects requestManagedObjects, TransportManagedObjects responseManagedObjects, MessagePolicy messagePolicy, MessageStrategy messageStrategy, int clusterSize, int concurrencyLimit, MessageTarget ... messageTargets) throws JMSException, TransportException {
        int index;
        SynchronousQueue<Message> messageRendezvous = new SynchronousQueue<Message>(true);
        HashMap<String, MessageTarget> targetMap = new HashMap<String, MessageTarget>();
        for (MessageTarget messageTarget : messageTargets) {
            targetMap.put(messageTarget.getServiceInterface().getName(), messageTarget);
        }
        this.receptionListeners = new ReceptionListener[clusterSize];
        for (index = 0; index < this.receptionListeners.length; ++index) {
            this.receptionListeners[index] = new ReceptionListener((QueueConnection)requestManagedObjects.createConnection(), (Queue)requestManagedObjects.getDestination(), messagePolicy.getAcknowledgeMode(), messageRendezvous);
        }
        this.responseConnections = new TopicConnection[clusterSize];
        for (index = 0; index < this.responseConnections.length; ++index) {
            this.responseConnections[index] = (TopicConnection)responseManagedObjects.createConnection();
        }
        int topicIndex = RANDOM.nextInt(this.responseConnections.length);
        this.operatorQueue = new ConcurrentLinkedQueue();
        for (index = 0; index < Math.max(clusterSize, concurrencyLimit); ++index) {
            this.operatorQueue.add(new TopicOperator(this.responseConnections[topicIndex], (Topic)responseManagedObjects.getDestination(), messagePolicy));
            if (++topicIndex != this.responseConnections.length) continue;
            topicIndex = 0;
        }
        this.receptionWorkers = new ReceptionWorker[concurrencyLimit];
        for (index = 0; index < this.receptionWorkers.length; ++index) {
            this.receptionWorkers[index] = new ReceptionWorker(messageStrategy, targetMap, messageRendezvous, this.operatorQueue);
            new Thread(this.receptionWorkers[index]).start();
        }
    }

    public void close() throws JMSException, InterruptedException {
        if (this.closed.compareAndSet(false, true)) {
            for (ReceptionListener receptionListener : this.receptionListeners) {
                receptionListener.close();
            }
            for (ReceptionListener receptionListener : this.responseConnections) {
                receptionListener.stop();
            }
            for (TopicOperator topicOperator : this.operatorQueue) {
                topicOperator.close();
            }
            for (ReceptionListener receptionListener : this.responseConnections) {
                receptionListener.close();
            }
            for (ReceptionWorker receptionWorker : this.receptionWorkers) {
                receptionWorker.stop();
            }
        }
    }
}

