package com.luoluo.delaymq.consumer;

import com.luoluo.delaymq.common.BackTrackCacheValues;
import com.luoluo.delaymq.common.Message;
import com.luoluo.delaymq.common.MessageOperate;
import com.luoluo.delaymq.common.SpringContext;
import com.luoluo.delaymq.common.TopicManager;
import com.luoluo.delaymq.config.DelayMQProperties;
import com.luoluo.delaymq.constant.ConsumeMode;
import com.luoluo.delaymq.constant.MQConstant;
import com.luoluo.delaymq.consumer.annotation.DelayMQMessageListener;
import com.luoluo.delaymq.exception.BizException;
import com.luoluo.delaymq.lock.DistributedLock;
import com.luoluo.delaymq.mysql.ConsumerMsg;
import com.luoluo.delaymq.mysql.RecordConsumeTime;
import com.luoluo.delaymq.producer.DelayMQLocalTransactionListener;
import com.luoluo.delaymq.redis.RedisUtils;
import com.luoluo.delaymq.utils.DateNumUtil;
import com.luoluo.delaymq.utils.JSONUtil;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.core.MethodParameter;
import org.springframework.transaction.annotation.Transactional;

/* loaded from: input_file:com/luoluo/delaymq/consumer/CommonConsumeMessageServiceImpl.class */
public class CommonConsumeMessageServiceImpl implements ConsumeMessageService {
    private static final Logger log = LoggerFactory.getLogger(CommonConsumeMessageServiceImpl.class);
    protected DistributedLock distributedLock;
    protected DelayMQConsumerListener delayMQConsumerListener;
    protected String consumerGroup;
    protected String topic;
    protected int backTrackTime;
    protected DelayMQMessageListener anno;
    protected DelayMQProperties.Consumer consumerProperties;
    protected ConsumeMode consumeMode;
    protected long consumeTimeout;
    protected int retryDelayTime;
    protected int retryCount;
    protected int pullMessageSize;
    protected boolean supportTransaction;
    private BackTrackCacheValues<String> backTrackValues;
    protected Type messageType;
    protected MethodParameter methodParameter;
    private AtomicInteger counter = new AtomicInteger(0);
    protected MessageOperate messageOperate;
    private CommonConsumeMessageServiceImpl consumeMessageServiceProxy;
    private String delayMQConsumerListenerName;
    private RedisUtils redisUtils;

    @Override // com.luoluo.delaymq.consumer.ConsumeMessageService
    public void init(String str) {
        if (this.supportTransaction) {
            this.consumeMessageServiceProxy = (CommonConsumeMessageServiceImpl) SpringContext.getBean(str);
        } else {
            this.consumeMessageServiceProxy = this;
        }
        this.redisUtils = (RedisUtils) SpringContext.getBean(RedisUtils.class);
    }

    @Override // com.luoluo.delaymq.consumer.ConsumeMessageService
    public List<RecordConsumeTime> getRecordTopicConsumeTime(String str, String str2) {
        return this.messageOperate.getRecordTopicConsumeTime(str, str2);
    }

    public CommonConsumeMessageServiceImpl(String str, String str2, DelayMQMessageListener delayMQMessageListener, DelayMQProperties.Consumer consumer, DistributedLock distributedLock, DelayMQConsumerListener delayMQConsumerListener) {
        this.delayMQConsumerListener = delayMQConsumerListener;
        this.delayMQConsumerListenerName = AopProxyUtils.ultimateTargetClass(delayMQConsumerListener).getName();
        this.distributedLock = distributedLock;
        this.consumerGroup = str;
        this.topic = str2;
        this.consumerProperties = consumer;
        this.anno = delayMQMessageListener;
        this.consumeMode = delayMQMessageListener.consumeMode();
        this.consumeTimeout = delayMQMessageListener.consumeTimeout();
        if (this.consumeTimeout == -2147483648L) {
            this.consumeTimeout = consumer.getConsumeTimeout();
        }
        if (this.consumeTimeout <= 0) {
            this.consumeTimeout = 15L;
        }
        this.pullMessageSize = delayMQMessageListener.pullMessageSize();
        if (this.pullMessageSize == Integer.MIN_VALUE) {
            this.pullMessageSize = consumer.getPullMessageSize();
        }
        if (this.pullMessageSize <= 0) {
            this.pullMessageSize = MQConstant.DEFAULT_CONSUME_PULL_MESSAGE_SIZE;
        }
        this.supportTransaction = delayMQMessageListener.supportTransaction();
        if (!this.supportTransaction) {
            this.supportTransaction = consumer.isSupportTransaction();
        }
        this.backTrackTime = delayMQMessageListener.backTrackTime();
        if (this.backTrackTime == Integer.MIN_VALUE) {
            this.backTrackTime = consumer.getBackTrackTime();
        }
        if (this.backTrackTime <= 0) {
            this.backTrackTime = 3;
        }
        this.backTrackValues = new BackTrackCacheValues<>(this.backTrackTime);
        this.messageType = getMessageType();
        this.methodParameter = getMethodParameter();
    }

