/*
 * Decompiled with CFR 0.152.
 */
package org.atmosphere.plugin.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.net.URI;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterFuture;
import org.atmosphere.cpr.Deliver;
import org.atmosphere.plugin.rabbitmq.RabbitMQConnectionFactory;
import org.atmosphere.util.SimpleBroadcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RabbitMQBroadcaster
extends SimpleBroadcaster
implements ShutdownListener {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQBroadcaster.class);
    private String queueName;
    private String consumerTag;
    private RabbitMQConnectionFactory factory;
    private Channel channel;
    private String exchangeName;

    public Broadcaster initialize(String id, AtmosphereConfig config) {
        super.initialize(id, config);
        this.init(config);
        return this;
    }

    public void init(AtmosphereConfig config) {
        this.factory = RabbitMQConnectionFactory.getFactory(config);
        this.channel = this.factory.channel();
        this.channel.addShutdownListener((ShutdownListener)this);
        this.exchangeName = this.factory.exchangeName();
        this.restartConsumer();
    }

    public Broadcaster initialize(String name, URI uri, AtmosphereConfig config) {
        super.initialize(name, uri, config);
        this.init(config);
        return this;
    }

    public void setID(String id) {
        super.setID(id);
        this.restartConsumer();
    }

    public String getID() {
        String id = super.getID();
        if (id.startsWith("/*")) {
            id = "atmosphere";
        }
        return id;
    }

    protected void push(Deliver entry) {
        if (this.destroyed.get()) {
            return;
        }
        this.outgoingBroadcast(entry.getMessage());
    }

    public void outgoingBroadcast(Object message) {
        try {
            String id = this.getID();
            logger.trace("Outgoing broadcast : {}", message);
            this.channel.basicPublish(this.exchangeName, id, MessageProperties.PERSISTENT_TEXT_PLAIN, message.toString().getBytes());
        }
        catch (IOException e) {
            logger.warn("Failed to send message over RabbitMQ", (Throwable)e);
        }
    }

    void restartConsumer() {
        try {
            final String id = this.getID();
            if (this.consumerTag != null) {
                logger.debug("Delete consumer {}", (Object)this.consumerTag);
                this.channel.basicCancel(this.consumerTag);
                this.consumerTag = null;
            }
            if (this.queueName != null) {
                logger.debug("Delete queue {}", (Object)this.queueName);
                this.channel.queueUnbind(this.queueName, this.exchangeName, id);
                this.channel.queueDelete(this.queueName);
                this.queueName = null;
            }
            this.queueName = this.channel.queueDeclare().getQueue();
            this.channel.queueBind(this.queueName, this.exchangeName, id);
            logger.info("Create AMQP consumer on queue {}, for routing key {}", (Object)this.queueName, (Object)id);
            DefaultConsumer queueConsumer = new DefaultConsumer(this.channel){

                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (!envelope.getRoutingKey().equalsIgnoreCase(id)) {
                        return;
                    }
                    String message = new String(body);
                    try {
                        Object newMsg = RabbitMQBroadcaster.this.filter(message);
                        if (newMsg != null) {
                            RabbitMQBroadcaster.this.deliverPush(new Deliver(newMsg, new BroadcasterFuture(newMsg), (Object)message), true);
                        }
                    }
                    catch (Throwable t) {
                        logger.error("failed to push message: " + message, t);
                    }
                }
            };
            this.consumerTag = this.channel.basicConsume(this.queueName, true, (Consumer)queueConsumer);
            logger.info("Consumer " + this.consumerTag + " for queue {}, on routing key {}", (Object)this.queueName, (Object)id);
        }
        catch (Throwable ex) {
            String msg = "Unable to initialize RabbitMQBroadcaster";
            logger.error(msg, ex);
            throw new IllegalStateException(msg, ex);
        }
    }

    public synchronized void releaseExternalResources() {
        try {
            if (this.channel != null && this.channel.isOpen() && this.consumerTag != null) {
                this.channel.basicCancel(this.consumerTag);
            }
        }
        catch (Exception ex) {
            logger.trace("", (Throwable)ex);
        }
    }

    public void shutdownCompleted(ShutdownSignalException cause) {
        this.destroy();
    }
}

