package com.luoluo.delaymq.starter.autoconfigure;

import com.luoluo.delaymq.common.MessageOperateManager;
import com.luoluo.delaymq.common.TopicManager;
import com.luoluo.delaymq.common.TopicQueue;
import com.luoluo.delaymq.config.DelayMQProperties;
import com.luoluo.delaymq.consumer.ConsumeListenerDTO;
import com.luoluo.delaymq.consumer.DefaultMQConsumer;
import com.luoluo.delaymq.consumer.DelayMQConsumerLifecycleListener;
import com.luoluo.delaymq.consumer.DelayMQListenerContainer;
import com.luoluo.delaymq.consumer.MQConsumer;
import com.luoluo.delaymq.exception.BizException;
import com.luoluo.delaymq.lock.DistributedLock;
import com.luoluo.delaymq.utils.ThreadFactoryImpl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;

/* loaded from: input_file:com/luoluo/delaymq/starter/autoconfigure/DefaultMergeDelayMQListenerContainer.class */
/**
 * Listener container that merges every registered topic consumer onto one shared work queue.
 *
 * <p>A single-threaded scheduler periodically runs {@link #run()}, which walks all registered
 * listeners and enqueues each listener's {@link MQConsumer} once per queue name of its topic.
 * {@code consumeThread} worker tasks, submitted to {@code consumeExecutor}, drain that queue
 * and invoke {@code run()} on every consumer they pick up.
 *
 * <p>Once stopped or destroyed the container cannot be started again, because the internal
 * scan scheduler is shut down permanently.
 */
public class DefaultMergeDelayMQListenerContainer implements DelayMQListenerContainer, SmartLifecycle, Runnable {
    private static final Logger log = LoggerFactory.getLogger(DefaultMergeDelayMQListenerContainer.class);

    /** How long a worker waits for queued work before re-checking the running flag. */
    private static final long CONSUMER_POLL_TIMEOUT_MILLIS = 1000L;

