/*
 * Decompiled with CFR 0.152.
 */
package org.vatplanner.commons.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vatplanner.commons.amqp.ConnectionFailed;
import org.vatplanner.commons.amqp.ExchangeParameters;

public class AmqpSubscriptionCreator {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpSubscriptionCreator.class);
    final Channel channel;
    String logPrefix = "";
    private String queueName;
    private boolean declareQueue = true;
    private final LinkedHashSet<String> queueRoutingKeys = new LinkedHashSet();
    private ExchangeParameters exchangeParameters;
    boolean autoAck = true;
    private AmqpMessageHandler amqpMessageHandler;
    ReceiptAction errorReceiptAction;

    AmqpSubscriptionCreator(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("No channel provided (may be unavailable on AMQP connection).");
        }
        this.channel = channel;
    }

    public static AmqpSubscriptionCreator usingChannel(Channel channel) {
        return new AmqpSubscriptionCreator(channel);
    }

    public AmqpSubscriptionCreator onError(ReceiptAction receiptAction) {
        this.errorReceiptAction = receiptAction;
        if (this.errorReceiptAction == ReceiptAction.ACKNOWLEDGE) {
            LOGGER.warn("{}Configured to send positive receipt in case of errors: {}", (Object)this.logPrefix, (Object)this.errorReceiptAction);
        }
        return this;
    }

    public AmqpSubscriptionCreator forExistingQueue(String queueName) {
        this.queueName = queueName;
        this.declareQueue = false;
        return this;
    }

    public AmqpSubscriptionCreator withQueueRoutingKeys(String ... queueRoutingKeys) {
        return this.withQueueRoutingKeys(Arrays.asList(queueRoutingKeys));
    }

    public AmqpSubscriptionCreator withQueueRoutingKeys(Collection<String> queueRoutingKeys) {
        this.queueRoutingKeys.addAll(queueRoutingKeys);
        return this;
    }

    public AmqpSubscriptionCreator withLogPrefix(String logPrefix) {
        this.logPrefix = "[" + logPrefix + "] ";
        return this;
    }

    public AmqpSubscriptionCreator withExchangeParameters(ExchangeParameters exchangeParameters) {
        this.exchangeParameters = exchangeParameters;
        return this;
    }

    public AmqpSubscriptionCreator withAutoAck(boolean autoAck) {
        this.autoAck = autoAck;
        return this;
    }

    public AmqpSubscriptionCreator onMessage(AmqpMessageHandler amqpMessageHandler) {
        this.amqpMessageHandler = amqpMessageHandler;
        return this;
    }

    public void subscribe() {
        if (this.exchangeParameters == null && (this.declareQueue || this.queueName == null)) {
            throw new IncompleteConfiguration(this.logPrefix + "Neither exchange parameters nor existing queue have been configured");
        }
        if (this.amqpMessageHandler == null) {
            throw new IncompleteConfiguration(this.logPrefix + "Message handler has not been configured");
        }
        if (this.errorReceiptAction == null) {
            this.errorReceiptAction = this.autoAck ? ReceiptAction.ALREADY_CONFIRMED : ReceiptAction.DISCARD;
            LOGGER.trace("{}Error receipt action defaults to {}", (Object)this.logPrefix, (Object)this.errorReceiptAction);
        }
        try {
            if (this.exchangeParameters != null) {
                if (this.exchangeParameters.shouldDeclare()) {
                    this.channel.exchangeDeclare(this.exchangeParameters.getName(), this.exchangeParameters.getType(), this.exchangeParameters.isDurable());
                    LOGGER.debug("{}Exchange {} declared ({}, durable: {})", new Object[]{this.logPrefix, this.exchangeParameters.getName(), this.exchangeParameters.getType(), this.exchangeParameters.isDurable()});
                } else {
                    this.channel.exchangeDeclarePassive(this.exchangeParameters.getName());
                    LOGGER.debug("{}Exchange {} exists", (Object)this.logPrefix, (Object)this.exchangeParameters.getName());
                }
            }
            if (!this.declareQueue) {
                if (this.queueName == null) {
                    throw new IncompleteConfiguration(this.logPrefix + "Existing queue should be used but queue name has not been configured");
                }
                LOGGER.debug("{}Using existing queue: {}", (Object)this.logPrefix, (Object)this.queueName);
            } else {
                this.queueName = this.channel.queueDeclare().getQueue();
                LOGGER.debug("{}Queue declared: {}", (Object)this.logPrefix, (Object)this.queueName);
                if (this.queueRoutingKeys.isEmpty()) {
                    this.queueRoutingKeys.add(this.exchangeParameters.getDefaultRoutingKey());
                    LOGGER.debug("{}Queue routing key has not been configured, using exchange default routing key \"{}\"", (Object)this.logPrefix, (Object)this.exchangeParameters.getDefaultRoutingKey());
                }
                for (String queueRoutingKey : this.queueRoutingKeys) {
                    this.channel.queueBind(this.queueName, this.exchangeParameters.getName(), queueRoutingKey);
                    LOGGER.debug("{}Queue bound to exchange (routing key: \"{}\")", (Object)this.logPrefix, (Object)queueRoutingKey);
                }
            }
            this.channel.basicConsume(this.queueName, this.autoAck, (Consumer)new DefaultConsumer(this.channel){

                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    ReceiptAction receiptAction;
                    try {
                        receiptAction = AmqpSubscriptionCreator.this.amqpMessageHandler.handleDelivery(consumerTag, envelope, properties, body);
                    }
                    catch (Exception ex) {
                        LOGGER.warn("{}Failed to handle message", (Object)AmqpSubscriptionCreator.this.logPrefix, (Object)ex);
                        receiptAction = AmqpSubscriptionCreator.this.errorReceiptAction;
                    }
                    AmqpSubscriptionCreator.this.performReceiptAction(receiptAction, envelope);
                }
            });
        }
        catch (IOException ex) {
            throw new ConnectionFailed(this.logPrefix + "Failed to set up listener", ex);
        }
    }

    private void performReceiptAction(ReceiptAction receiptAction, Envelope envelope) {
        if (receiptAction == ReceiptAction.ALREADY_CONFIRMED) {
            LOGGER.trace("{}Handler has already confirmed message receipt, nothing to be done.", (Object)this.logPrefix);
        } else if (this.autoAck) {
            if (receiptAction == ReceiptAction.ACKNOWLEDGE) {
                LOGGER.trace("{}Auto-Ack is active, ignoring ACK confirmation from handler.", (Object)this.logPrefix);
            } else {
                LOGGER.warn("{}Auto-Ack is active but handler wanted to sent NACK ({}); unable to signal handling via AMQP", (Object)this.logPrefix, (Object)receiptAction);
            }
        } else if (receiptAction.successful) {
            LOGGER.trace("{}Sending ACK", (Object)this.logPrefix);
            try {
                this.channel.basicAck(envelope.getDeliveryTag(), false);
            }
            catch (IOException ex) {
                LOGGER.warn("{}Failed to send ACK", (Object)this.logPrefix, (Object)ex);
            }
        } else {
            LOGGER.trace("{}Sending NACK", (Object)this.logPrefix);
            try {
                this.channel.basicNack(envelope.getDeliveryTag(), false, receiptAction.requeue);
            }
            catch (IOException ex) {
                LOGGER.warn("{}Failed to send NACK (requeue: {})", new Object[]{this.logPrefix, receiptAction.requeue, ex});
            }
        }
    }

    private static class IncompleteConfiguration
    extends RuntimeException {
        IncompleteConfiguration(String msg) {
            super(msg);
        }
    }

    @FunctionalInterface
    public static interface AmqpMessageHandler {
        public ReceiptAction handleDelivery(String var1, Envelope var2, AMQP.BasicProperties var3, byte[] var4) throws IOException;
    }

    public static enum ReceiptAction {
        ACKNOWLEDGE(true),
        REQUEUE(false, true),
        DISCARD(false, false),
        ALREADY_CONFIRMED;

        private final boolean successful;
        private final boolean requeue;

        private ReceiptAction() {
            this(true);
        }

        private ReceiptAction(boolean successful) {
            this(successful, false);
        }

        private ReceiptAction(boolean successful, boolean requeue) {
            this.successful = successful;
            this.requeue = requeue;
        }
    }
}

