/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.bus.common;

import com.google.common.util.concurrent.ListenableFuture;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.hawkular.bus.common.BasicMessage;
import org.hawkular.bus.common.BasicMessageWithExtraData;
import org.hawkular.bus.common.BinaryData;
import org.hawkular.bus.common.ConnectionContext;
import org.hawkular.bus.common.MessageId;
import org.hawkular.bus.common.consumer.AbstractBasicMessageListener;
import org.hawkular.bus.common.consumer.BasicMessageListener;
import org.hawkular.bus.common.consumer.ConsumerConnectionContext;
import org.hawkular.bus.common.consumer.FutureBasicMessageListener;
import org.hawkular.bus.common.consumer.RPCConnectionContext;
import org.hawkular.bus.common.producer.ProducerConnectionContext;
import org.jboss.logging.Logger;

/**
 * Sends {@link BasicMessage} objects over JMS and attaches listeners that receive them.
 *
 * <p>Plain messages are sent as JMS text messages whose body is the JSON form of the
 * {@link BasicMessage}. Messages that carry binary data are sent as ActiveMQ blob messages whose
 * stream is the JSON form (UTF-8 encoded) followed by the binary data.
 */
public class MessageProcessor {
    /** Name of the JMS string property that carries the concrete {@link BasicMessage} class name. */
    public static final String HEADER_BASIC_MESSAGE_CLASS = "org.hawkular.bus.common.BasicMessage.className";

    private static final Logger log = Logger.getLogger(MessageProcessor.class);

    /**
     * Registers the given listener on the consumer held by the given context.
     *
     * @param context the context whose message consumer will deliver messages to the listener
     * @param listener the listener to register; it is told about {@code context} before being attached
     * @param <T> the type of message the listener expects
     * @throws NullPointerException if the context, the listener, or the context's consumer is null
     * @throws JMSException if the listener cannot be attached to the consumer
     */
    public <T extends BasicMessage> void listen(ConsumerConnectionContext context, AbstractBasicMessageListener<T> listener) throws JMSException {
        if (context == null) {
            throw new NullPointerException("context must not be null");
        }
        if (listener == null) {
            throw new NullPointerException("listener must not be null");
        }
        MessageConsumer consumer = context.getMessageConsumer();
        if (consumer == null) {
            throw new NullPointerException("context had a null consumer");
        }
        listener.setConsumerConnectionContext(context);
        consumer.setMessageListener(listener);
    }

    /**
     * Sends the message with no additional headers.
     *
     * @see #send(ProducerConnectionContext, BasicMessage, Map)
     */
    public MessageId send(ProducerConnectionContext context, BasicMessage basicMessage) throws JMSException {
        return send(context, basicMessage, null);
    }

    /**
     * Sends the message as a JMS text message using the producer held by the context.
     *
     * <p>Any message ID already present on {@code basicMessage} is discarded; after the send the
     * message carries the ID assigned by JMS.
     *
     * @param context provides the session and producer used to send
     * @param basicMessage the message to send
     * @param headers optional extra string properties for the JMS message; may be null
     * @return the ID JMS assigned to the sent message
     * @throws IllegalArgumentException if the context, the message, or the context's session is null
     * @throws IllegalStateException if the context has no producer
     * @throws JMSException if the message cannot be created or sent
     */
    public MessageId send(ProducerConnectionContext context, BasicMessage basicMessage, Map<String, String> headers) throws JMSException {
        if (context == null) {
            throw new IllegalArgumentException("context must not be null");
        }
        if (basicMessage == null) {
            throw new IllegalArgumentException("message must not be null");
        }
        Message msg = createMessage(context, basicMessage, headers);
        return dispatch(context, basicMessage, msg);
    }

