/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.messaging.jms;

import java.io.Serializable;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.util.ByteSequence;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.messaging.EventConsumer;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.messaging.jms.ActiveMQConnectionProvider;
import org.streampipes.model.grounding.JmsTransportProtocol;

/**
 * Consumes events from an ActiveMQ topic and hands the payload of every received
 * {@link BytesMessage} to an {@link InternalEventProcessor}.
 *
 * <p>Lifecycle: {@link #connect} creates a session and a topic consumer and installs the
 * message listener; {@link #disconnect} (or {@link #close}) releases the consumer and the
 * session again. {@code disconnect} is safe to call on an instance that was never connected.
 *
 * <p>NOTE(review): the JMS connection returned by {@code startJmsConnection} is not retained
 * or closed by this class. Whether the parent class owns its lifecycle cannot be seen from
 * here - confirm that it is not leaked.
 *
 * <p>NOTE(review): the class is declared {@link Serializable} but holds live JMS handles in
 * non-transient fields; the field declarations are intentionally left untouched so the
 * serialized form does not change.
 */
public class ActiveMQConsumer
extends ActiveMQConnectionProvider
implements EventConsumer<JmsTransportProtocol>,
AutoCloseable,
Serializable {
    private Session session;
    private MessageConsumer consumer;
    private InternalEventProcessor<byte[]> eventProcessor;
    private Boolean connected = false;

    /**
     * Installs the listener that forwards the body of each {@link BytesMessage} to the
     * configured event processor. Messages of any other type are ignored.
     *
     * @throws JMSException if the listener cannot be registered; propagated so that
     *     {@link #connect} does not report a successful connection without a listener
     */
    private void initListener() throws JMSException {
        this.consumer.setMessageListener(message -> {
            if (!(message instanceof BytesMessage)) {
                return;
            }
            BytesMessage bytesMessage = (BytesMessage) message;
            byte[] payload;
            try {
                // Read through the JMS API instead of ByteSequence#getData(): getData() exposes
                // the raw backing array without applying the sequence's offset/length, and the
                // content may be null for an empty body.
                payload = new byte[(int) bytesMessage.getBodyLength()];
                bytesMessage.readBytes(payload);
            } catch (JMSException e) {
                // MessageListener#onMessage cannot throw checked exceptions.
                throw new IllegalStateException("could not read payload of activemq bytes message", e);
            }
            this.eventProcessor.onEvent(payload);
        });
    }

    /**
     * Connects to the broker described by {@code protocolSettings} and starts delivering
     * incoming byte payloads of the configured topic to {@code eventProcessor}.
     *
     * @param protocolSettings supplies broker hostname, port and the topic definition
     * @param eventProcessor receiver of the raw message payloads
     * @throws SpRuntimeException if the session, the consumer or the listener cannot be set up;
     *     the underlying {@link JMSException} is attached as cause and anything that was
     *     already created is released again
     */
    public void connect(JmsTransportProtocol protocolSettings, InternalEventProcessor<byte[]> eventProcessor) throws SpRuntimeException {
        String url = protocolSettings.getBrokerHostname() + ":" + protocolSettings.getPort();
        try {
            this.eventProcessor = eventProcessor;
            this.session = this.startJmsConnection(url).createSession(false, Session.AUTO_ACKNOWLEDGE);
            String topicName = protocolSettings.getTopicDefinition().getActualTopicName();
            this.consumer = this.session.createConsumer(this.session.createTopic(topicName));
            this.initListener();
            this.connected = true;
        }
        catch (JMSException e) {
            // Do not leave a half-initialised session/consumer behind.
            JMSException cleanupFailure = this.releaseResources();
            if (cleanupFailure != null) {
                e.addSuppressed(cleanupFailure);
            }
            throw ActiveMQConsumer.failure("could not connect to activemq broker", e);
        }
    }

    /**
     * Closes the consumer and the session. Both are always attempted, even if closing the
     * consumer fails, and the instance is marked as not connected afterwards. Calling this on
     * an instance that is not connected is a no-op.
     *
     * @throws SpRuntimeException if closing the consumer or the session failed; the underlying
     *     {@link JMSException} is attached as cause
     */
    public void disconnect() throws SpRuntimeException {
        JMSException closeFailure = this.releaseResources();
        if (closeFailure != null) {
            throw ActiveMQConsumer.failure("could not disconnect from activemq broker", closeFailure);
        }
    }

    /**
     * @return {@code true} after a successful {@link #connect} until resources are released
     */
    public Boolean isConnected() {
        return this.connected;
    }

    /** Delegates to {@link #disconnect()} so the consumer can be used in try-with-resources. */
    @Override
    public void close() throws Exception {
        this.disconnect();
    }

    /**
     * Closes consumer and session (in that order) if present, clears the references and resets
     * the connected flag.
     *
     * @return the first {@link JMSException} raised while closing (with any later one added as
     *     suppressed), or {@code null} if everything closed cleanly
     */
    private JMSException releaseResources() {
        MessageConsumer openConsumer = this.consumer;
        Session openSession = this.session;
        this.consumer = null;
        this.session = null;
        this.connected = false;

        JMSException firstFailure = null;
        if (openConsumer != null) {
            try {
                openConsumer.close();
            }
            catch (JMSException e) {
                firstFailure = e;
            }
        }
        if (openSession != null) {
            try {
                openSession.close();
            }
            catch (JMSException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        return firstFailure;
    }

    /**
     * Builds the exception reported to callers while keeping the JMS root cause.
     * Uses {@code initCause} because only the single-argument constructor of
     * {@link SpRuntimeException} is known to exist from this file.
     */
    private static SpRuntimeException failure(String message, JMSException cause) {
        SpRuntimeException exception = new SpRuntimeException(message);
        exception.initCause(cause);
        return exception;
    }
}

