/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.quorum.transport.message.gossip;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Topic;
import org.smallmind.instrument.ChronometerInstrumentAndReturn;
import org.smallmind.instrument.InstrumentAndReturn;
import org.smallmind.instrument.InstrumentationManager;
import org.smallmind.instrument.MetricProperty;
import org.smallmind.instrument.config.MetricConfigurationProvider;
import org.smallmind.quorum.transport.InvocationSignal;
import org.smallmind.quorum.transport.TransportException;
import org.smallmind.quorum.transport.TransportManager;
import org.smallmind.quorum.transport.instrument.MetricInteraction;
import org.smallmind.quorum.transport.message.ConnectionFactor;
import org.smallmind.quorum.transport.message.MessagePolicy;
import org.smallmind.quorum.transport.message.MessageProperty;
import org.smallmind.quorum.transport.message.MessageStrategy;
import org.smallmind.quorum.transport.message.ReconnectionPolicy;
import org.smallmind.quorum.transport.message.TopicOperator;
import org.smallmind.quorum.transport.message.TransportManagedObjects;

/**
 * Publishes invocation signals to the gossip topic using a fixed pool of {@link TopicOperator}s.
 *
 * <p>The pool is held in a blocking queue: {@link #sendMessage} borrows an operator, publishes through it
 * and hands it back, so the number of concurrent publishers never exceeds the number of pooled operators.
 * Operators are spread round-robin over the connection factors created at construction time.
 */
public class GossipTransmitter {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final MessageStrategy messageStrategy;
    private final LinkedBlockingQueue<TopicOperator> operatorQueue;
    private final ConnectionFactor[] gossipConnectionFactors;

    /**
     * Creates {@code clusterSize} connection factors and {@code max(clusterSize, concurrencyLimit)} pooled
     * topic operators, assigning the operators to the connection factors in round-robin order.
     *
     * @param gossipManagedObjects supplies the destination (used as a {@link Topic}) and is passed to every connection factor
     * @param messagePolicy        passed through to every connection factor
     * @param reconnectionPolicy   passed through to every connection factor
     * @param messageStrategy      used to wrap each invocation signal in a JMS message
     * @param clusterSize          number of connection factors to create; NOTE(review): a value of 0 combined
     *                             with a positive {@code concurrencyLimit} fails with an array index error
     * @param concurrencyLimit     lower bound on the number of pooled operators
     * @throws IOException        propagated from connection factor / operator construction
     * @throws JMSException       propagated from connection factor / operator construction
     * @throws TransportException propagated from connection factor / operator construction
     */
    public GossipTransmitter(TransportManagedObjects gossipManagedObjects, MessagePolicy messagePolicy, ReconnectionPolicy reconnectionPolicy, MessageStrategy messageStrategy, int clusterSize, int concurrencyLimit) throws IOException, JMSException, TransportException {
        this.messageStrategy = messageStrategy;

        this.gossipConnectionFactors = new ConnectionFactor[clusterSize];
        for (int index = 0; index < this.gossipConnectionFactors.length; index++) {
            this.gossipConnectionFactors[index] = new ConnectionFactor(gossipManagedObjects, messagePolicy, reconnectionPolicy);
        }

        this.operatorQueue = new LinkedBlockingQueue<TopicOperator>();
        int operatorCount = Math.max(clusterSize, concurrencyLimit);
        int gossipIndex = 0;
        for (int index = 0; index < operatorCount; index++) {
            this.operatorQueue.add(new TopicOperator(this.gossipConnectionFactors[gossipIndex], (Topic)gossipManagedObjects.getDestination()));
            // Wrap around so operators are distributed evenly over the connection factors.
            if (++gossipIndex == this.gossipConnectionFactors.length) {
                gossipIndex = 0;
            }
        }
    }