    /**
     * Sends the message together with the given binary data and no additional headers.
     *
     * @see #sendWithBinaryData(ProducerConnectionContext, BasicMessage, InputStream, Map)
     */
    public MessageId sendWithBinaryData(ProducerConnectionContext context, BasicMessage basicMessage, InputStream inputStream) throws JMSException {
        return sendWithBinaryData(context, basicMessage, inputStream, null);
    }

    /**
     * Sends the message together with the content of the given file and no additional headers.
     *
     * @see #sendWithBinaryData(ProducerConnectionContext, BasicMessage, File, Map)
     */
    public MessageId sendWithBinaryData(ProducerConnectionContext context, BasicMessage basicMessage, File file) throws JMSException, FileNotFoundException {
        return sendWithBinaryData(context, basicMessage, file, null);
    }

    /**
     * Sends the message together with the content of the given file.
     *
     * <p>The file is opened by this method and the resulting stream is closed again before the
     * method returns, whether or not the send succeeded.
     *
     * @param context provides the session and producer used to send
     * @param basicMessage the message to send
     * @param file the file whose content is sent as the binary data
     * @param headers optional extra string properties for the JMS message; may be null
     * @return the ID JMS assigned to the sent message
     * @throws FileNotFoundException if the file cannot be opened for reading
     * @throws JMSException if the message cannot be created or sent
     */
    public MessageId sendWithBinaryData(ProducerConnectionContext context, BasicMessage basicMessage, File file, Map<String, String> headers) throws JMSException, FileNotFoundException {
        InputStream fileStream = new FileInputStream(file);
        try {
            return sendWithBinaryData(context, basicMessage, fileStream, headers);
        } finally {
            // The stream was opened here, so nobody else can close it.
            try {
                fileStream.close();
            } catch (IOException closeFailure) {
                log.debugf(closeFailure, "Failed to close the stream of file [%s]", file);
            }
        }
    }

    /**
     * Sends the wrapped message, with its binary data if it has any.
     *
     * @param context provides the session and producer used to send
     * @param message the message and its optional binary data
     * @param headers optional extra string properties for the JMS message; may be null
     * @param <T> the type of the wrapped message
     * @return the ID JMS assigned to the sent message
     * @throws IllegalArgumentException if {@code message} is null, or under the conditions of the
     *         method this call delegates to
     * @throws JMSException if the message cannot be created or sent
     */
    public <T extends BasicMessage> MessageId send(ProducerConnectionContext context, BasicMessageWithExtraData<T> message, Map<String, String> headers) throws JMSException {
        if (message == null) {
            throw new IllegalArgumentException("message must not be null");
        }
        BinaryData binaryData = message.getBinaryData();
        if (binaryData == null) {
            return send(context, (BasicMessage) message.getBasicMessage(), headers);
        }
        return sendWithBinaryData(context, (BasicMessage) message.getBasicMessage(), binaryData, headers);
    }

    /**
     * Sends the message together with the given binary data as an ActiveMQ blob message.
     *
     * <p>This method does not close {@code inputStream}. Any message ID already present on
     * {@code basicMessage} is discarded; after the send the message carries the ID assigned by JMS.
     *
     * @param context provides the session and producer used to send
     * @param basicMessage the message to send
     * @param inputStream the binary data to send after the message's JSON
     * @param headers optional extra string properties for the JMS message; may be null
     * @return the ID JMS assigned to the sent message
     * @throws IllegalArgumentException if the context, the message, the stream, or the context's
     *         session is null
     * @throws IllegalStateException if the context has no producer or the session is not backed by
     *         ActiveMQ
     * @throws JMSException if the message cannot be created or sent
     */
    public MessageId sendWithBinaryData(ProducerConnectionContext context, BasicMessage basicMessage, InputStream inputStream, Map<String, String> headers) throws JMSException {
        if (context == null) {
            throw new IllegalArgumentException("context must not be null");
        }
        if (basicMessage == null) {
            throw new IllegalArgumentException("message must not be null");
        }
        if (inputStream == null) {
            throw new IllegalArgumentException("binary data must not be null");
        }
        Message msg = createMessageWithBinaryData(context, basicMessage, inputStream, headers);
        return dispatch(context, basicMessage, msg);
    }

