/*
 * Decompiled with CFR 0.152.
 */
package org.duracloud.common.changenotifier;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.duracloud.common.changenotifier.MessageListener;
import org.duracloud.common.changenotifier.SubscriptionManager;
import org.duracloud.common.error.DuraCloudRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RabbitmqSubscriptionManager
implements SubscriptionManager {
    private Logger log = LoggerFactory.getLogger(RabbitmqSubscriptionManager.class);
    private Channel mqChannel;
    private String mqHost;
    private Integer mqPort;
    private String mqVhost;
    private String queueName;
    private String queueUrl;
    private String mqUsername;
    private String mqPassword;
    private String exchangeName;
    private String consumerName;
    private boolean initialized = false;
    private List<MessageListener> messageListeners = new ArrayList<MessageListener>();

    public RabbitmqSubscriptionManager(String host, Integer port, String vhost, String exchange, String username, String password, String queueName) {
        this.mqHost = host;
        this.mqPort = port;
        this.mqVhost = vhost;
        this.exchangeName = exchange;
        this.mqUsername = username;
        this.mqPassword = password;
        this.queueName = queueName;
        this.consumerName = "consumer-" + queueName;
    }

    @Override
    public void addListener(MessageListener listener) {
        this.messageListeners.add(listener);
    }

    @Override
    public synchronized void connect() {
        if (this.initialized) {
            throw new DuraCloudRuntimeException("this manager is already connected");
        }
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(this.mqUsername);
            factory.setPassword(this.mqPassword);
            factory.setVirtualHost(this.mqVhost);
            factory.setHost(this.mqHost);
            factory.setPort(this.mqPort);
            Connection conn = factory.newConnection();
            this.mqChannel = conn.createChannel();
            this.queueUrl = "(RabbitMQ) " + conn.getAddress();
            this.mqChannel.queueDeclare(this.queueName, true, false, false, null);
            this.mqChannel.queueBind(this.queueName, this.exchangeName, this.queueName);
            this.log.info("Subscribing consumer {} to queue {} on vhost {} at URL {}", this.consumerName, this.queueName, this.mqVhost, this.queueUrl);
            this.startConsumer();
        }
        catch (Exception ex) {
            this.initialized = false;
            this.log.error("failed to estabilish connection to RabbitMQ with queue name {} and URL {} because {}", this.queueName, this.queueUrl, ex.getMessage());
            throw new DuraCloudRuntimeException(ex);
        }
    }

    private void startConsumer() {
        try {
            this.mqChannel.basicConsume(this.queueName, false, this.consumerName, (Consumer)new DefaultConsumer(this.mqChannel){

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body);
                    RabbitmqSubscriptionManager.this.dispatch(message);
                    RabbitmqSubscriptionManager.this.log.debug("{} dispatched", (Object)message);
                    RabbitmqSubscriptionManager.this.mqChannel.basicAck(deliveryTag, false);
                    RabbitmqSubscriptionManager.this.log.debug("{} deleted", (Object)message);
                }

                @Override
                public void handleConsumeOk(String consumerTag) {
                    RabbitmqSubscriptionManager.this.log.info("Consumer registered: {}", (Object)consumerTag);
                    RabbitmqSubscriptionManager.this.initialized = true;
                }

                @Override
                public void handleCancel(String consumerTag) {
                    RabbitmqSubscriptionManager.this.log.warn("Consumer has been cancelled unexpectedly: " + consumerTag);
                }

                @Override
                public void handleCancelOk(String consumerTag) {
                    RabbitmqSubscriptionManager.this.log.info("Consumer has been cancelled successfully: " + consumerTag);
                }

                @Override
                public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                    RabbitmqSubscriptionManager.this.log.warn("Either the channel or the underlying connection has been shut down for consumer {} because {}", (Object)consumerTag, (Object)sig.getReason().toString());
                    RabbitmqSubscriptionManager.this.initialized = false;
                }
            });
        }
        catch (Exception e) {
            this.log.error("Consumer failed to subscribe: " + e.getMessage(), e);
            this.initialized = false;
        }
    }

    private void dispatch(String message) {
        this.log.debug("Dispatching message {}", (Object)message);
        for (MessageListener listener : this.messageListeners) {
            try {
                listener.onMessage(message);
            }
            catch (Exception ex) {
                this.log.error("Failed to dispatch message " + message + " to " + listener + "due to " + ex.getMessage(), ex);
            }
        }
    }

    private void cancelConsumer() {
        try {
            this.mqChannel.basicCancel(this.consumerName);
            this.log.info("Unsubscripbed consumer {}", (Object)this.consumerName);
        }
        catch (IOException e) {
            this.log.error("Error unsubscribing consumer {}", (Object)this.consumerName, (Object)e);
        }
    }

    private void deleteQueue() {
        try {
            this.mqChannel.queueDelete(this.queueName);
            this.log.info("Deleted queue {}", (Object)this.queueName);
        }
        catch (IOException e) {
            this.log.error("Error deleting queue {}", (Object)this.queueName, (Object)e);
        }
    }

    @Override
    public void disconnect() {
        if (!this.initialized) {
            throw new DuraCloudRuntimeException("This manager is already disconnected");
        }
        this.log.info("Disconnecting");
        this.log.info("Unsubscribing {}", (Object)this.consumerName);
        this.cancelConsumer();
        this.log.info("Deleting queue {}", (Object)this.queueName);
        this.deleteQueue();
        this.initialized = false;
        this.log.info("Disconnection complete");
    }
}

