/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mendmix.amqp.adapter.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.dromara.mendmix.amqp.MQConsumer;
import org.dromara.mendmix.amqp.MQContext;
import org.dromara.mendmix.amqp.MQMessage;
import org.dromara.mendmix.amqp.MessageHandler;
import org.dromara.mendmix.amqp.adapter.rabbitmq.CachingChannelFactory;
import org.dromara.mendmix.common.ThreadLocalContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MQConsumer} implementation backed by RabbitMQ.
 *
 * <p>On {@link #start()} the queue named after the context's group name is bound to the
 * exchange once per registered topic (the topic is used as the routing key), and a single
 * manually-acknowledged consumer is registered on that queue. Incoming deliveries are
 * dispatched to the {@link MessageHandler} registered under the delivery's exact routing key.
 */
public class RabbitmqConsumerAdapter implements MQConsumer {
    private static final Logger logger = LoggerFactory.getLogger("org.dromara.mendmix.amqp.adapter");

    /** Handlers keyed by topic; the key is compared against the delivery's routing key. */
    private final Map<String, MessageHandler> messageHandlers;
    private final MQContext context;
    private String queueName;

    /**
     * @param context         MQ context supplying the group name, the message filter and the message log
     * @param messageHandlers handlers keyed by topic; the map is used as-is (not copied)
     */
    public RabbitmqConsumerAdapter(MQContext context, Map<String, MessageHandler> messageHandlers) {
        this.context = context;
        this.messageHandlers = messageHandlers;
    }

    /**
     * Binds the group queue to the exchange for every registered topic and starts consuming
     * with manual acknowledgement.
     *
     * @throws Exception if the channel cannot be created or binding/consuming fails
     */
    @Override
    public void start() throws Exception {
        this.queueName = this.context.getGroupName();
        Set<String> topics = this.messageHandlers.keySet();
        Channel channel = CachingChannelFactory.createChannel(this.context);
        String exchangeName = CachingChannelFactory.getExchangeName();
        for (String topic : topics) {
            channel.queueBind(this.queueName, exchangeName, topic);
        }
        // autoAck=false: every delivery must be explicitly acked or rejected by the callback.
        channel.basicConsume(this.queueName, false, new TopicDeliverCallback(channel), consumerTag -> {});
    }

    /**
     * Closes the channels held by {@link CachingChannelFactory}.
     * NOTE(review): closeAll() looks factory-wide rather than scoped to this consumer - confirm.
     */
    @Override
    public void shutdown() {
        CachingChannelFactory.closeAll();
    }

    /**
     * Delivery callback that routes each message to the handler registered for its routing key
     * and settles the delivery (ack / reject) on the channel it was consumed from.
     */
    private class TopicDeliverCallback implements DeliverCallback {
        private final Channel channel;

        public TopicDeliverCallback(Channel channel) {
            this.channel = channel;
        }

        /**
         * Processes one delivery.
         *
         * <ul>
         *   <li>No handler for the routing key: the delivery is rejected without requeue so it
         *       does not remain unacknowledged on the channel.</li>
         *   <li>Handler present but the context filter does not match: the delivery is acked
         *       without being processed.</li>
         *   <li>Handler succeeds: the delivery is acked and a success entry is written to the
         *       message log.</li>
         *   <li>Any exception: the delivery is rejected with requeue and the failure is logged.</li>
         * </ul>
         *
         * @param consumerTag tag of the consumer the delivery belongs to (unused)
         * @param message     the delivery received from the broker
         * @throws IOException if acknowledging or rejecting the delivery fails
         */
        @Override
        public void handle(String consumerTag, Delivery message) throws IOException {
            String routingKey = message.getEnvelope().getRoutingKey();
            long deliveryTag = message.getEnvelope().getDeliveryTag();

            MessageHandler messageHandler = RabbitmqConsumerAdapter.this.messageHandlers.get(routingKey);
            if (messageHandler == null) {
                // The consumer runs with manual acks; returning without settling the delivery
                // would leave it unacked until the channel closes. Reject without requeue so the
                // broker can discard or dead-letter it instead of redelivering it to us forever.
                logger.warn("MQ_MESSAGE_CONSUME_SKIP -> no handler for routingKey:{}", routingKey);
                this.channel.basicReject(deliveryTag, false);
                return;
            }

            MQMessage mqMessage = toMqMessage(routingKey, message);
            try {
                if (RabbitmqConsumerAdapter.this.context.matchedOnFilter(mqMessage)) {
                    mqMessage.setUserContextOnConsume();
                    messageHandler.process(mqMessage);
                    logger.debug("MQ_MESSAGE_CONSUME_SUCCESS ->message:{}", message);
                    MQContext.processMessageLog(RabbitmqConsumerAdapter.this.context, mqMessage, MQContext.ActionType.sub, null);
                }
                this.channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                // Log before touching the channel: if the reject itself fails, the original
                // cause must not be lost.
                logger.error(String.format("MQ_MESSAGE_CONSUME_ERROR ->message:%s", message), e);
                try {
                    // requeue=true: the broker will redeliver the message.
                    this.channel.basicReject(deliveryTag, true);
                } finally {
                    MQContext.processMessageLog(RabbitmqConsumerAdapter.this.context, mqMessage, MQContext.ActionType.sub, e);
                }
            } finally {
                // Presumably clears the per-thread context populated by setUserContextOnConsume()
                // so it does not leak to the next delivery on this thread - TODO confirm.
                ThreadLocalContext.unset();
            }
        }

        /** Builds an {@link MQMessage} from the delivery body, message id and non-null headers. */
        private MQMessage toMqMessage(String routingKey, Delivery message) {
            MQMessage mqMessage = new MQMessage(routingKey, message.getBody());
            AMQP.BasicProperties properties = message.getProperties();
            mqMessage.setMsgId(properties.getMessageId());
            Map<String, Object> headers = properties.getHeaders();
            if (headers != null) {
                headers.forEach((name, value) -> {
                    if (value != null) {
                        mqMessage.addHeader(name, value.toString());
                    }
                });
            }
            return mqMessage;
        }
    }
}

