package com.luoluo.delaymq.producer;

import com.luoluo.delaymq.common.Message;
import com.luoluo.delaymq.common.MessageOperate;
import com.luoluo.delaymq.common.SendResultCallback;
import com.luoluo.delaymq.common.SendResultFuture;
import com.luoluo.delaymq.common.TopicManager;
import com.luoluo.delaymq.common.TopicQueue;
import com.luoluo.delaymq.common.TopicQueueData;
import com.luoluo.delaymq.config.DelayMQProperties;
import com.luoluo.delaymq.constant.ErrorCode;
import com.luoluo.delaymq.constant.MQConstant;
import com.luoluo.delaymq.constant.QueueTypeEnum;
import com.luoluo.delaymq.exception.BizException;
import com.luoluo.delaymq.lock.DistributedLock;
import com.luoluo.delaymq.mysql.MySQLMessageOperate;
import com.luoluo.delaymq.redis.RedisMessageOperate;
import com.luoluo.delaymq.service.rebalance.RebalanceStrategyEnum;
import com.luoluo.delaymq.service.rebalance.strategy.HashRebalanceImpl;
import com.luoluo.delaymq.utils.JSONUtil;
import com.luoluo.delaymq.utils.ThreadFactoryImpl;
import com.luoluo.delaymq.utils.UtilAll;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:com/luoluo/delaymq/producer/DefaultMQProducer.class */
/**
 * Default {@link MQProducer} implementation.
 *
 * <p>Messages are written through the configured {@link MessageOperate}. The constructor
 * recognises two backends, {@link RedisMessageOperate} and {@link MySQLMessageOperate}, and
 * creates a bounded thread pool for the asynchronous / timed send variants accordingly.
 *
 * <p>The producer must be started ({@link #start()}) before any message can be sent; sending
 * on a producer that is not running fails with a {@link BizException}.
 */
public class DefaultMQProducer implements MQProducer {
    private static final Logger log = LoggerFactory.getLogger(DefaultMQProducer.class);

    /** Assertion message shared by every overload that requires an explicit execute time. */
    private static final String TIMESTAMP_REQUIRED = "'timestamp' must be not null";

    /** Idle keep-alive of the push executor's threads, in milliseconds. */
    private static final long EXECUTOR_KEEP_ALIVE_MILLIS = 60000L;

    /** Capacity of the pending-task queue when the backend is Redis. */
    private static final int REDIS_PUSH_QUEUE_CAPACITY = 50000;

    /** Capacity of the pending-task queue when the backend is MySQL. */
    private static final int MYSQL_PUSH_QUEUE_CAPACITY = 3000;

    protected DistributedLock distributedLock;
    private AtomicBoolean running = new AtomicBoolean(false);
    protected TopicManager topicManager = TopicManager.getInstance();
    protected DelayMQProperties delayMQProperties;
    protected MessageOperate messageOperate;
    private TransactionListenerContainer transactionListenerContainer = TransactionListenerContainer.getInstance();
    private TransactionConsumeMessageService consumeMessageService;
    private QueueTypeEnum queueType;
    ThreadPoolExecutor producerExecutor;

    /**
     * Creates a producer bound to the given storage backend.
     *
     * @param delayMQProperties producer configuration (auto-create flag, queue count, ...)
     * @param messageOperate    storage backend; must be a {@link RedisMessageOperate} or a
     *                          {@link MySQLMessageOperate}
     * @param distributedLock   lock handed to the transaction check service on {@link #start()}
     * @throws BizException if {@code messageOperate} is neither of the supported backends
     */
    public DefaultMQProducer(DelayMQProperties delayMQProperties, MessageOperate messageOperate, DistributedLock distributedLock) {
        this.messageOperate = messageOperate;
        this.delayMQProperties = delayMQProperties;
        this.distributedLock = distributedLock;
        if (messageOperate instanceof RedisMessageOperate) {
            this.queueType = QueueTypeEnum.REDIS_QUEUE;
            this.producerExecutor = newPushExecutor(REDIS_PUSH_QUEUE_CAPACITY, "redisPushTask");
        } else if (messageOperate instanceof MySQLMessageOperate) {
            this.queueType = QueueTypeEnum.MYSQL_QUEUE;
            this.producerExecutor = newPushExecutor(MYSQL_PUSH_QUEUE_CAPACITY, "mysqlPushTask");
        } else {
            throw new BizException("No matching queue operation type found", (Throwable) null);
        }
    }