    public void setMessageOperate(MessageOperate messageOperate) {
        this.messageOperate = messageOperate;
    }

    @Override // com.luoluo.delaymq.consumer.ConsumeMessageService
    public void backTrackTopicMessage(String str, int i, long j, long j2, int i2) {
        if (this.backTrackValues != null) {
            ArrayList arrayList = new ArrayList(256);
            backTrackTopicMessage(str, i, j, j2, i2, arrayList);
            this.backTrackValues.addValue((List) arrayList.stream().map((v0) -> {
                return v0.getMsgId();
            }).collect(Collectors.toList()));
        }
    }

    public void backTrackTopicMessage(String str, int i, long j, long j2, int i2, List<ConsumerMsg> list) {
        if (this.backTrackValues != null) {
            Collection<String> pullMessageFromBeginToEnd = this.messageOperate.pullMessageFromBeginToEnd(str, i, j - (this.backTrackTime * DateNumUtil.SECOND.longValue()), j2 - DateNumUtil.SECOND.longValue(), i2, this.pullMessageSize);
            int size = pullMessageFromBeginToEnd.size();
            pullMessageFromBeginToEnd.removeAll((Set) this.backTrackValues.getAllValue().stream().collect(Collectors.toSet()));
            consumeKeysMessage(str, i, j, pullMessageFromBeginToEnd, list, false);
            if (size == this.pullMessageSize) {
                backTrackTopicMessage(str, i, j, j2, i2 + this.pullMessageSize, list);
            }
        }
    }

    @Override // com.luoluo.delaymq.consumer.ConsumeMessageService
    public void consumeRetryTopicMessage(String str, int i, long j, long j2, int i2) {
        consumeTopicMessage(str, i, j, j2, i2, true);
    }

    @Override // com.luoluo.delaymq.consumer.ConsumeMessageService
    public void recordTopicConsumeTime(int i, long j) {
        this.messageOperate.recordTopicConsumeTime(this.topic, this.consumerGroup, i, j);
    }

    @Override // com.luoluo.delaymq.consumer.ConsumeMessageService
    public long consumeTopicMessage(String str, int i, long j, long j2, int i2, boolean z) {
        ArrayList arrayList = new ArrayList(256);
        consumeTopicMessage(str, i, j, j2, i2, arrayList, z);
        if (this.backTrackValues != null) {
            this.backTrackValues.addValue((List) arrayList.stream().map((v0) -> {
                return v0.getMsgId();
            }).collect(Collectors.toList()));
        }
        return j2 + 1;
    }

    public void consumeTopicMessage(String str, int i, long j, long j2, int i2, List<ConsumerMsg> list, boolean z) {
        Collection<String> pullMessageFromBeginToEnd = this.messageOperate.pullMessageFromBeginToEnd(str, i, j, j2, i2, this.pullMessageSize);
        consumeKeysMessage(str, i, j, pullMessageFromBeginToEnd, list, z);
        if (pullMessageFromBeginToEnd.size() == this.pullMessageSize) {
            consumeTopicMessage(str, i, j, j2, i2 + this.pullMessageSize, list, z);
        }
    }

