package com.luoluo.delaymq.producer;

import com.luoluo.delaymq.common.Message;
import com.luoluo.delaymq.common.MessageOperate;
import com.luoluo.delaymq.common.TransactionMsgData;
import com.luoluo.delaymq.constant.MQConstant;
import com.luoluo.delaymq.constant.QueueTypeEnum;
import com.luoluo.delaymq.exception.BizException;
import com.luoluo.delaymq.lock.DistributedLock;
import com.luoluo.delaymq.utils.DateNumUtil;
import com.luoluo.delaymq.utils.JSONUtil;
import com.luoluo.delaymq.utils.ThreadFactoryImpl;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;

/* loaded from: input_file:com/luoluo/delaymq/producer/TransactionConsumeMessageService.class */
/**
 * Periodically scans the global transaction queue and resolves each pending
 * transactional message by asking the registered local-transaction listener
 * for the state of the local transaction.
 *
 * <p>For every transaction id pulled from {@link MQConstant#TRANSACTION_GLOBAL_NAME}
 * the service takes a distributed lock, loads the stored message, and then
 * commits, rolls back, or delays the message depending on the listener's answer.
 *
 * <p>The scan runs on a single-threaded scheduled executor owned by this
 * instance. {@link #stop()} shuts that executor down, so an instance cannot be
 * started again after it has been stopped.
 */
public class TransactionConsumeMessageService implements SmartLifecycle, Runnable {
    private static final Logger log = LoggerFactory.getLogger(TransactionConsumeMessageService.class);

    /** Prefix of the distributed-lock key guarding a single transaction id. */
    private static final String TRANSACTION_LOCK_PREFIX = "MESSAGE:LOCK:POOL:MESSAGE:QUEUE:TRANSACTION:";

    /** Once the stored retry count reaches this value the message is rolled back instead of delayed. */
    private static final int MAX_RETRY_COUNT = 5;

    /** Delay before the first scan, in milliseconds. */
    private static final long INITIAL_DELAY_MILLIS = 3000L;

    /** Interval between scans, in milliseconds. */
    private static final long SCAN_PERIOD_MILLIS = 1000L;