    /**
     * Sends a request and listens for its response, with no additional headers.
     *
     * @see #sendAndListen(ProducerConnectionContext, BasicMessage, BasicMessageListener, Map)
     */
    public <T extends BasicMessage> RPCConnectionContext sendAndListen(ProducerConnectionContext context, BasicMessage basicMessage, BasicMessageListener<T> responseListener) throws JMSException {
        return sendAndListen(context, basicMessage, responseListener, null);
    }

    /**
     * Sends a request message and registers a listener for its response.
     *
     * <p>A temporary queue is created on the context's session, set as the request's reply-to
     * destination, and the listener is attached to a consumer on that queue before the request is
     * sent. If anything fails after the temporary queue has been created, the consumer is closed
     * and the queue deleted before the failure is rethrown.
     *
     * @param context provides the session and producer used to send
     * @param basicMessage the request message
     * @param responseListener receives the response
     * @param headers optional extra string properties for the JMS message; may be null
     * @param <T> the type of the expected response
     * @return a context holding the temporary queue, its consumer, the request and the listener
     * @throws IllegalArgumentException if the context, the message, the listener, or the context's
     *         session is null
     * @throws NullPointerException if the context has no producer
     * @throws JMSException if the request cannot be created or sent, or the response consumer
     *         cannot be set up
     */
    public <T extends BasicMessage> RPCConnectionContext sendAndListen(ProducerConnectionContext context, BasicMessage basicMessage, BasicMessageListener<T> responseListener, Map<String, String> headers) throws JMSException {
        if (context == null) {
            throw new IllegalArgumentException("context must not be null");
        }
        if (basicMessage == null) {
            throw new IllegalArgumentException("message must not be null");
        }
        if (responseListener == null) {
            throw new IllegalArgumentException("response listener must not be null");
        }
        Message msg = createMessage(context, basicMessage, headers);
        prepareForSend(basicMessage, msg);
        MessageProducer producer = context.getMessageProducer();
        if (producer == null) {
            throw new NullPointerException("Cannot send request-response message - the producer is null");
        }
        Session session = context.getSession();
        if (session == null) {
            throw new NullPointerException("Cannot send request-response message - the session is null");
        }
        TemporaryQueue responseQueue = session.createTemporaryQueue();
        MessageConsumer responseConsumer = null;
        try {
            responseConsumer = session.createConsumer(responseQueue);
            RPCConnectionContext rpcContext = new RPCConnectionContext();
            rpcContext.copy(context);
            rpcContext.setDestination(responseQueue);
            rpcContext.setMessageConsumer(responseConsumer);
            rpcContext.setRequestMessage(msg);
            rpcContext.setResponseListener(responseListener);
            responseListener.setConsumerConnectionContext(rpcContext);
            // The listener must be in place before the request goes out.
            responseConsumer.setMessageListener(responseListener);
            msg.setJMSReplyTo(responseQueue);
            producer.send(msg);
            basicMessage.setMessageId(new MessageId(msg.getJMSMessageID()));
            return rpcContext;
        } catch (JMSException | RuntimeException failure) {
            // The caller never receives the RPC context on failure, so release what was created here.
            releaseResponseResources(responseConsumer, responseQueue, failure);
            throw failure;
        }
    }

    /**
     * Sends a request with no additional headers and returns a future for its response.
     *
     * @see #sendRPC(ProducerConnectionContext, BasicMessage, Class, Map)
     */
    public <R extends BasicMessage> ListenableFuture<BasicMessageWithExtraData<R>> sendRPC(ProducerConnectionContext context, BasicMessage basicMessage, Class<R> expectedResponseMessageClass) throws JMSException {
        return sendRPC(context, basicMessage, expectedResponseMessageClass, null);
    }