    /**
     * Borrows a pooled operator, wraps the signal in a message carrying the service selector and the
     * current clock, publishes it, and returns the operator to the pool.
     *
     * <p>Acquisition polls the pool in one second slices until an operator is available or this
     * transmitter has been closed. Both the acquisition and the message construction are executed
     * through {@link InstrumentationManager} with gossip metric properties.
     *
     * @param invocationSignal the signal to wrap and publish
     * @param serviceSelector  value stored under the {@link MessageProperty#SERVICE} key
     * @throws Exception a {@link TransportException} if the transmitter was closed before an operator
     *                   could be acquired, otherwise whatever acquisition, message construction or publishing throws
     */
    public void sendMessage(final InvocationSignal invocationSignal, final String serviceSelector) throws Exception {
        final TopicOperator topicOperator = (TopicOperator)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<TopicOperator>((MetricConfigurationProvider)TransportManager.getTransport(), new MetricProperty[]{new MetricProperty("gossip", "true"), new MetricProperty("event", MetricInteraction.ACQUIRE_TOPIC.getDisplay())}){

            public TopicOperator withChronometer() throws TransportException, InterruptedException {
                TopicOperator acquiredOperator;
                // Poll in short slices so that a close() is noticed instead of blocking forever.
                do {
                    acquiredOperator = GossipTransmitter.this.operatorQueue.poll(1L, TimeUnit.SECONDS);
                } while (!GossipTransmitter.this.closed.get() && acquiredOperator == null);
                if (acquiredOperator == null) {
                    throw new TransportException("Message transmission has been closed", new Object[0]);
                }
                return acquiredOperator;
            }
        });
        try {
            Message gossipMessage = (Message)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<Message>((MetricConfigurationProvider)TransportManager.getTransport(), new MetricProperty[]{new MetricProperty("gossip", "true"), new MetricProperty("event", MetricInteraction.CONSTRUCT_MESSAGE.getDisplay())}){

                public Message withChronometer() throws Exception {
                    Message constructedMessage = GossipTransmitter.this.messageStrategy.wrapInMessage(topicOperator.getTopicSession(), invocationSignal);
                    constructedMessage.setStringProperty(MessageProperty.SERVICE.getKey(), serviceSelector);
                    constructedMessage.setLongProperty(MessageProperty.CLOCK.getKey(), System.currentTimeMillis());
                    return constructedMessage;
                }
            });
            topicOperator.publish(gossipMessage);
        }
        finally {
            // offer() rather than put(): put() is interruptible, so a thread arriving here with its
            // interrupt flag set would throw, permanently losing the operator from the pool and masking
            // any exception from the try block. The queue is unbounded and the pool size is fixed, so
            // offer() cannot fail for lack of capacity.
            this.operatorQueue.offer(topicOperator);
        }
    }

    /**
     * Stops and then closes every connection factor. Only the first call has any effect.
     *
     * <p>Every connection factor is attempted even when an earlier one fails, because the closed flag
     * is already set and a later call could not retry. The first failure is rethrown once all attempts
     * are done, with any further failures attached as suppressed exceptions.
     *
     * @throws JMSException         if stopping or closing a connection factor failed with it
     * @throws InterruptedException if stopping or closing a connection factor was interrupted
     */
    public void close() throws JMSException, InterruptedException {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }

        Exception failure = null;
        for (ConnectionFactor gossipConnectionFactor : this.gossipConnectionFactors) {
            try {
                gossipConnectionFactor.stop();
            } catch (Exception exception) {
                failure = GossipTransmitter.recordFailure(failure, exception);
            }
        }
        for (ConnectionFactor gossipConnectionFactor : this.gossipConnectionFactors) {
            try {
                gossipConnectionFactor.close();
            } catch (Exception exception) {
                failure = GossipTransmitter.recordFailure(failure, exception);
            }
        }

        if (failure instanceof JMSException) {
            throw (JMSException)failure;
        }
        if (failure instanceof InterruptedException) {
            throw (InterruptedException)failure;
        }
        if (failure != null) {
            // The only checked exceptions this method may see are the two handled above,
            // so anything left over is unchecked.
            throw (RuntimeException)failure;
        }
    }

    /** Keeps the first failure as the primary exception and attaches later ones as suppressed. */
    private static Exception recordFailure(Exception primary, Exception latest) {
        if (primary == null) {
            return latest;
        }
        primary.addSuppressed(latest);
        return primary;
    }
}