    /**
     * Creates a producer with every collaborator supplied explicitly. No validation is done
     * and nothing is derived from {@code messageOperate}; the arguments are stored as given.
     */
    public DefaultMQProducer(DistributedLock distributedLock, AtomicBoolean atomicBoolean, TopicManager topicManager, DelayMQProperties delayMQProperties, MessageOperate messageOperate, TransactionListenerContainer transactionListenerContainer, TransactionConsumeMessageService transactionConsumeMessageService, QueueTypeEnum queueTypeEnum, ThreadPoolExecutor threadPoolExecutor) {
        this.distributedLock = distributedLock;
        this.running = atomicBoolean;
        this.topicManager = topicManager;
        this.delayMQProperties = delayMQProperties;
        this.messageOperate = messageOperate;
        this.transactionListenerContainer = transactionListenerContainer;
        this.consumeMessageService = transactionConsumeMessageService;
        this.queueType = queueTypeEnum;
        this.producerExecutor = threadPoolExecutor;
    }

    /**
     * Creates a producer with only the singleton collaborators set. The backend, the
     * configuration, the queue type and the executor are left {@code null}.
     */
    public DefaultMQProducer() {
    }

    /** Builds the fixed-size, bounded-queue pool used for asynchronous pushes. */
    private static ThreadPoolExecutor newPushExecutor(int queueCapacity, String threadNamePrefix) {
        int threads = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                threads,
                threads,
                EXECUTOR_KEEP_ALIVE_MILLIS,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                new ThreadFactoryImpl(threadNamePrefix),
                // A full queue rejects the submission instead of blocking the caller.
                new ThreadPoolExecutor.AbortPolicy());
    }

    /** Starts the producer; simply delegates to {@link #start()}. */
    public void afterSingletonsInstantiated() {
        start();
    }

    /**
     * Marks the producer as running and, when at least one transaction listener is
     * registered for this queue type, starts the transaction check service.
     *
     * <p>Calling this method on a producer that is already running is a no-op, so a second
     * call never creates a second check service.
     */
    @Override
    public void start() {
        if (!this.running.compareAndSet(false, true)) {
            return;
        }
        if (this.transactionListenerContainer.getListenerSize(this.queueType) > 0) {
            this.consumeMessageService = new TransactionConsumeMessageService(this.distributedLock, this.messageOperate, this.queueType);
            this.consumeMessageService.start();
        }
    }

    /**
     * Marks the producer as stopped and stops the transaction check service if one was
     * started. Calling this method on a producer that is not running is a no-op.
     *
     * <p>The push executor is intentionally left alive so that the producer can be started
     * again.
     */
    @Override
    public void stop() {
        if (!this.running.compareAndSet(true, false)) {
            return;
        }
        // The service only exists when transaction listeners were registered at start().
        TransactionConsumeMessageService service = this.consumeMessageService;
        if (service != null) {
            service.stop();
        }
    }

    /** @return {@code true} between a successful {@link #start()} and the next {@link #stop()} */
    @Override
    public boolean isRunning() {
        return this.running.get();
    }

    // ------------------------------------------------------------------ blocking send

    /** Sends {@code body} to {@code topicName} with the current time as execute time. */
    @Override
    public <T> String sendMessage(T t, String topicName) {
        return sendMessage(t, topicName, Long.valueOf(System.currentTimeMillis()));
    }

    /**
     * Sends {@code body} to {@code topicName} for execution at {@code executeTime}.
     *
     * @throws IllegalArgumentException if {@code executeTime} is {@code null}
     */
    @Override
    public <T> String sendMessage(T t, String topicName, Long executeTime) {
        return sendMessage(t, topicName, executeTime, null);
    }

    /** Sends {@code body} with a caller-supplied message id and the current time as execute time. */
    @Override
    public <T> String sendMessage(T t, String topicName, String msgId) {
        return sendMessage(t, topicName, Long.valueOf(System.currentTimeMillis()), msgId);
    }

    /**
     * Sends {@code body} to {@code topicName}.
     *
     * @param t           message body
     * @param topicName   target topic
     * @param executeTime execute time stored on the message; must not be {@code null}
     * @param msgId       message id to use, or blank/{@code null} to have one generated
     * @return the id under which the message was stored
     * @throws IllegalArgumentException if {@code executeTime} is {@code null}
     */
    @Override
    public <T> String sendMessage(T t, String topicName, Long executeTime, String msgId) {
        Assert.notNull(executeTime, TIMESTAMP_REQUIRED);
        return sendMessage(buildMessage(t, topicName, executeTime.longValue(), msgId));
    }

    /**
     * Validates the message, picks a queue with the topic's rebalance strategy and stores
     * the message on the calling thread.
     *
     * @return the id under which the message was stored
     */
    @Override
    public String sendMessage(Message<?> message) {
        return pushMessageToQueue(validateAndGetQueueNum(message), message);
    }

    // ------------------------------------------------------------------ hash-routed send

    /** Hash-routed send using the current time as execute time. */
    @Override
    public <T> String hashSendMessage(T t, String topicName, String hashKey) {
        return hashSendMessage(t, topicName, hashKey, Long.valueOf(System.currentTimeMillis()));
    }

    /**
     * Hash-routed send for execution at {@code executeTime}.
     *
     * @throws IllegalArgumentException if {@code executeTime} is {@code null}
     */
    @Override
    public <T> String hashSendMessage(T t, String topicName, String hashKey, Long executeTime) {
        return hashSendMessage(t, topicName, hashKey, executeTime, null);
    }

    /** Hash-routed send with a caller-supplied message id and the current time as execute time. */
    @Override
    public <T> String hashSendMessage(T t, String topicName, String hashKey, String msgId) {
        return hashSendMessage(t, topicName, hashKey, Long.valueOf(System.currentTimeMillis()), msgId);
    }

    /**
     * Hash-routed send.
     *
     * @param t           message body
     * @param topicName   target topic
     * @param hashKey     value handed to the hash rebalance strategy to choose the queue
     * @param executeTime execute time stored on the message; must not be {@code null}
     * @param msgId       message id to use, or blank/{@code null} to have one generated
     * @return the id under which the message was stored
     * @throws IllegalArgumentException if {@code executeTime} is {@code null}
     */
    @Override
    public <T> String hashSendMessage(T t, String topicName, String hashKey, Long executeTime, String msgId) {
        Assert.notNull(executeTime, TIMESTAMP_REQUIRED);
        return hashSendMessage(buildMessage(t, topicName, executeTime.longValue(), msgId), hashKey);
    }

    /** Stores {@code message} in the queue selected by hashing {@code hashKey}. */
    @Override
    public String hashSendMessage(Message<?> message, String hashKey) {
        return pushMessageToQueue(validateAndHashGetQueueNum(message, hashKey), message);
    }

    // ------------------------------------------------------------------ send and wait

    @Override
    public <T> SendResultFuture syncSendMessage(T t, String topicName, Long executeTime) {
        return syncSendMessage(t, topicName, executeTime, null, null);
    }

    @Override
    public <T> SendResultFuture syncSendMessage(T t, String topicName, String msgId) {
        return syncSendMessage(t, topicName, null, msgId, null);
    }

    @Override
    public <T> SendResultFuture syncSendMessage(T t, String topicName, Integer timeoutMillis) {
        return syncSendMessage(t, topicName, null, null, timeoutMillis);
    }

    @Override
    public <T> SendResultFuture syncSendMessage(T t, String topicName, Long executeTime, String msgId) {
        return syncSendMessage(t, topicName, executeTime, msgId, null);
    }

    @Override
    public <T> SendResultFuture syncSendMessage(T t, String topicName, Long executeTime, Integer timeoutMillis) {
        return syncSendMessage(t, topicName, executeTime, null, timeoutMillis);
    }

    @Override
    public <T> SendResultFuture syncSendMessage(T t, String topicName, String msgId, Integer timeoutMillis) {
        return syncSendMessage(t, topicName, null, msgId, timeoutMillis);
    }

    /**
     * Pushes the message on the producer executor and waits for the outcome.
     *
     * @param t             message body
     * @param topicName     target topic
     * @param executeTime   execute time stored on the message; {@code null} means "now"
     *                      (several overloads of this method pass {@code null} here)
     * @param msgId         message id to use, or blank/{@code null} to have one generated
     * @param timeoutMillis maximum wait in milliseconds, or {@code null} to wait indefinitely
     * @return the outcome of the push; failures are reported in the result, not thrown
     */
    @Override
    public <T> SendResultFuture syncSendMessage(T t, String topicName, Long executeTime, String msgId, Integer timeoutMillis) {
        long effectiveExecuteTime = (executeTime != null) ? executeTime.longValue() : System.currentTimeMillis();
        return syncSendMessage(buildMessage(t, topicName, effectiveExecuteTime, msgId), timeoutMillis);
    }

    /** Pushes {@code message} on the producer executor and waits indefinitely for the outcome. */
    @Override
    public SendResultFuture syncSendMessage(Message<?> message) {
        return syncSendMessage(message, null);
    }

    /**
     * Pushes {@code message} on the producer executor and waits for the outcome.
     *
     * <p>Validation and queue selection run on the calling thread and may throw. Anything
     * that goes wrong while waiting (timeout, interruption, ...) cancels the task and is
     * returned as a failed result. NOTE(review): a cancelled task may already have stored
     * the message; confirm whether callers treat a timeout as "not sent".
     *
     * @param timeoutMillis maximum wait in milliseconds, or {@code null} to wait indefinitely
     */
    @Override
    public SendResultFuture syncSendMessage(Message<?> message, Integer timeoutMillis) {
        String queueName = validateAndGetQueueNum(message);
        Future<SendResultFuture> pending = submitPush(queueName, message);
        try {
            if (timeoutMillis != null) {
                return pending.get(timeoutMillis.intValue(), TimeUnit.MILLISECONDS);
            }
            return pending.get();
        } catch (Exception e) {
            pending.cancel(true);
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for the caller; it is still reported as a failure.
                Thread.currentThread().interrupt();
            }
            return new SendResultFuture(false, (Throwable) e);
        }
    }

    // ------------------------------------------------------------------ asynchronous send (Future)

    /** Asynchronous send using the current time as execute time. */
    @Override
    public <T> Future<SendResultFuture> asyncSendMessageFuture(T t, String topicName) {
        return asyncSendMessageFuture(t, topicName, Long.valueOf(System.currentTimeMillis()));
    }

    /**
     * Asynchronous send for execution at {@code executeTime}.
     *
     * @throws IllegalArgumentException if {@code executeTime} is {@code null}
     */
    @Override
    public <T> Future<SendResultFuture> asyncSendMessageFuture(T t, String topicName, Long executeTime) {
        return asyncSendMessageFuture(t, topicName, executeTime, null);
    }

    /** Asynchronous send with a caller-supplied message id and the current time as execute time. */
    @Override
    public <T> Future<SendResultFuture> asyncSendMessageFuture(T t, String topicName, String msgId) {
        return asyncSendMessageFuture(t, topicName, Long.valueOf(System.currentTimeMillis()), msgId);
    }

    /**
     * Asynchronous send.
     *
     * @param executeTime execute time stored on the message; must not be {@code null}
     * @param msgId       message id to use, or blank/{@code null} to have one generated
     * @throws IllegalArgumentException if {@code executeTime} is {@code null}
     */
    @Override
    public <T> Future<SendResultFuture> asyncSendMessageFuture(T t, String topicName, Long executeTime, String msgId) {
        Assert.notNull(executeTime, TIMESTAMP_REQUIRED);
        return asyncSendMessageFuture(buildMessage(t, topicName, executeTime.longValue(), msgId));
    }

    /**
     * Validates on the calling thread, then stores the message on the producer executor.
     *
     * @return a future whose value reports success (with the message id) or failure
     */
    @Override
    public Future<SendResultFuture> asyncSendMessageFuture(Message<?> message) {
        return submitPush(validateAndGetQueueNum(message), message);
    }

    // ------------------------------------------------------------------ asynchronous send (callback)

    /** Callback-style send using the current time as execute time. */
    @Override
    public <T> void asyncSendMessageCallback(T t, String topicName, SendResultCallback sendResultCallback) {
        asyncSendMessageCallback(t, topicName, Long.valueOf(System.currentTimeMillis()), sendResultCallback);
    }

    /**
     * Callback-style send for execution at {@code executeTime}.
     *
     * @throws IllegalArgumentException if {@code executeTime} is {@code null}
     */
    @Override
    public <T> void asyncSendMessageCallback(T t, String topicName, Long executeTime, SendResultCallback sendResultCallback) {
        asyncSendMessageCallback(t, topicName, executeTime, null, sendResultCallback);
    }

    /** Callback-style send with a caller-supplied message id and the current time as execute time. */
    @Override
    public <T> void asyncSendMessageCallback(T t, String topicName, String msgId, SendResultCallback sendResultCallback) {
        asyncSendMessageCallback(t, topicName, Long.valueOf(System.currentTimeMillis()), msgId, sendResultCallback);
    }

    /**
     * Callback-style send.
     *
     * @param executeTime execute time stored on the message; must not be {@code null}
     * @param msgId       message id to use, or blank/{@code null} to have one generated
     * @throws IllegalArgumentException if {@code executeTime} is {@code null}
     */
    @Override
    public <T> void asyncSendMessageCallback(T t, String topicName, Long executeTime, String msgId, SendResultCallback sendResultCallback) {
        Assert.notNull(executeTime, TIMESTAMP_REQUIRED);
        asyncSendMessageCallback(buildMessage(t, topicName, executeTime.longValue(), msgId), sendResultCallback);
    }

    /**
     * Validates on the calling thread, then stores the message on the producer executor and
     * reports the outcome through {@code sendResultCallback}.
     *
     * <p>The push runs on the producer's own executor rather than the common fork-join pool,
     * because storing a message is blocking I/O. If the executor rejects the task (its queue
     * is full), the rejection is delivered to {@link SendResultCallback#fail} instead of
     * being thrown.
     */
    @Override
    public <T> void asyncSendMessageCallback(Message<T> message, SendResultCallback sendResultCallback) {
        String queueName = validateAndGetQueueNum(message);
        try {
            CompletableFuture.runAsync(() -> {
                try {
                    sendResultCallback.success(pushMessageToQueue(queueName, message));
                } catch (Exception e) {
                    sendResultCallback.fail(e);
                }
            }, this.producerExecutor);
        } catch (RejectedExecutionException e) {
            sendResultCallback.fail(e);
        }
    }

    // ------------------------------------------------------------------ transactional send

    /**
     * Transactional send for execution at {@code executeTime}.
     *
     * @throws IllegalArgumentException if {@code executeTime} is {@code null}
     */
    @Override
    public <T> String sendTransactionMessage(T t, String topicName, Long executeTime) {
        return sendTransactionMessage(t, topicName, executeTime, null);
    }

    /** Transactional send with a caller-supplied message id and the current time as execute time. */
    @Override
    public <T> String sendTransactionMessage(T t, String topicName, String msgId) {
        return sendTransactionMessage(t, topicName, Long.valueOf(System.currentTimeMillis()), msgId);
    }

    /**
     * Transactional send.
     *
     * @param executeTime execute time stored on the message; must not be {@code null}
     * @param msgId       message id set on the message (may be {@code null})
     * @throws IllegalArgumentException if {@code executeTime} is {@code null}
     */
    @Override
    public <T> String sendTransactionMessage(T t, String topicName, Long executeTime, String msgId) {
        Assert.notNull(executeTime, TIMESTAMP_REQUIRED);
        return sendTransactionMessage(buildMessage(t, topicName, executeTime.longValue(), msgId));
    }

    /**
     * Stores {@code message} as a transaction message, runs the registered local transaction
     * and commits or rolls the message back according to its result. Any other result leaves
     * the message as stored.
     *
     * <p>The listener is resolved <em>before</em> the message is stored so that a missing
     * listener does not leave an uncommitted message behind.
     *
     * @return the id returned by the backend for the transaction message
     * @throws BizException if no local transaction listener matches the message
     */
    @Override
    public String sendTransactionMessage(Message<?> message) {
        validateAndCreateTopic(message);
        TransactionListenerPhrase listenerPhrase = this.transactionListenerContainer.getDelayMQLocalTransactionListener(message, this.queueType);
        if (listenerPhrase == null) {
            throw new BizException("no such DelayMQLocalTransactionListener", (Throwable) null);
        }
        String transactionMsgId = this.messageOperate.pushTransactionMessageToQueue(message);
        DelayMQTransactionState state = listenerPhrase.getDelayMQLocalTransactionListener().executeLocalTransaction(message.getBody(), transactionMsgId);
        // Constant-first comparison: a null state is treated like any other undecided result.
        if (DelayMQTransactionState.COMMIT.equals(state)) {
            this.messageOperate.commitTransactionMessageToQueue(message, transactionMsgId);
            listenerPhrase.getDelayMQLocalTransactionListener().afterCommitTransactionMessage(message.getBody(), transactionMsgId);
        } else if (DelayMQTransactionState.ROLLBACK.equals(state)) {
            this.messageOperate.rollbackTransactionMessageToQueue(transactionMsgId);
            listenerPhrase.getDelayMQLocalTransactionListener().afterRollbackTransactionMessage(message.getBody(), transactionMsgId);
        }
        return transactionMsgId;
    }

    // ------------------------------------------------------------------ internals

    /** Creates a message carrying the given body, topic, execute time and (optional) id. */
    private static <T> Message<T> buildMessage(T body, String topicName, long executeTime, String msgId) {
        Message<T> message = new Message<>();
        message.setTopicName(topicName);
        message.setExecuteTime(executeTime);
        message.setBody(body);
        message.setMsgId(msgId);
        return message;
    }

    /**
     * Submits the push of {@code message} into {@code queueName} to the producer executor.
     * Exceptions raised by the push are captured in the returned result, never thrown from
     * the task.
     */
    private Future<SendResultFuture> submitPush(String queueName, Message<?> message) {
        return this.producerExecutor.submit(() -> {
            try {
                return new SendResultFuture(true, pushMessageToQueue(queueName, message));
            } catch (Exception e) {
                return new SendResultFuture(false, (Throwable) e);
            }
        });
    }

    /**
     * Stores {@code message} in {@code queueName}, generating a random UUID as id when the
     * message does not carry a non-blank one.
     *
     * @return the id the message was stored under
     */
    private String pushMessageToQueue(String queueName, Message<?> message) {
        String msgId = message.getMsgId();
        if (!UtilAll.isNotBlank(msgId)) {
            msgId = UUID.randomUUID().toString();
        }
        this.messageOperate.storeMessage(queueName, message, msgId);
        return msgId;
    }

    /**
     * Validates the message and selects the target queue using the strategy recorded on the
     * topic, with {@link RebalanceStrategyEnum#ROUND} passed as the second argument to
     * {@code match}.
     */
    private String validateAndGetQueueNum(Message<?> message) {
        TopicQueue topicQueue = validateAndCreateTopic(message);
        return RebalanceStrategyEnum
                .match(topicQueue.getTopicQueueData().getRebalanceStrategyEnum(), RebalanceStrategyEnum.ROUND)
                .getRebalancePushQueue(topicQueue);
    }

    /** Validates the message and selects the target queue with the hash strategy. */
    private String validateAndHashGetQueueNum(Message<?> message, String hashKey) {
        TopicQueue topicQueue = validateAndCreateTopic(message);
        HashRebalanceImpl hashRebalance = (HashRebalanceImpl) RebalanceStrategyEnum.match(null, RebalanceStrategyEnum.HASH);
        return hashRebalance.getRebalancePushQueue(topicQueue, hashKey);
    }

    /**
     * Checks that the producer is running and the message is valid, then returns the topic's
     * queue description, creating and registering the topic when it is unknown and
     * auto-creation is enabled.
     */
    private TopicQueue validateAndCreateTopic(Message<?> message) {
        validateProducer();
        validateMessage(message);
        TopicQueue topicQueue = this.topicManager.getTopicQueue(message.getTopicName(), this.queueType, false);
        if (topicQueue == null) {
            log.debug("topic [{}] not found, trying to create it", message.getTopicName());
            checkTopicIsAutoCreate(this.delayMQProperties);
            topicQueue = createTopicQueue(message.getTopicName(), this.delayMQProperties.getProducer());
            this.topicManager.updateTopicQueueTable(topicQueue, this.queueType);
            if (log.isDebugEnabled()) {
                log.debug("created topic queue: {}", JSONUtil.toJSONString(topicQueue));
            }
        }
        return topicQueue;
    }

    /** @throws BizException if the producer has not been started */
    private void validateProducer() {
        if (!isRunning()) {
            throw new BizException(ErrorCode.PRODUCER_NOT_START.getDescription(), (Throwable) null);
        }
    }

    /**
     * Rejects a {@code null} message and a message whose topic name is blank.
     *
     * @throws BizException if the message is {@code null} or has no topic name
     */
    protected void validateMessage(Message<?> message) {
        if (message == null) {
            throw new BizException("message must not be null", (Throwable) null);
        }
        if (UtilAll.isBlank(message.getTopicName())) {
            throw new BizException("check topic not null", (Throwable) null);
        }
    }

    /** @throws BizException if the producer configuration does not allow topic auto-creation */
    protected void checkTopicIsAutoCreate(DelayMQProperties delayMQProperties) throws BizException {
        if (!delayMQProperties.getProducer().isAutoCreatTopic()) {
            throw new BizException(ErrorCode.CANNOT_AUTO_CREATE_TOPIC.getDescription(), (Throwable) null);
        }
    }

    /**
     * Creates the configured number of message queues for {@code topicName}, plus one
     * queue per index named with the {@link MQConstant#MSG_DELAY} suffix, and persists the
     * topic's queue data with the {@link RebalanceStrategyEnum#ROUND} strategy.
     *
     * @throws BizException if the configured write queue count is not positive
     */
    private TopicQueue createTopicQueue(String topicName, DelayMQProperties.Producer producer) {
        int queueCount = producer.getWriteTopicQueueNum();
        if (queueCount <= 0) {
            throw new BizException("property 'com.luoluo.delaymq.producer.write-topic-queue-num' must >0 ", (Throwable) null);
        }
        ArrayList<String> queueNames = new ArrayList<>(queueCount);
        for (int i = 0; i < queueCount; i++) {
            String queueName = TopicManager.getTopicMsgQueue(topicName, i);
            this.messageOperate.createTopicQueue(queueName, queueName);
            String retryQueueName = TopicManager.getTopicMsgQueue(topicName + MQConstant.MSG_DELAY, i);
            createRetryTopicQueue(retryQueueName, retryQueueName);
            // Only the primary queue is listed in the topic's queue data.
            queueNames.add(queueName);
        }
        TopicQueueData topicQueueData = new TopicQueueData();
        topicQueueData.setQueueNames(queueNames);
        topicQueueData.setRebalanceStrategyEnum(RebalanceStrategyEnum.ROUND);
        TopicQueue topicQueue = new TopicQueue();
        topicQueue.setTopicName(topicName);
        topicQueue.setTopicQueueData(topicQueueData);
        this.messageOperate.createTopicQueueData(topicName, JSONUtil.toJSONString(topicQueueData));
        return topicQueue;
    }

    /** Creates the queue for the {@link MQConstant#MSG_DELAY}-suffixed name in the backend. */
    private void createRetryTopicQueue(String queueName, String queueValue) {
        this.messageOperate.createTopicQueue(queueName, queueValue);
    }
}