    /**
     * Sends a request via
     * {@link #sendAndListen(ProducerConnectionContext, BasicMessage, BasicMessageListener, Map)}
     * using a {@link FutureBasicMessageListener} as the response listener, and returns that listener.
     *
     * @param context provides the session and producer used to send
     * @param basicMessage the request message
     * @param expectedResponseMessageClass the class of the expected response message
     * @param headers optional extra string properties for the JMS message; may be null
     * @param <R> the type of the expected response
     * @return the future listener registered for the response
     * @throws JMSException if the request cannot be sent
     */
    public <R extends BasicMessage> ListenableFuture<BasicMessageWithExtraData<R>> sendRPC(ProducerConnectionContext context, BasicMessage basicMessage, Class<R> expectedResponseMessageClass, Map<String, String> headers) throws JMSException {
        FutureBasicMessageListener<R> futureListener = new FutureBasicMessageListener<R>(expectedResponseMessageClass);
        sendAndListen(context, basicMessage, futureListener, headers);
        return futureListener;
    }

    /**
     * Creates a JMS text message for the given message with no additional headers.
     *
     * @see #createMessage(ConnectionContext, BasicMessage, Map)
     */
    protected Message createMessage(ConnectionContext context, BasicMessage basicMessage) throws JMSException {
        return createMessage(context, basicMessage, null);
    }

    /**
     * Creates a JMS text message whose body is the JSON form of the given message.
     *
     * @param context provides the session used to create the message
     * @param basicMessage the message to encode
     * @param headers optional extra string properties for the JMS message; may be null
     * @return the new JMS message with its headers set
     * @throws IllegalArgumentException if the context, the message, or the context's session is null
     * @throws JMSException if the JMS message cannot be created or populated
     */
    protected Message createMessage(ConnectionContext context, BasicMessage basicMessage, Map<String, String> headers) throws JMSException {
        if (context == null) {
            throw new IllegalArgumentException("The context is null");
        }
        if (basicMessage == null) {
            throw new IllegalArgumentException("The message is null");
        }
        Session session = context.getSession();
        if (session == null) {
            throw new IllegalArgumentException("The context had a null session");
        }
        TextMessage msg = session.createTextMessage(basicMessage.toJSON());
        setHeaders(basicMessage, headers, msg);
        return msg;
    }

    /**
     * Sets string properties on the JMS message.
     *
     * <p>Properties are set in this order: the {@link #HEADER_BASIC_MESSAGE_CLASS} property with
     * the message's class name, then the message's own headers, then the explicitly given headers.
     *
     * @param basicMessage the message whose class name and headers are applied
     * @param headers optional extra headers, set last; may be null
     * @param destination the JMS message to set the properties on
     * @throws JMSException if a property cannot be set
     */
    protected void setHeaders(BasicMessage basicMessage, Map<String, String> headers, Message destination) throws JMSException {
        destination.setStringProperty(HEADER_BASIC_MESSAGE_CLASS, basicMessage.getClass().getName());
        Map<String, String> basicMessageHeaders = basicMessage.getHeaders();
        if (basicMessageHeaders != null) {
            for (Map.Entry<String, String> entry : basicMessageHeaders.entrySet()) {
                destination.setStringProperty(entry.getKey(), entry.getValue());
            }
        }
        if (headers != null) {
            for (Map.Entry<String, String> entry : headers.entrySet()) {
                destination.setStringProperty(entry.getKey(), entry.getValue());
            }
        }
    }

    /**
     * Creates a blob message for the given message and binary data with no additional headers.
     *
     * @see #createMessageWithBinaryData(ConnectionContext, BasicMessage, InputStream, Map)
     */
    protected Message createMessageWithBinaryData(ConnectionContext context, BasicMessage basicMessage, InputStream inputStream) throws JMSException {
        return createMessageWithBinaryData(context, basicMessage, inputStream, null);
    }

