/*
 * Decompiled with CFR 0.152.
 */
package ru.kiryam.storm.rabbitmq;

import backtype.storm.topology.ReportedFailedException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.kiryam.storm.rabbitmq.Declarator;
import ru.kiryam.storm.rabbitmq.Message;
import ru.kiryam.storm.rabbitmq.config.ConnectionConfig;

public class RabbitMQProducer
implements Serializable {
    private final Declarator declarator;
    private transient Logger logger;
    private transient ConnectionConfig connectionConfig;
    private transient Connection connection;
    private transient Channel channel;
    private boolean blocked = false;

    public RabbitMQProducer() {
        this(new Declarator.NoOp());
    }

    public RabbitMQProducer(Declarator declarator) {
        this.declarator = declarator;
    }

    public void send(Message message) {
        if (message == Message.NONE) {
            return;
        }
        this.sendMessageWhenNotBlocked((Message.MessageForSending)message);
    }

    private void sendMessageWhenNotBlocked(Message.MessageForSending message) {
        while (this.blocked) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {}
        }
        this.sendMessageActual(message);
    }

    private void sendMessageActual(Message.MessageForSending message) {
        this.reinitIfNecessary();
        if (this.channel == null) {
            throw new ReportedFailedException("No connection to RabbitMQ");
        }
        try {
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().contentType(message.getContentType()).contentEncoding(message.getContentEncoding()).deliveryMode(Integer.valueOf(message.isPersistent() ? 2 : 1)).headers(message.getHeaders()).build();
            this.channel.basicPublish(message.getExchangeName(), message.getRoutingKey(), properties, message.getBody());
        }
        catch (AlreadyClosedException ace) {
            this.logger.error("already closed exception while attempting to send message", (Throwable)ace);
            this.reset();
            throw new ReportedFailedException((Throwable)ace);
        }
        catch (IOException ioe) {
            this.logger.error("io exception while attempting to send message", (Throwable)ioe);
            this.reset();
            throw new ReportedFailedException((Throwable)ioe);
        }
        catch (Exception e) {
            this.logger.warn("Unexpected error while sending message. Backing off for a bit before trying again (to allow time for recovery)", (Throwable)e);
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    public void open(Map config) {
        this.logger = LoggerFactory.getLogger(RabbitMQProducer.class);
        this.connectionConfig = ConnectionConfig.getFromStormConfig(config);
        this.internalOpen();
    }

    private void internalOpen() {
        try {
            this.connection = this.createConnection();
            this.channel = this.connection.createChannel();
            this.declarator.execute(this.channel);
        }
        catch (Exception e) {
            this.logger.error("could not open connection on rabbitmq", (Throwable)e);
            this.reset();
        }
    }

    public void close() {
        try {
            if (this.channel != null && this.channel.isOpen()) {
                this.channel.close();
            }
        }
        catch (Exception e) {
            this.logger.debug("error closing channel", (Throwable)e);
        }
        try {
            this.logger.info("closing connection to rabbitmq: " + this.connection);
            this.connection.close();
        }
        catch (Exception e) {
            this.logger.debug("error closing connection", (Throwable)e);
        }
        this.channel = null;
        this.connection = null;
    }

    private void reset() {
        this.channel = null;
    }

    private void reinitIfNecessary() {
        if (this.channel == null) {
            this.close();
            this.internalOpen();
        }
    }

    private Connection createConnection() throws IOException {
        ConnectionFactory connectionFactory = this.connectionConfig.asConnectionFactory();
        Connection connection = this.connectionConfig.getHighAvailabilityHosts().isEmpty() ? connectionFactory.newConnection() : connectionFactory.newConnection(this.connectionConfig.getHighAvailabilityHosts().toAddresses());
        connection.addShutdownListener(new ShutdownListener(){

            public void shutdownCompleted(ShutdownSignalException cause) {
                RabbitMQProducer.this.logger.error("shutdown signal received", (Throwable)cause);
                RabbitMQProducer.this.reset();
            }
        });
        connection.addBlockedListener(new BlockedListener(){

            public void handleBlocked(String reason) throws IOException {
                RabbitMQProducer.this.blocked = true;
                RabbitMQProducer.this.logger.warn(String.format("Got blocked by rabbitmq with reason = %s", reason));
            }

            public void handleUnblocked() throws IOException {
                RabbitMQProducer.this.blocked = false;
                RabbitMQProducer.this.logger.warn(String.format("Got unblocked by rabbitmq", new Object[0]));
            }
        });
        this.logger.info("connected to rabbitmq: " + connection);
        return connection;
    }
}

