/*
 * Decompiled with CFR 0.152.
 */
package ru.kiryam.storm.rabbitmq;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.kiryam.storm.rabbitmq.Declarator;
import ru.kiryam.storm.rabbitmq.ErrorReporter;
import ru.kiryam.storm.rabbitmq.Message;
import ru.kiryam.storm.rabbitmq.config.ConnectionConfig;

public class RabbitMQConsumer
implements Serializable {
    public static final long MS_WAIT_FOR_MESSAGE = 1L;
    private final ConnectionFactory connectionFactory;
    private final Address[] highAvailabilityHosts;
    private final int prefetchCount;
    private final String queueName;
    private final boolean requeueOnFail;
    private final Declarator declarator;
    private final ErrorReporter reporter;
    private final Logger logger;
    private Connection connection;
    private Channel channel;
    private QueueingConsumer consumer;
    private String consumerTag;

    public RabbitMQConsumer(ConnectionConfig connectionConfig, int prefetchCount, String queueName, boolean requeueOnFail, Declarator declarator, ErrorReporter errorReporter) {
        this.connectionFactory = connectionConfig.asConnectionFactory();
        this.highAvailabilityHosts = connectionConfig.getHighAvailabilityHosts().toAddresses();
        this.prefetchCount = prefetchCount;
        this.queueName = queueName;
        this.requeueOnFail = requeueOnFail;
        this.declarator = declarator;
        this.reporter = errorReporter;
        this.logger = LoggerFactory.getLogger(RabbitMQConsumer.class);
    }

    public Message nextMessage() {
        this.reinitIfNecessary();
        if (this.consumerTag == null || this.consumer == null) {
            return Message.NONE;
        }
        try {
            return Message.forDelivery(this.consumer.nextDelivery(1L));
        }
        catch (ShutdownSignalException sse) {
            this.reset();
            this.logger.error("shutdown signal received while attempting to get next message", (Throwable)sse);
            this.reporter.reportError(sse);
            return Message.NONE;
        }
        catch (InterruptedException ie) {
            this.logger.debug("interruepted while waiting for message", (Throwable)ie);
            return Message.NONE;
        }
        catch (ConsumerCancelledException cce) {
            this.reset();
            this.logger.error("consumer got cancelled while attempting to get next message", (Throwable)cce);
            this.reporter.reportError(cce);
            return Message.NONE;
        }
    }

    public void ack(Long msgId) {
        this.reinitIfNecessary();
        try {
            this.channel.basicAck(msgId.longValue(), false);
        }
        catch (ShutdownSignalException sse) {
            this.reset();
            this.logger.error("shutdown signal received while attempting to ack message", (Throwable)sse);
            this.reporter.reportError(sse);
        }
        catch (Exception e) {
            this.logger.error("could not ack for msgId: " + msgId, (Throwable)e);
            this.reporter.reportError(e);
        }
    }

    public void fail(Long msgId) {
        if (this.requeueOnFail) {
            this.failWithRedelivery(msgId);
        } else {
            this.deadLetter(msgId);
        }
    }

    public void failWithRedelivery(Long msgId) {
        this.reinitIfNecessary();
        try {
            this.channel.basicReject(msgId.longValue(), true);
        }
        catch (ShutdownSignalException sse) {
            this.reset();
            this.logger.error("shutdown signal received while attempting to fail with redelivery", (Throwable)sse);
            this.reporter.reportError(sse);
        }
        catch (Exception e) {
            this.logger.error("could not fail with redelivery for msgId: " + msgId, (Throwable)e);
            this.reporter.reportError(e);
        }
    }

    public void deadLetter(Long msgId) {
        this.reinitIfNecessary();
        try {
            this.channel.basicReject(msgId.longValue(), false);
        }
        catch (ShutdownSignalException sse) {
            this.reset();
            this.logger.error("shutdown signal received while attempting to fail with no redelivery", (Throwable)sse);
            this.reporter.reportError(sse);
        }
        catch (Exception e) {
            this.logger.error("could not fail with dead-lettering (when configured) for msgId: " + msgId, (Throwable)e);
            this.reporter.reportError(e);
        }
    }

    public void open() {
        try {
            this.connection = this.createConnection();
            this.channel = this.connection.createChannel();
            if (this.prefetchCount > 0) {
                this.logger.info("setting basic.qos / prefetch count to " + this.prefetchCount + " for " + this.queueName);
                this.channel.basicQos(this.prefetchCount);
            }
            this.declarator.execute(this.channel);
            this.consumer = new QueueingConsumer(this.channel);
            this.consumerTag = this.channel.basicConsume(this.queueName, this.isAutoAcking(), (Consumer)this.consumer);
        }
        catch (Exception e) {
            this.reset();
            this.logger.error("could not open listener on queue " + this.queueName);
            this.reporter.reportError(e);
        }
    }

    protected boolean isAutoAcking() {
        return false;
    }

    public void close() {
        try {
            if (this.channel != null && this.channel.isOpen()) {
                if (this.consumerTag != null) {
                    this.channel.basicCancel(this.consumerTag);
                }
                this.channel.close();
            }
        }
        catch (Exception e) {
            this.logger.debug("error closing channel and/or cancelling consumer", (Throwable)e);
        }
        try {
            this.logger.info("closing connection to rabbitmq: " + this.connection);
            this.connection.close();
        }
        catch (Exception e) {
            this.logger.debug("error closing connection", (Throwable)e);
        }
        this.consumer = null;
        this.consumerTag = null;
        this.channel = null;
        this.connection = null;
    }

    private void reset() {
        this.consumerTag = null;
    }

    private void reinitIfNecessary() {
        if (this.consumerTag == null || this.consumer == null) {
            this.close();
            this.open();
        }
    }

    private Connection createConnection() throws IOException {
        Connection connection = this.highAvailabilityHosts == null || this.highAvailabilityHosts.length == 0 ? this.connectionFactory.newConnection() : this.connectionFactory.newConnection(this.highAvailabilityHosts);
        connection.addShutdownListener(new ShutdownListener(){

            public void shutdownCompleted(ShutdownSignalException cause) {
                RabbitMQConsumer.this.logger.error("shutdown signal received", (Throwable)cause);
                RabbitMQConsumer.this.reporter.reportError((Throwable)cause);
                RabbitMQConsumer.this.reset();
            }
        });
        this.logger.info("connected to rabbitmq: " + connection + " for " + this.queueName);
        return connection;
    }
}

