/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.messaging.jms;

import java.nio.charset.StandardCharsets;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.messaging.EventProducer;
import org.streampipes.model.grounding.JmsTransportProtocol;
import org.streampipes.model.grounding.SimpleTopicDefinition;
import org.streampipes.model.grounding.TopicDefinition;

/**
 * Publishes byte-array events to a topic on an ActiveMQ broker through JMS.
 *
 * <p>Typical usage is {@link #connect(JmsTransportProtocol)}, any number of
 * {@link #publish(byte[])} calls, then {@link #disconnect()}. The class performs no
 * synchronization of its own.
 */
public class ActiveMQPublisher
implements EventProducer<JmsTransportProtocol> {
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPublisher.class);

    /** Pause between two failed connection attempts, in milliseconds. */
    private static final long RETRY_DELAY_MS = 2000L;

    private Connection connection;
    private Session session;
    private MessageProducer producer;
    // Primitive so that isConnected() never returns null before connect() was called.
    private boolean connected;

    /** Creates an unconnected publisher; call {@link #connect(JmsTransportProtocol)} before publishing. */
    public ActiveMQPublisher() {
    }

    /**
     * Creates a publisher and immediately tries to connect it.
     *
     * <p>A connection failure is logged rather than thrown, so callers should check
     * {@link #isConnected()} before publishing.
     *
     * @param url broker address; everything before the last {@code ':'} is used as the broker
     *     hostname and everything after it as the numeric port
     * @param topic name of the topic to publish to
     * @throws IllegalArgumentException if {@code url} contains no {@code ':'} or the port is
     *     not a valid integer
     */
    public ActiveMQPublisher(String url, String topic) {
        int portSeparator = url.lastIndexOf(':');
        if (portSeparator < 0) {
            throw new IllegalArgumentException("Broker URL must end in ':<port>', got: " + url);
        }
        JmsTransportProtocol protocol = new JmsTransportProtocol();
        protocol.setBrokerHostname(url.substring(0, portSeparator));
        protocol.setPort(Integer.parseInt(url.substring(portSeparator + 1)));
        TopicDefinition topicDefinition = new SimpleTopicDefinition(topic);
        protocol.setTopicDefinition(topicDefinition);
        try {
            this.connect(protocol);
        }
        catch (SpRuntimeException e) {
            // The constructor signature declares no checked exception, so the failure is reported here.
            LOG.error("Could not connect to ActiveMQ broker at {}", url, e);
        }
    }

    /**
     * Publishes the UTF-8 encoding of {@code message}.
     *
     * @param message text to publish
     * @throws JMSException declared for compatibility with existing callers
     */
    public void sendText(String message) throws JMSException {
        // Explicit charset: the no-arg getBytes() depends on the platform default.
        this.publish(message.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Connects to the broker and prepares a non-persistent producer for the configured topic.
     *
     * <p>Opening the connection is retried every {@value #RETRY_DELAY_MS} ms until it succeeds
     * or the calling thread is interrupted.
     *
     * @param protocolSettings supplies the broker hostname, port and topic definition
     * @throws SpRuntimeException if the thread is interrupted while waiting for the broker, or
     *     if the session or producer cannot be set up on the opened connection
     */
    public void connect(JmsTransportProtocol protocolSettings) throws SpRuntimeException {
        String url = protocolSettings.getBrokerHostname() + ":" + protocolSettings.getPort();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        this.connection = openConnection(connectionFactory);
        try {
            this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination topic = this.session.createTopic(protocolSettings.getTopicDefinition().getActualTopicName());
            this.producer = this.session.createProducer(topic);
            this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            this.connection.start();
            this.connected = true;
        }
        catch (JMSException e) {
            // Do not leak the half-initialised connection.
            closeQuietly(this.connection);
            this.producer = null;
            this.session = null;
            this.connection = null;
            throw failure("could not connect to activemq broker", e);
        }
    }

    /**
     * Sends {@code event} as a JMS {@link BytesMessage}.
     *
     * <p>JMS failures are logged and not propagated.
     *
     * @param event payload to send
     * @throws IllegalStateException if no session or producer has been set up yet
     */
    public void publish(byte[] event) {
        if (this.session == null || this.producer == null) {
            throw new IllegalStateException("publisher is not connected to an activemq broker");
        }
        try {
            BytesMessage message = this.session.createBytesMessage();
            message.writeBytes(event);
            this.producer.send(message);
        }
        catch (JMSException e) {
            LOG.error("Could not publish event to ActiveMQ broker", e);
        }
    }

    /**
     * Closes producer, session and connection.
     *
     * <p>All three are attempted even if an earlier close fails; resources that were never
     * created are skipped. The publisher is marked as disconnected afterwards in every case.
     *
     * @throws SpRuntimeException if closing any of the resources failed; the first failure is
     *     the cause and later ones are attached as suppressed exceptions
     */
    public void disconnect() throws SpRuntimeException {
        JMSException firstFailure = null;
        try {
            if (this.producer != null) {
                this.producer.close();
            }
        }
        catch (JMSException e) {
            firstFailure = e;
        }
        try {
            if (this.session != null) {
                this.session.close();
            }
        }
        catch (JMSException e) {
            firstFailure = remember(firstFailure, e);
        }
        try {
            if (this.connection != null) {
                this.connection.close();
            }
        }
        catch (JMSException e) {
            firstFailure = remember(firstFailure, e);
        }
        this.connected = false;
        if (firstFailure != null) {
            throw failure("could not disconnect from activemq broker", firstFailure);
        }
    }

    /**
     * Reports whether the last {@link #connect(JmsTransportProtocol)} succeeded and
     * {@link #disconnect()} has not been called since.
     *
     * @return {@code true} or {@code false}, never {@code null}
     */
    public Boolean isConnected() {
        return this.connected;
    }

    /** Retries {@code createConnection()} until it succeeds or the thread is interrupted. */
    private static Connection openConnection(ActiveMQConnectionFactory connectionFactory) throws SpRuntimeException {
        while (true) {
            try {
                return connectionFactory.createConnection();
            }
            catch (JMSException e) {
                LOG.error("Trying to connect...", e);
            }
            try {
                Thread.sleep(RETRY_DELAY_MS);
            }
            catch (InterruptedException e) {
                // Restore the flag and give up instead of spinning forever on an interrupted thread.
                Thread.currentThread().interrupt();
                throw failure("interrupted while connecting to activemq broker", e);
            }
        }
    }

    /** Closes {@code connection}, logging instead of propagating a failure. */
    private static void closeQuietly(Connection connection) {
        try {
            connection.close();
        }
        catch (JMSException e) {
            LOG.warn("Could not close ActiveMQ connection after failed setup", e);
        }
    }

    /** Returns the failure to report: keeps {@code first} if present and attaches {@code next} to it. */
    private static JMSException remember(JMSException first, JMSException next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Builds an {@link SpRuntimeException} that carries {@code cause}.
     *
     * <p>Uses {@code initCause} because only the {@code String} constructor of
     * {@code SpRuntimeException} is known to exist from this file.
     */
    private static SpRuntimeException failure(String message, Throwable cause) {
        SpRuntimeException failure = new SpRuntimeException(message);
        failure.initCause(cause);
        return failure;
    }
}

