/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mendmix.amqp.adapter.redis;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.dromara.mendmix.amqp.MQConsumer;
import org.dromara.mendmix.amqp.MQContext;
import org.dromara.mendmix.amqp.MessageHandler;
import org.dromara.mendmix.amqp.adapter.redis.MessageHandlerDelegate;
import org.dromara.mendmix.common.async.StandardThreadExecutor;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * {@link MQConsumer} implementation that registers each configured
 * {@link MessageHandler} as a listener on a {@link RedisMessageListenerContainer}.
 *
 * <p>Every key of the handler map is used as a {@link PatternTopic} pattern, and the
 * associated handler is wrapped in a {@link MessageHandlerDelegate} whose
 * {@code onMessage} method is invoked through a {@link MessageListenerAdapter}.
 *
 * <p>Lifecycle: call {@link #start()} once to create the executors and start the
 * container, and {@link #shutdown()} to release them. {@code shutdown()} is safe to
 * call even when {@code start()} was never invoked or failed part-way.
 */
public class RedisConsumerAdapter
implements MQConsumer {
    private final RedisConnectionFactory connectionFactory;
    private final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    private final MQContext context;
    private final Map<String, MessageHandler> messageHandlers;
    /** Single-threaded executor handed to the container as its subscription executor; created in {@link #start()}. */
    private ThreadPoolExecutor fetchExecutor;
    /** Executor handed to the container as its task executor; created in {@link #start()}. */
    private StandardThreadExecutor asyncProcessExecutor;

    /**
     * Creates an adapter; no resources are allocated until {@link #start()} is called.
     *
     * @param context           MQ context; its {@code getMaxProcessThreads()} value sizes the processing executor
     * @param connectionFactory connection factory passed to the listener container
     * @param messageHandlers   handlers keyed by topic pattern; {@code null} is treated as "no handlers"
     */
    public RedisConsumerAdapter(MQContext context, RedisConnectionFactory connectionFactory, Map<String, MessageHandler> messageHandlers) {
        this.context = context;
        this.connectionFactory = connectionFactory;
        // Fall back to an empty map so start() does not fail with a NullPointerException.
        this.messageHandlers = messageHandlers != null ? messageHandlers : new HashMap<String, MessageHandler>();
    }

    /**
     * Creates the executors, registers one listener per configured topic and starts
     * the listener container.
     *
     * @throws Exception if the listener adapter or the container fails to initialise
     */
    @Override
    public void start() throws Exception {
        int maxThread = this.context.getMaxProcessThreads();
        // Exactly one thread with an unbounded queue for the container's subscription task.
        this.fetchExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)new StandardThreadExecutor.StandardThreadFactory("messageFetcher"));
        // NOTE(review): 1000 is presumably the queue capacity of StandardThreadExecutor - confirm against its constructor.
        this.asyncProcessExecutor = new StandardThreadExecutor(1, maxThread, 60L, TimeUnit.SECONDS, 1000, (ThreadFactory)new StandardThreadExecutor.StandardThreadFactory("messageAsyncProcessor"));
        this.container.setConnectionFactory(this.connectionFactory);
        this.container.setSubscriptionExecutor(this.fetchExecutor);
        this.container.setTaskExecutor((Executor)this.asyncProcessExecutor);
        for (Map.Entry<String, MessageHandler> entry : this.messageHandlers.entrySet()) {
            String topic = entry.getKey();
            MessageHandlerDelegate delegate = new MessageHandlerDelegate(this.context, topic, entry.getValue());
            MessageListenerAdapter listener = new MessageListenerAdapter(delegate, "onMessage");
            listener.afterPropertiesSet();
            this.container.addMessageListener(listener, new PatternTopic(topic));
        }
        this.container.afterPropertiesSet();
        this.container.start();
    }

    /**
     * Stops and destroys the listener container, then shuts down both executors.
     *
     * <p>The container is stopped before the executors so that no message is handed
     * to an executor that has already been shut down. The executors are released in a
     * {@code finally} block, so they are shut down even if stopping the container throws.
     */
    @Override
    public void shutdown() {
        try {
            this.container.stop();
            try {
                this.container.destroy();
            }
            catch (Exception ignored) {
                // Best effort: the container is being discarded, so a failure while
                // destroying it must not prevent the executors below from being released.
            }
        }
        finally {
            // The executors only exist once start() has run; guard against early shutdown.
            if (this.fetchExecutor != null) {
                this.fetchExecutor.shutdown();
            }
            if (this.asyncProcessExecutor != null) {
                this.asyncProcessExecutor.shutdown();
            }
        }
    }
}

