/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mendmix.amqp.adapter.rocketmq;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.dromara.mendmix.amqp.MQConsumer;
import org.dromara.mendmix.amqp.MQContext;
import org.dromara.mendmix.amqp.MQMessage;
import org.dromara.mendmix.amqp.MessageHandler;
import org.dromara.mendmix.common.ThreadLocalContext;
import org.dromara.mendmix.common.util.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MQConsumer} implementation backed by a RocketMQ {@link DefaultMQPushConsumer}.
 *
 * <p>On {@link #start()} the adapter subscribes to every topic that has a registered
 * {@link MessageHandler} (all tags) and dispatches each received message to the handler
 * registered for its topic. Messages are consumed one at a time
 * ({@code consumeMessageBatchMaxSize = 1}).
 *
 * <p>The name server address is read from the property
 * {@code <instance>.amqp.rocketmq[namesrvAddr]} at construction time and may be overridden
 * with {@link #setNamesrvAddr(String)} before {@link #start()} is called.
 */
public class RocketmqConsumerAdapter implements MQConsumer {
    private static final Logger logger = LoggerFactory.getLogger("org.dromara.mendmix.amqp.adapter");

    /** Consumer settings (group name, thread count, retry limits, filter). */
    private final MQContext context;
    /** Handlers keyed by topic name; the key set defines the subscribed topics. */
    private final Map<String, MessageHandler> messageHandlers;
    /** RocketMQ name server address applied to the consumer in {@link #start()}. */
    private String namesrvAddr;
    /** Underlying RocketMQ consumer; {@code null} until {@link #start()} has been called. */
    private DefaultMQPushConsumer consumer;

    /**
     * Creates the adapter and resolves the name server address from configuration.
     *
     * @param context         consumer settings; must not be {@code null}
     * @param messageHandlers handlers keyed by topic; the map is used as-is (not copied)
     */
    public RocketmqConsumerAdapter(MQContext context, Map<String, MessageHandler> messageHandlers) {
        this.context = context;
        this.messageHandlers = messageHandlers;
        this.namesrvAddr = ResourceUtils.getAndValidateProperty(context.getInstance() + ".amqp.rocketmq[namesrvAddr]");
    }

    /**
     * Overrides the name server address resolved in the constructor.
     * Only affects consumers created by a subsequent {@link #start()} call.
     *
     * @param namesrvAddr RocketMQ name server address
     */
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    /**
     * Creates, configures and starts the underlying push consumer.
     *
     * <p>The consumer uses a fixed-size consume thread pool sized by
     * {@code context.getMaxProcessThreads()}, starts from the last offset, and subscribes to
     * every topic in the handler map with the {@code "*"} tag expression.
     *
     * @throws Exception if subscribing or starting the RocketMQ consumer fails
     */
    @Override
    public void start() throws Exception {
        final int consumeThreads = this.context.getMaxProcessThreads();
        final String groupName = this.context.getGroupName();

        // Assign the field before configuring so shutdown() can release the instance
        // even if subscribe()/start() fails part-way through.
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(groupName);
        this.consumer = pushConsumer;

        pushConsumer.setNamesrvAddr(this.namesrvAddr);
        // The listener below only looks at the first message of each batch, so the batch
        // size must stay at 1.
        pushConsumer.setConsumeMessageBatchMaxSize(1);
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        pushConsumer.setConsumeThreadMin(consumeThreads);
        pushConsumer.setConsumeThreadMax(consumeThreads);
        pushConsumer.setPullThresholdForQueue(1000);
        pushConsumer.setConsumeConcurrentlyMaxSpan(500);

        for (String topic : this.messageHandlers.keySet()) {
            pushConsumer.subscribe(topic, "*");
        }

        MessageListenerConcurrently listener = new CustomMessageListener();
        pushConsumer.registerMessageListener(listener);
        pushConsumer.start();
    }

    /**
     * Shuts down the underlying consumer. Does nothing if {@link #start()} was never called.
     */
    @Override
    public void shutdown() {
        DefaultMQPushConsumer current = this.consumer;
        if (current != null) {
            current.shutdown();
        }
    }

    /**
     * Listener that routes each received message to the {@link MessageHandler} registered
     * for its topic and translates the outcome into a RocketMQ consume status.
     */
    private class CustomMessageListener implements MessageListenerConcurrently {
        private CustomMessageListener() {
        }

        /**
         * Processes the first message of the batch (the batch size is configured to 1).
         *
         * <p>The message is acknowledged without being handled when no handler is registered
         * for its topic, when it exceeded the configured retry or age limits, or when it does
         * not pass the context filter. Otherwise the handler is invoked; any
         * {@link Exception} results in {@code RECONSUME_LATER}.
         *
         * @param msgs           messages delivered by RocketMQ
         * @param consumeContext RocketMQ consume context (unused)
         * @return {@code CONSUME_SUCCESS} if handled or skipped, {@code RECONSUME_LATER} on failure
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeContext) {
            if (msgs.isEmpty()) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            final MessageExt msg = msgs.get(0);
            final MQContext ctx = RocketmqConsumerAdapter.this.context;

            if (!RocketmqConsumerAdapter.this.messageHandlers.containsKey(msg.getTopic())) {
                logger.warn("not messageHandler found for:{}", msg.getTopic());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            if (exceedsRetryLimits(ctx, msg)) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

            final MQMessage message = new MQMessage(msg.getTopic(), msg.getTags(), msg.getKeys(), msg.getBody());
            message.setHeaders(msg.getProperties());
            if (!ctx.matchedOnFilter(message)) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

            try {
                // Kept inside the try so that a failure here is logged like any other consume
                // error and the finally block still clears the thread-local context.
                // NOTE(review): presumably this populates ThreadLocalContext - confirm.
                message.setUserContextOnConsume();
                RocketmqConsumerAdapter.this.messageHandlers.get(message.getTopic()).process(message);
                if (logger.isDebugEnabled()) {
                    logger.debug("MQ_MESSAGE_CONSUME_SUCCESS ->message:{}", message);
                }
                MQContext.processMessageLog(ctx, message, MQContext.ActionType.sub, null);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                logger.error("MQ_MESSAGE_CONSUME_ERROR ->message:{}", message, e);
                MQContext.processMessageLog(ctx, message, MQContext.ActionType.sub, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            } finally {
                ThreadLocalContext.unset();
            }
        }

        /**
         * Tells whether a redelivered message should be given up on, logging the reason.
         *
         * <p>A message is given up on when a positive max-retry count is configured and its
         * reconsume count is above it, or when a positive max interval is configured, the
         * message was redelivered more than once, and more time than that interval has
         * elapsed since its born timestamp.
         */
        private boolean exceedsRetryLimits(MQContext ctx, MessageExt msg) {
            if (ctx.getConsumeMaxRetryTimes() > 0 && msg.getReconsumeTimes() > ctx.getConsumeMaxRetryTimes()) {
                logger.warn("MQ_MESSAGE_CONSUME_DISCARD ->max retry times exceeded, topic:{}, msgId:{}, reconsumeTimes:{}",
                        msg.getTopic(), msg.getMsgId(), msg.getReconsumeTimes());
                return true;
            }
            final long maxInterval = ctx.getConsumeMaxInterval();
            if (maxInterval > 0L && msg.getReconsumeTimes() > 1
                    && System.currentTimeMillis() - msg.getBornTimestamp() > maxInterval) {
                logger.warn("MQ_MESSAGE_CONSUME_DISCARD ->max consume interval exceeded, topic:{}, msgId:{}, bornTimestamp:{}",
                        msg.getTopic(), msg.getMsgId(), msg.getBornTimestamp());
                return true;
            }
            return false;
        }
    }
}