    /**
     * Creates an ActiveMQ blob message from the message's JSON and the given binary data.
     *
     * @param context provides the session used to create the message
     * @param basicMessage the message to encode
     * @param inputStream the binary data to combine with the message's JSON
     * @param headers optional extra string properties for the JMS message; may be null
     * @return the new blob message with its headers set
     * @throws IllegalArgumentException if the context, the message, the stream, or the context's
     *         session is null
     * @throws IllegalStateException if the session is not backed by ActiveMQ
     * @throws JMSException if the JMS message cannot be created or populated
     */
    protected Message createMessageWithBinaryData(ConnectionContext context, BasicMessage basicMessage, InputStream inputStream, Map<String, String> headers) throws JMSException {
        if (context == null) {
            throw new IllegalArgumentException("The context is null");
        }
        if (basicMessage == null) {
            throw new IllegalArgumentException("The message is null");
        }
        if (inputStream == null) {
            throw new IllegalArgumentException("The binary data is null");
        }
        Session session = context.getSession();
        if (session == null) {
            throw new IllegalArgumentException("The context had a null session");
        }
        // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
        byte[] jsonBytes = basicMessage.toJSON().getBytes(StandardCharsets.UTF_8);
        BinaryData messagePlusBinaryData = new BinaryData(jsonBytes, inputStream);
        BlobMessage msg = getActiveMQSession(session).createBlobMessage(messagePlusBinaryData);
        setHeaders(basicMessage, headers, msg);
        return msg;
    }

    /**
     * Returns the ActiveMQ session behind the given JMS session.
     *
     * <p>If the session is not itself an {@link ActiveMQSession}, a no-argument method named
     * {@code getSession} declared on the session's class is invoked reflectively and its result is
     * used instead.
     *
     * @param session the JMS session to unwrap
     * @return the ActiveMQ session
     * @throws IllegalStateException if no ActiveMQ session can be obtained
     */
    protected ActiveMQSession getActiveMQSession(Session session) {
        if (session instanceof ActiveMQSession) {
            return (ActiveMQSession) session;
        }
        try {
            Method m = session.getClass().getDeclaredMethod("getSession");
            m.setAccessible(true);
            return (ActiveMQSession) m.invoke(session);
        } catch (Exception e) {
            throw new IllegalStateException("Not running with ActiveMQ", e);
        }
    }

    /** Copies the correlation ID onto the JMS message and clears any stale message ID. */
    private void prepareForSend(BasicMessage basicMessage, Message msg) throws JMSException {
        if (basicMessage.getCorrelationId() != null) {
            msg.setJMSCorrelationID(basicMessage.getCorrelationId().toString());
        }
        if (basicMessage.getMessageId() != null) {
            log.debugf("Non-null message ID [%s] will be ignored and a new one generated", basicMessage.getMessageId());
            basicMessage.setMessageId(null);
        }
    }

    /** Sends an already created JMS message and records the assigned ID on the basic message. */
    private MessageId dispatch(ProducerConnectionContext context, BasicMessage basicMessage, Message msg) throws JMSException {
        prepareForSend(basicMessage, msg);
        MessageProducer producer = context.getMessageProducer();
        if (producer == null) {
            throw new IllegalStateException("context had a null producer");
        }
        producer.send(msg);
        MessageId messageId = new MessageId(msg.getJMSMessageID());
        basicMessage.setMessageId(messageId);
        return messageId;
    }

    /** Closes the response consumer and deletes the temporary queue, attaching cleanup errors to the cause. */
    private void releaseResponseResources(MessageConsumer consumer, TemporaryQueue queue, Exception cause) {
        if (consumer != null) {
            try {
                consumer.close();
            } catch (JMSException | RuntimeException closeFailure) {
                cause.addSuppressed(closeFailure);
            }
        }
        try {
            queue.delete();
        } catch (JMSException | RuntimeException deleteFailure) {
            cause.addSuppressed(deleteFailure);
        }
    }
}

