/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.quorum.transport.message;

import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.smallmind.quorum.transport.message.AcknowledgeMode;
import org.smallmind.quorum.transport.message.MessageProperty;
import org.smallmind.quorum.transport.message.MessageTransmitter;

public class TransmissionListener
implements MessageListener {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final MessageTransmitter messageTransmitter;
    private final TopicConnection responseConnection;
    private final TopicSession responseSession;
    private final TopicSubscriber responseSubscriber;

    public TransmissionListener(MessageTransmitter messageTransmitter, TopicConnection responseConnection, Topic responseTopic, AcknowledgeMode acknowledgeMode) throws JMSException {
        this.messageTransmitter = messageTransmitter;
        this.responseConnection = responseConnection;
        this.responseSession = responseConnection.createTopicSession(false, acknowledgeMode.getJmsValue());
        this.responseSubscriber = this.responseSession.createSubscriber(responseTopic, MessageProperty.INSTANCE.getKey() + "='" + messageTransmitter.getInstanceId() + "'", false);
        this.responseSubscriber.setMessageListener((MessageListener)this);
        responseConnection.start();
    }

    public void close() throws JMSException {
        if (this.closed.compareAndSet(false, true)) {
            this.responseConnection.stop();
            this.responseSubscriber.close();
            this.responseSession.close();
            this.responseConnection.close();
        }
    }

    public void onMessage(Message message) {
        this.messageTransmitter.completeCallback(message);
    }
}