    protected DistributedLock distributedLock;
    protected MessageOperate messageOperate;
    protected QueueTypeEnum queueType;
    private boolean running = false;
    private final ScheduledExecutorService scanConsumerSchedule = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("scanTransactionConsumerSchedule", true));

    /**
     * Creates the service.
     *
     * @param distributedLock lock used to make sure only one application handles a transaction id at a time
     * @param messageOperate  storage operations for transactional messages
     * @param queueTypeEnum   queue type passed to the listener container when looking up a listener
     */
    public TransactionConsumeMessageService(DistributedLock distributedLock, MessageOperate messageOperate, QueueTypeEnum queueTypeEnum) {
        this.distributedLock = distributedLock;
        this.messageOperate = messageOperate;
        this.queueType = queueTypeEnum;
    }

    /** Schedules the periodic scan and marks the service as running. */
    public void start() {
        this.scanConsumerSchedule.scheduleAtFixedRate(this, INITIAL_DELAY_MILLIS, SCAN_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
        this.running = true;
    }

    /** Shuts the scan executor down if the service is running; does nothing otherwise. */
    public void stop() {
        if (isRunning()) {
            this.scanConsumerSchedule.shutdownNow();
            this.running = false;
        }
    }

    /**
     * @return {@code true} between a call to {@link #start()} and the next call to {@link #stop()}
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Runs one scan. Every failure is caught here because an exception escaping
     * a task scheduled with {@code scheduleAtFixedRate} suppresses all later executions.
     */
    @Override
    public void run() {
        try {
            consumeTransactionTopicMessage();
        } catch (Throwable th) {
            log.error("schedule transaction message fail", th);
        }
    }

    /**
     * Pulls every transaction id scored between {@code 0} and the current time
     * from the global transaction queue and processes them one by one.
     *
     * <p>An exception raised while processing one id propagates to the caller,
     * so the remaining ids of this scan are left for the next one.
     */
    public void consumeTransactionTopicMessage() {
        Iterator<String> transactionIds = this.messageOperate.pullMessageFromBeginToEnd(MQConstant.TRANSACTION_GLOBAL_NAME, 0L, System.currentTimeMillis()).iterator();
        while (transactionIds.hasNext()) {
            consumeTransactionSingleMessage(transactionIds.next());
        }
    }

    /**
     * Processes one transaction id under its distributed lock. If the lock
     * cannot be obtained the id is skipped; otherwise the lock is released on
     * every exit path, including exceptional ones.
     *
     * @param transactionId id of the transactional message to resolve
     */
    private void consumeTransactionSingleMessage(String transactionId) {
        String lockKey = TRANSACTION_LOCK_PREFIX + transactionId;
        if (!this.distributedLock.tryLock(lockKey)) {
            log.info("other applications are processing,transactionId:{}", transactionId);
            return;
        }
        try {
            resolveTransaction(transactionId);
        } finally {
            this.distributedLock.unlock(lockKey);
        }
    }

    /**
     * Loads the stored message for {@code transactionId}, asks the matching
     * listener for the local transaction state and commits or rolls back
     * accordingly. Must be called while holding the transaction's lock.
     *
     * @param transactionId id of the transactional message to resolve
     */
    private void resolveTransaction(String transactionId) {
        String rawMessage = this.messageOperate.getTransactionMessage(transactionId);
        if (rawMessage == null) {
            // The id is still queued but its message body is gone: remove the id from the queue.
            this.messageOperate.rollbackTransactionMessageToQueue(transactionId);
            return;
        }

        TransactionMsgData retryData = this.messageOperate.getMessageTransactionRetryData(transactionId);
        if (retryData == null) {
            retryData = new TransactionMsgData(0);
        }

        Message<?> message = (Message) JSONUtil.parseObject(rawMessage, Message.class);
        TransactionListenerPhrase listenerPhrase = TransactionListenerContainer.getInstance().getDelayMQLocalTransactionListener(message, this.queueType);
        if (listenerPhrase == null) {
            // Without a listener there is nobody to ask about the local transaction;
            // leave the message untouched so a later scan can retry.
            log.warn("no local transaction listener found for message,id:{}", transactionId);
            return;
        }

        Object body = parseMessageType(message, (Class) listenerPhrase.getMessageType());

        // Declared outside the try block so the failure handler can still notify the listener.
        DelayMQLocalTransactionListener listener = null;
        try {
            listener = listenerPhrase.getDelayMQLocalTransactionListener();
            DelayMQTransactionState state = listener.checkLocalTransaction(body, transactionId);
            if (DelayMQTransactionState.COMMIT.equals(state)) {
                log.info("commit message,id:{}", transactionId);
                this.messageOperate.commitTransactionMessageToQueue(message, transactionId);
                listener.afterCommitTransactionMessage(body, transactionId);
            } else if (DelayMQTransactionState.ROLLBACK.equals(state)) {
                log.info("rollback message,id:{}", transactionId);
                this.messageOperate.rollbackTransactionMessageToQueue(transactionId);
                listener.afterRollbackTransactionMessage(body, transactionId);
            } else if (DelayMQTransactionState.UNKNOWN.equals(state)) {
                // Routed through the failure handler below so it is retried or given up on.
                throw new BizException("unsure message status", (Throwable) null);
            }
        } catch (Exception e) {
            handleCheckFailure(transactionId, body, retryData, listener, e);
        }
    }

    /**
     * Handles a failed or undecided transaction check: rolls the message back
     * once the retry limit is reached, otherwise re-queues it with a delay.
     * The listener is notified afterwards when one is available.
     *
     * @param transactionId id of the transactional message
     * @param body          message body already converted to the listener's message type
     * @param retryData     retry bookkeeping loaded for this transaction
     * @param listener      listener to notify; may be {@code null} if obtaining it failed
     * @param cause         exception that triggered the failure handling
     */
    private void handleCheckFailure(String transactionId, Object body, TransactionMsgData retryData,
                                    DelayMQLocalTransactionListener listener, Exception cause) {
        if (retryData.getRetryCount() >= MAX_RETRY_COUNT) {
            log.warn("rollback transaction Message after over retry count, id:{}", transactionId, cause);
            this.messageOperate.rollbackTransactionMessageToQueue(transactionId);
            if (listener != null) {
                listener.afterOverRetryTransactionMessage(body, transactionId);
            }
            return;
        }
        log.info("delay Message,id:{},cause:{}", transactionId, cause.toString());
        // NOTE(review): DateNumUtil.FIVE_SECOND is presumably a five-second delay in milliseconds - confirm.
        this.messageOperate.delayTransactionMessageToQueue(transactionId, DateNumUtil.FIVE_SECOND.longValue(), retryData);
        if (listener != null) {
            listener.afterDelayTransactionMessage(body, transactionId);
        }
    }

    /**
     * Converts the body of {@code message} into an instance of {@code cls} via {@link JSONUtil}.
     *
     * @param message message whose body is converted
     * @param cls     target type
     * @param <T>     target type
     * @return the converted body
     */
    private <T> T parseMessageType(Message message, Class<T> cls) {
        return (T) JSONUtil.parseObject(message.getBody(), cls);
    }
}
