/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mendmix.amqp.adapter.aliyun.mns;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.model.Message;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.dromara.mendmix.amqp.adapter.aliyun.mns.MNSClientInstance;
import org.dromara.mendmix.amqp.adapter.aliyun.mns.MNSQueueProcessHanlder;
import org.dromara.mendmix.common.async.StandardThreadExecutor;
import org.dromara.mendmix.common.util.ResourceUtils;
import org.dromara.mendmix.spring.InstanceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.PriorityOrdered;

public class MNSConsumer
implements InitializingBean,
DisposableBean,
PriorityOrdered {
    private static Logger logger = LoggerFactory.getLogger((String)"com.aygframework.support");
    private Map<String, MNSQueueProcessHanlder> queueHanlders = new HashMap<String, MNSQueueProcessHanlder>();
    private StandardThreadExecutor fetchExecutor;
    private StandardThreadExecutor defaultProcessExecutor;
    private AtomicBoolean closed = new AtomicBoolean(false);
    @Value(value="${aliyun.mns.consumer.queueName}")
    private String queueName;
    private Semaphore semaphore;

    public void afterPropertiesSet() throws Exception {
        this.start();
    }

    private void start() {
        CloudQueue queue = MNSClientInstance.createQueueIfAbsent(this.queueName);
        this.initTopicHanlders();
        this.fetchExecutor = new StandardThreadExecutor(1, 1, 0L, TimeUnit.SECONDS, 1, (ThreadFactory)new StandardThreadExecutor.StandardThreadFactory("mns-Fetch-Executor"));
        int maxThread = ResourceUtils.getInt((String)"aliyun.mns.consumer.processThreads", (int)50);
        this.semaphore = new Semaphore(maxThread);
        this.defaultProcessExecutor = new StandardThreadExecutor(1, maxThread, 60L, TimeUnit.SECONDS, 1, (ThreadFactory)new StandardThreadExecutor.StandardThreadFactory("mns-defaultProcess-Executor"));
        this.fetchExecutor.submit((Runnable)new Worker(queue));
        logger.info("MENDMIX-TRACE-LOGGGING-->> start work for queue Ok -> queue:{}", (Object)queue.getQueueURL());
    }

    private void initTopicHanlders() {
        Map interfaces = InstanceFactory.getBeansOfType(MNSQueueProcessHanlder.class);
        if (interfaces == null || interfaces.isEmpty()) {
            return;
        }
        for (MNSQueueProcessHanlder hanlder : interfaces.values()) {
            for (String topicName : hanlder.topicNames()) {
                if (this.queueHanlders.containsKey(topicName)) {
                    throw new RuntimeException("ProcessHanlder for topicName [" + topicName + "] existed");
                }
                MNSClientInstance.createTopicIfAbsent(topicName, this.queueName);
                this.queueHanlders.put(topicName, hanlder);
                logger.info("MENDMIX-TRACE-LOGGGING-->> registered MNSHanlder Ok -> queue:{},topic:{},hander:{}", new Object[]{this.queueName, topicName, hanlder.getClass().getName()});
            }
        }
        if (this.queueHanlders.isEmpty()) {
            throw new RuntimeException("not any MNS TopicHanlder found");
        }
    }

    public int getOrder() {
        return Integer.MAX_VALUE;
    }

    public void destroy() throws Exception {
        this.closed.set(true);
        this.fetchExecutor.shutdown();
        this.defaultProcessExecutor.shutdown();
    }

    private class Worker
    implements Runnable {
        CloudQueue queue;

        public Worker(CloudQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (!MNSConsumer.this.closed.get()) {
                try {
                    final Message message = this.queue.popMessage(5);
                    if (message == null) continue;
                    String messageBody = message.getMessageBodyAsRawString();
                    JSONObject json = JSON.parseObject((String)messageBody);
                    final String topicName = json.getString("topic");
                    final String bodyString = json.getString("body");
                    final MNSQueueProcessHanlder hanlder = (MNSQueueProcessHanlder)MNSConsumer.this.queueHanlders.get(topicName);
                    if (hanlder == null) continue;
                    MNSConsumer.this.semaphore.acquire();
                    MNSConsumer.this.defaultProcessExecutor.submit(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                logger.debug("MENDMIX-TRACE-LOGGGING-->> processs_topic begin -> topicName:{},messageId:{}", (Object)topicName, (Object)message.getMessageId());
                                hanlder.process(topicName, bodyString);
                                Worker.this.queue.deleteMessage(message.getReceiptHandle());
                                logger.debug("MENDMIX-TRACE-LOGGGING-->> processs_topic end -> topicName:{},messageId:{},DequeueCount:{}", new Object[]{topicName, message.getMessageId(), message.getDequeueCount()});
                            }
                            finally {
                                MNSConsumer.this.semaphore.release();
                            }
                        }
                    });
                }
                catch (Exception e) {
                    logger.error("MENDMIX-TRACE-LOGGGING-->> mns_unknow_error", (Throwable)e);
                }
            }
        }
    }
}