    protected DelayMQProperties.Consumer consumerProperties;
    protected DistributedLock distributedLock;
    /** Written by the lifecycle thread, read by worker threads; volatile for visibility. */
    private volatile boolean running;
    protected int consumeThread;
    ThreadPoolExecutor consumeExecutor;
    protected MessageOperateManager messageOperateManager = MessageOperateManager.getInstance();
    /** Registered listeners keyed by topic; the value lists are safe to iterate while being appended to. */
    protected Map<String, List<ConsumeListenerDTO>> topicConsumeListener = new ConcurrentHashMap<>();
    /** Bounded hand-off queue between the scan task and the worker tasks. */
    protected LinkedBlockingQueue<MQConsumer> mqConsumerRunnable = new LinkedBlockingQueue<>(1024);
    private final ScheduledExecutorService scanConsumerSchedule = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("scanConsumerSchedule", true));
    protected TopicManager topicManager = TopicManager.getInstance();

    /**
     * Worker loop: takes consumers off the shared queue and runs them until the container stops.
     */
    private class ConsumeRunnable implements Runnable {
        private ConsumeRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (DefaultMergeDelayMQListenerContainer.this.isRunning()) {
                try {
                    // Timed poll instead of take(): a worker parked on an empty queue must wake
                    // up regularly to notice that the container was stopped, otherwise it would
                    // block forever because shutdown() does not interrupt running tasks.
                    MQConsumer consumer = DefaultMergeDelayMQListenerContainer.this.mqConsumerRunnable
                            .poll(CONSUMER_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                    if (consumer != null) {
                        consumer.run();
                    }
                } catch (InterruptedException e) {
                    // Restore the flag for the owning pool and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // A failing consumer must not kill the worker; log and keep draining.
                    DefaultMergeDelayMQListenerContainer.log.error("ConsumeRunnable error", e);
                }
            }
        }
    }

    /**
     * Creates a container.
     *
     * @param consumer        consumer properties handed to every {@link DefaultMQConsumer} created here
     * @param distributedLock lock handed to every {@link DefaultMQConsumer} created here
     */
    public DefaultMergeDelayMQListenerContainer(DelayMQProperties.Consumer consumer, DistributedLock distributedLock) {
        this.consumerProperties = consumer;
        this.distributedLock = distributedLock;
    }

    /**
     * Initializes the consumer for the given listener and registers it under the topic.
     *
     * @param str                the topic the listener is registered under
     * @param consumeListenerDTO the listener description; receives its consumer as a side effect
     * @throws BizException             if a listener with the same consume group and queue type
     *                                  is already registered for the topic
     * @throws IllegalArgumentException if the listener description is incomplete
     */
    public void registerTopicConsume(String str, ConsumeListenerDTO consumeListenerDTO) {
        initDelayMQPushConsumer(consumeListenerDTO);
        // Copy-on-write list: run() iterates these lists on the scheduler thread while
        // registrations may still be appending to them.
        List<ConsumeListenerDTO> listeners = this.topicConsumeListener.computeIfAbsent(str, key -> new CopyOnWriteArrayList<>());
        for (ConsumeListenerDTO existing : listeners) {
            boolean sameGroup = consumeListenerDTO.getConsumeGroup().equals(existing.getConsumeGroup());
            if (sameGroup && consumeListenerDTO.getAnnotation().queueType().equals(existing.getAnnotation().queueType())) {
                log.error("please check topic:{} and consumeGroup:{} and queueType:{} already exist", existing.getTopic(), existing.getConsumeGroup(), existing.getAnnotation().queueType());
                throw new BizException("please check topic:" + existing.getTopic() + " and consumeGroup:" + existing.getConsumeGroup() + " and queueType:" + existing.getAnnotation().queueType() + " already exist");
            }
        }
        listeners.add(consumeListenerDTO);
    }

    /**
     * Validates the listener description, builds a {@link DefaultMQConsumer} for it, initializes
     * the consumer with the listener and stores it on the description.
     *
     * <p>If the listener implements {@link DelayMQConsumerLifecycleListener}, its
     * {@code prepareStart} callback is invoked with the new consumer before it is stored.
     *
     * @param consumeListenerDTO the listener description to initialize
     * @throws IllegalArgumentException if the listener, consume group, topic or a message
     *                                  operate matching the queue type is missing
     */
    protected void initDelayMQPushConsumer(ConsumeListenerDTO consumeListenerDTO) {
        if (consumeListenerDTO.getDelayMQConsumerListener() == null) {
            throw new IllegalArgumentException("no such delayMQConsumerListener");
        }
        Assert.notNull(consumeListenerDTO.getConsumeGroup(), "Property 'consumer group ' is required");
        Assert.notNull(consumeListenerDTO.getTopic(), "Property 'topic' is required");
        Assert.notNull(this.messageOperateManager.getMessageOperate(consumeListenerDTO.getAnnotation().queueType()), "no match queueType");
        DefaultMQConsumer defaultMQConsumer = new DefaultMQConsumer(consumeListenerDTO.getConsumeGroup(), consumeListenerDTO.getTopic(), this.topicManager, consumeListenerDTO.getAnnotation(), this.distributedLock, this.consumerProperties);
        defaultMQConsumer.initialize(consumeListenerDTO.getDelayMQConsumerListener());
        if (consumeListenerDTO.getDelayMQConsumerListener() instanceof DelayMQConsumerLifecycleListener) {
            ((DelayMQConsumerLifecycleListener) consumeListenerDTO.getDelayMQConsumerListener()).prepareStart(defaultMQConsumer);
        }
        consumeListenerDTO.setConsumer(defaultMQConsumer);
    }

    /**
     * Scan pass: for every registered listener whose topic queue is known, enqueues the
     * listener's consumer once per queue name of that topic.
     *
     * <p>Failures are logged instead of propagated, because an exception escaping a task
     * scheduled with {@code scheduleAtFixedRate} suppresses all subsequent executions.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            for (Map.Entry<String, List<ConsumeListenerDTO>> entry : this.topicConsumeListener.entrySet()) {
                String topic = entry.getKey();
                for (ConsumeListenerDTO consumeListenerDTO : entry.getValue()) {
                    TopicQueue topicQueue = this.topicManager.getTopicQueue(topic, consumeListenerDTO.getAnnotation().queueType(), false);
                    if (topicQueue == null) {
                        continue;
                    }
                    int queueCount = topicQueue.getTopicQueueData().getQueueNames().size();
                    for (int i = 0; i < queueCount; i++) {
                        // offer() never blocks; when the bounded queue is full the entry is
                        // dropped and the next scan pass enqueues the consumer again.
                        this.mqConsumerRunnable.offer(consumeListenerDTO.getConsumer());
                    }
                }
            }
        } catch (Exception e) {
            log.error("scan consumer error", e);
        }
    }

    /**
     * Unconditionally marks the container as stopped and shuts down its executors.
     */
    public void destroy() {
        setRunning(false);
        shutdownExecutors();
        log.info("container destroyed, {}", toString());
    }

    /**
     * Starts the periodic scan task (first run after 3000 ms, then every 1000 ms) and submits
     * {@code consumeThread} worker tasks to the consume executor.
     *
     * @throws IllegalStateException if the container is already running, was stopped before,
     *                               or workers are requested but no consume executor was set
     */
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("container already running. " + toString());
        }
        // Validate before flipping the flag so that a failed start leaves the container stopped.
        if (this.scanConsumerSchedule.isShutdown()) {
            throw new IllegalStateException("container already stopped and cannot be restarted. " + toString());
        }
        if (this.consumeThread > 0 && this.consumeExecutor == null) {
            throw new IllegalStateException("consumeExecutor must be set before start. " + toString());
        }
        setRunning(true);
        this.scanConsumerSchedule.scheduleAtFixedRate(this, 3000L, 1000L, TimeUnit.MILLISECONDS);
        ConsumeRunnable worker = new ConsumeRunnable();
        for (int i = 0; i < this.consumeThread; i++) {
            this.consumeExecutor.execute(worker);
        }
        log.info("running container: {}", toString());
    }

    /**
     * Stops the container if it is running: clears the running flag so workers leave their
     * loop, then shuts down the scan scheduler and the consume executor.
     */
    public void stop() {
        if (isRunning()) {
            setRunning(false);
            shutdownExecutors();
        }
    }

    /** Shuts down the scan scheduler and, when one was injected, the consume executor. */
    private void shutdownExecutors() {
        this.scanConsumerSchedule.shutdown();
        if (this.consumeExecutor != null) {
            this.consumeExecutor.shutdown();
        }
    }

    /**
     * @return {@code true} between a successful {@link #start()} and the next
     *         {@link #stop()} or {@link #destroy()}
     */
    public boolean isRunning() {
        return this.running;
    }

    private void setRunning(boolean z) {
        this.running = z;
    }

    /**
     * @return {@link Integer#MAX_VALUE}, the highest possible lifecycle phase
     */
    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    /**
     * @param i number of worker tasks submitted to the consume executor on {@link #start()}
     */
    public void setConsumeThread(int i) {
        this.consumeThread = i;
    }

    /**
     * @return number of worker tasks submitted to the consume executor on {@link #start()}
     */
    public int getConsumeThread() {
        return this.consumeThread;
    }

    /**
     * @param threadPoolExecutor executor that runs the worker tasks; it is shut down together
     *                           with the container
     */
    public void setConsumeExecutor(ThreadPoolExecutor threadPoolExecutor) {
        this.consumeExecutor = threadPoolExecutor;
    }
}