    private void consumeKeysMessage(String str, int i, long j, Collection<String> collection, List<ConsumerMsg> list, boolean z) {
        if (collection == null || collection.size() <= 0) {
            return;
        }
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            this.consumeMessageServiceProxy.consumeSingleMessage(str, i, j, list, it.next(), z);
        }
    }

    @Transactional(rollbackFor = {Throwable.class})
    public void consumeSingleMessage(String str, int i, long j, List<ConsumerMsg> list, String str2, boolean z) {
        String replace = (MQConstant.MSG_LOCK_POOL + TopicManager.getTopicMsgQueue(str, i) + ":" + this.anno.queueType() + ":" + this.consumerGroup + ":" + str2).replace(MQConstant.MSG_DELAY, "");
        try {
            if (!this.distributedLock.tryLock(replace)) {
                log.debug("other applications are processing,taskId:{}", str2);
                return;
            }
            try {
                String message = this.messageOperate.getMessage(str2);
                if (message == null) {
                    this.messageOperate.removeMsgIdInQueue(str, i, str2);
                }
                ConsumerMsg consumerMsgDataClustering = this.messageOperate.getConsumerMsgDataClustering(str2, this.topic, this.consumerGroup);
                if (consumerMsgDataClustering == null) {
                    consumerMsgDataClustering = new ConsumerMsg();
                    consumerMsgDataClustering.setMsgId(str2);
                    consumerMsgDataClustering.setRetryCount(0);
                    consumerMsgDataClustering.setCreatedTime(new Date());
                    consumerMsgDataClustering.setConsumerGroup(this.consumerGroup);
                    consumerMsgDataClustering.setTopic(this.topic);
                } else if (ConsumerStatus.SUCCESS.getDescription().equals(consumerMsgDataClustering.getConsumerStatus()) || ConsumerStatus.COMPLETE_FAIL.getDescription().equals(consumerMsgDataClustering.getConsumerStatus()) || System.currentTimeMillis() < consumerMsgDataClustering.getRetryNextTime().getTime()) {
                    list.add(consumerMsgDataClustering);
                    this.distributedLock.unlock(replace);
                    return;
                }
                Object parseMessageType = parseMessageType(message, (Class) this.messageType);
                try {
                    if (!ConsumerStatus.SUCCESS.equals(this.supportTransaction ? this.delayMQConsumerListener.onTransactionMessage(parseMessageType, str2) : this.delayMQConsumerListener.onMessage(parseMessageType, str2))) {
                        throw new BizException("Unsuccessful consumption");
                    }
                    updateConsumerMsgData(str2, consumerMsgDataClustering, ConsumerStatus.SUCCESS);
                    list.add(consumerMsgDataClustering);
                    this.delayMQConsumerListener.extraOperationAfterMessageSuccess(parseMessageType, str2);
                    this.distributedLock.unlock(replace);
                } catch (Exception e) {
                    log.info("Unsuccessful consumption" + Thread.currentThread().getId(), e);
                    if (consumerMsgDataClustering == null || consumerMsgDataClustering.getRetryCount() < this.retryCount - 1) {
                        this.messageOperate.delayTopicQueueMessage(this.topic, i, str2, this.retryDelayTime);
                        updateConsumerMsgData(str2, consumerMsgDataClustering, ConsumerStatus.FAIL);
                        list.add(consumerMsgDataClustering);
                        this.delayMQConsumerListener.extraOperationAfterMessageFail(parseMessageType, str2);
                        this.distributedLock.unlock(replace);
                        return;
                    }
                    log.error("consumption completely failed", e);
                    updateConsumerMsgData(str2, consumerMsgDataClustering, ConsumerStatus.COMPLETE_FAIL);
                    list.add(consumerMsgDataClustering);
                    this.delayMQConsumerListener.extraOperationAfterMessageCompleteFail(parseMessageType, str2);
                    this.distributedLock.unlock(replace);
                }
            } catch (Exception e2) {
                log.error("not expected Exception", e2);
                this.distributedLock.unlock(replace);
            }
        } catch (Throwable th) {
            this.distributedLock.unlock(replace);
            throw th;
        }
    }

    private <T> T parseMessageType(String str, Class<T> cls) {
        return (T) JSONUtil.parseObject(str, (Class) cls);
    }

    private MethodParameter getMethodParameter() {
        Class<?> cls;
        Class ultimateTargetClass = AopProxyUtils.ultimateTargetClass(this.delayMQConsumerListener);
        Type type = this.messageType;
        if (type instanceof ParameterizedType) {
            cls = (Class) ((ParameterizedType) type).getRawType();
        } else {
            if (!(type instanceof Class)) {
                throw new RuntimeException("parameterType:" + type + " of onMessage method is not supported");
            }
            cls = (Class) type;
        }
        try {
            return new MethodParameter(ultimateTargetClass.getMethod("onMessage", cls, String.class), 0);
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
            throw new RuntimeException("parameterType:" + type + " of onMessage method is not supported");
        }
    }

    private Type getMessageType() {
        Class ultimateTargetClass = AopProxyUtils.ultimateTargetClass(this.delayMQConsumerListener);
        Type type = null;
        while (true) {
            if (!Objects.nonNull(ultimateTargetClass)) {
                break;
            }
            Type genericSuperclass = ultimateTargetClass.getGenericSuperclass();
            if (Objects.nonNull(genericSuperclass) && (genericSuperclass instanceof ParameterizedType) && Objects.equals(((ParameterizedType) genericSuperclass).getRawType(), AbstractDelayMQConsumerListener.class)) {
                type = genericSuperclass;
                break;
            }
            ultimateTargetClass = ultimateTargetClass.getSuperclass();
        }
        if (!Objects.isNull(type)) {
            Type[] actualTypeArguments = ((ParameterizedType) type).getActualTypeArguments();
            if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
                return actualTypeArguments[0];
            }
        }
        Class ultimateTargetClass2 = AopProxyUtils.ultimateTargetClass(this.delayMQConsumerListener);
        Type[] genericInterfaces = ultimateTargetClass2.getGenericInterfaces();
        Class superclass = ultimateTargetClass2.getSuperclass();
        while (true) {
            Class cls = superclass;
            if ((Objects.isNull(genericInterfaces) || 0 == genericInterfaces.length) && Objects.nonNull(cls)) {
                genericInterfaces = cls.getGenericInterfaces();
                superclass = ultimateTargetClass2.getSuperclass();
            }
        }
        if (!Objects.nonNull(genericInterfaces)) {
            return Object.class;
        }
        for (Type type2 : genericInterfaces) {
            if (type2 instanceof ParameterizedType) {
                ParameterizedType parameterizedType = (ParameterizedType) type2;
                if (Objects.equals(parameterizedType.getRawType(), DelayMQLocalTransactionListener.class)) {
                    Type[] actualTypeArguments2 = parameterizedType.getActualTypeArguments();
                    return (!Objects.nonNull(actualTypeArguments2) || actualTypeArguments2.length <= 0) ? Object.class : (Class) actualTypeArguments2[0];
                }
            }
        }
        return Object.class;
    }

    private void updateMessage(String str, Message message) {
        this.messageOperate.persistentMessage(str, message);
    }

    private void updateConsumerMsgData(String str, ConsumerMsg consumerMsg, ConsumerStatus consumerStatus) {
        consumerMsg.setConsumerStatus(consumerStatus.getDescription());
        if (ConsumerStatus.SUCCESS.equals(consumerStatus) || ConsumerStatus.COMPLETE_FAIL.equals(consumerStatus)) {
            consumerMsg.setRetryNextTime(null);
            consumerMsg.setConsumerTime(new Date());
        } else {
            consumerMsg.setRetryCount(consumerMsg.getRetryCount() + 1);
            consumerMsg.setRetryNextTime(new Date(System.currentTimeMillis() + (this.retryDelayTime * DateNumUtil.SECOND.longValue())));
        }
        this.messageOperate.persistentConsumerMsgData(str, this.consumerGroup, consumerMsg);
    }

    private void fileAppendConsumerMsgData(ConsumerMsg consumerMsg) throws IOException {
    }

    public void setRetryDelayTime(int i) {
        this.retryDelayTime = i;
    }

    public void setRetryCount(int i) {
        this.retryCount = i;
    }
}
