package com.github.kancyframework.delay.message.scheduler;

import com.github.kancyframework.delay.message.cache.DelayMessageConfigCache;
import com.github.kancyframework.delay.message.config.DelayMessageTaskExecutor;
import com.github.kancyframework.delay.message.interceptor.DelayMessageSchedulerInterceptor;
import com.github.kancyframework.delay.message.message.MessageStatus;
import com.github.kancyframework.delay.message.scheduler.handler.DelayMessageHandler;
import com.github.kancyframework.delay.message.scheduler.properties.DelayMessageSchedulerProperties;
import com.github.kancyframework.delay.message.service.DelayMessage;
import com.github.kancyframework.delay.message.service.DelayMessageConfig;
import com.github.kancyframework.delay.message.service.DelayMessageConfigService;
import com.github.kancyframework.delay.message.service.DelayMessageService;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/github/kancyframework/delay/message/scheduler/DelayMessageSchedulerImpl.class */
public class DelayMessageSchedulerImpl implements DelayMessageScheduler {

    @Autowired(required = false)
    private List<DelayMessageSchedulerInterceptor> interceptors = Collections.emptyList();

    @Autowired
    private DelayMessageSchedulerProperties schedulerProperties;

    @Autowired
    private DelayMessageScheduleOptimizer optimizer;

    @Resource
    private DelayMessageHandler delayMessageHandler;

    @Autowired
    private DelayMessageTaskExecutor delayMessageTaskExecutor;

    @Autowired
    private DelayMessageConfigCache delayMessageConfigCache;

    @Autowired
    private DelayMessageConfigService delayMessageConfigService;

    @Autowired
    private DelayMessageService delayMessageService;
    private static final Logger log = LoggerFactory.getLogger(DelayMessageSchedulerImpl.class);
    private static final AtomicInteger SCHEDULER_COUNT = new AtomicInteger(0);
    private static final Map<String, AtomicBoolean> SCHEDULER_EXEC_STATUS_MAP = new HashMap();

    @Override // com.github.kancyframework.delay.message.scheduler.DelayMessageScheduler
    public int schedule(String str) {
        return schedule(str, this.schedulerProperties.getMaxPrefetchSize());
    }

    @Override // com.github.kancyframework.delay.message.scheduler.DelayMessageScheduler
    public int schedule(String str, int i) {
        return schedule(str, i, null);
    }

    @Override // com.github.kancyframework.delay.message.scheduler.DelayMessageScheduler
    public int schedule(String str, int i, Date date) {
        return schedule(str, i, date, this.delayMessageHandler);
    }

    private int schedule(String str, int i, @Nullable Date date, DelayMessageHandler delayMessageHandler) {
        AtomicBoolean computeIfAbsent;
        synchronized (SCHEDULER_EXEC_STATUS_MAP) {
            computeIfAbsent = SCHEDULER_EXEC_STATUS_MAP.computeIfAbsent(str, str2 -> {
                return new AtomicBoolean(false);
            });
        }
        if (computeIfAbsent.compareAndSet(false, true)) {
            return canSchedule(str, i, date, delayMessageHandler, computeIfAbsent);
        }
        log.warn("delay message table {} scheduler will give up schedule , because the last task [{}] has not been completed！", str, Integer.valueOf(i));
        return 0;
    }

    private int canSchedule(String str, int i, @Nullable Date date, DelayMessageHandler delayMessageHandler, AtomicBoolean atomicBoolean) {
        int i2 = 0;
        try {
            this.interceptors.forEach(delayMessageSchedulerInterceptor -> {
                delayMessageSchedulerInterceptor.scheduleBefore(str, i);
            });
            i2 = doSchedule(str, i, date, delayMessageHandler);
            SCHEDULER_COUNT.decrementAndGet();
            atomicBoolean.set(false);
            Iterator<DelayMessageSchedulerInterceptor> it = this.interceptors.iterator();
            while (it.hasNext()) {
                it.next().scheduleCompleted(str, i, i2);
            }
            return i2;
        } catch (Throwable th) {
            SCHEDULER_COUNT.decrementAndGet();
            atomicBoolean.set(false);
            Iterator<DelayMessageSchedulerInterceptor> it2 = this.interceptors.iterator();
            while (it2.hasNext()) {
                it2.next().scheduleCompleted(str, i, i2);
            }
            throw th;
        }
    }

    private int doSchedule(String str, int i, @Nullable Date date, DelayMessageHandler delayMessageHandler) {
        long currentTimeMillis = System.currentTimeMillis();
        boolean z = SCHEDULER_COUNT.incrementAndGet() == 1;
        Assert.notNull(delayMessageHandler, "delayMessageHandler is null.");
        Assert.hasText(str, "tableName is empty.");
        Assert.state(i > 0 && i <= this.schedulerProperties.getMaxPrefetchSize(), "limit range is error.");
        List<DelayMessage> scanExpiredDelayMessages = scanExpiredDelayMessages(str, date, i);
        if (CollectionUtils.isEmpty(scanExpiredDelayMessages)) {
            log.debug("延时消息表[{}]暂无到期消息", str);
            return 0;
        }
        log.info("从延时消息表[{}]中取出到期的消息：{}", str, Integer.valueOf(scanExpiredDelayMessages.size()));
        List queryConfigByMessageKeys = this.delayMessageConfigService.queryConfigByMessageKeys((Set) scanExpiredDelayMessages.stream().map((v0) -> {
            return v0.getMessageKey();
        }).collect(Collectors.toSet()));
        if (CollectionUtils.isEmpty(queryConfigByMessageKeys)) {
            log.error("delay message table [{}] not found any message key.", str);
            scanExpiredDelayMessages.clear();
            return 0;
        }
        this.delayMessageService.batchUpdateOnProcessing(str, (List) scanExpiredDelayMessages.stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        int size = scanExpiredDelayMessages.size();
        Map<String, DelayMessageConfig> map = (Map) queryConfigByMessageKeys.stream().collect(Collectors.toMap((v0) -> {
            return v0.getMessageKey();
        }, delayMessageConfig -> {
            return delayMessageConfig;
        }, (delayMessageConfig2, delayMessageConfig3) -> {
            return delayMessageConfig2;
        }));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CountDownLatch countDownLatch = new CountDownLatch(scanExpiredDelayMessages.size());
        for (DelayMessage delayMessage : scanExpiredDelayMessages) {
            DelayMessageConfig delayMessageConfig4 = getDelayMessageConfig(map, delayMessage);
            if (Objects.isNull(delayMessageConfig4)) {
                log.error("delay message [{}] config not found : {}", delayMessage.getMessageKey(), delayMessage.getId());
                this.delayMessageService.updateStatus(str, String.valueOf(delayMessage.getId()), MessageStatus.FAIL.ordinal());
                atomicInteger.incrementAndGet();
            } else {
                delayMessageConfig4.setTableName(str);
                if (z || SCHEDULER_COUNT.get() == 1) {
                    this.delayMessageTaskExecutor.execute(() -> {
                        try {
                            handleDelayMessage(delayMessageHandler, delayMessage, delayMessageConfig4, atomicInteger);
                            countDownLatch.countDown();
                        } catch (Throwable th) {
                            countDownLatch.countDown();
                            throw th;
                        }
                    });
                } else {
                    try {
                        handleDelayMessage(delayMessageHandler, delayMessage, delayMessageConfig4, atomicInteger);
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        countDownLatch.countDown();
                        throw th;
                    }
                }
            }
        }
        try {
            try {
                countDownLatch.await();
                log.info("本次[{}]调度共处理 {} 条延时消息 , 耗时：{} , 其中成功处理：{}， 失败处理：{} ，异步：{}", new Object[]{str, Integer.valueOf(size), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Integer.valueOf(size - atomicInteger.get()), atomicInteger, Boolean.valueOf(z)});
                Iterator<DelayMessageSchedulerInterceptor> it = this.interceptors.iterator();
                while (it.hasNext()) {
                    it.next().scheduleAfter(str, i, currentTimeMillis, scanExpiredDelayMessages, z, atomicInteger.get());
                }
                scanExpiredDelayMessages.clear();
                queryConfigByMessageKeys.clear();
            } catch (InterruptedException e) {
                log.error(e.getMessage(), e);
                Thread.currentThread().interrupt();
                log.info("本次[{}]调度共处理 {} 条延时消息 , 耗时：{} , 其中成功处理：{}， 失败处理：{} ，异步：{}", new Object[]{str, Integer.valueOf(size), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Integer.valueOf(size - atomicInteger.get()), atomicInteger, Boolean.valueOf(z)});
                Iterator<DelayMessageSchedulerInterceptor> it2 = this.interceptors.iterator();
                while (it2.hasNext()) {
                    it2.next().scheduleAfter(str, i, currentTimeMillis, scanExpiredDelayMessages, z, atomicInteger.get());
                }
                scanExpiredDelayMessages.clear();
                queryConfigByMessageKeys.clear();
            }
            return size;
        } catch (Throwable th2) {
            log.info("本次[{}]调度共处理 {} 条延时消息 , 耗时：{} , 其中成功处理：{}， 失败处理：{} ，异步：{}", new Object[]{str, Integer.valueOf(size), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Integer.valueOf(size - atomicInteger.get()), atomicInteger, Boolean.valueOf(z)});
            Iterator<DelayMessageSchedulerInterceptor> it3 = this.interceptors.iterator();
            while (it3.hasNext()) {
                it3.next().scheduleAfter(str, i, currentTimeMillis, scanExpiredDelayMessages, z, atomicInteger.get());
            }
            scanExpiredDelayMessages.clear();
            queryConfigByMessageKeys.clear();
            throw th2;
        }
    }

    private void handleDelayMessage(DelayMessageHandler delayMessageHandler, DelayMessage delayMessage, DelayMessageConfig delayMessageConfig, AtomicInteger atomicInteger) {
        DelayMessage delayMessage2 = null;
        try {
            try {
                String id = delayMessage.getId();
                delayMessage2 = createDelayMessageRef(delayMessage, delayMessageConfig);
                Iterator<DelayMessageSchedulerInterceptor> it = this.interceptors.iterator();
                while (it.hasNext()) {
                    it.next().handleMessageBefore(delayMessage, delayMessageConfig);
                }
                delayMessageHandler.handle(delayMessage2);
                this.delayMessageService.updateStatus(delayMessageConfig.getTableName(), id, MessageStatus.SUCCESS.ordinal());
                log.info("延时消息表[{}]的消息处理成功: {}", delayMessageConfig.getTableName(), id);
                try {
                    Iterator<DelayMessageSchedulerInterceptor> it2 = this.interceptors.iterator();
                    while (it2.hasNext()) {
                        it2.next().handleMessageCompleted(delayMessage, delayMessageConfig, (Exception) null);
                    }
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            } catch (Throwable th) {
                try {
                    Iterator<DelayMessageSchedulerInterceptor> it3 = this.interceptors.iterator();
                    while (it3.hasNext()) {
                        it3.next().handleMessageCompleted(delayMessage, delayMessageConfig, (Exception) null);
                    }
                } catch (Exception e2) {
                    log.error(e2.getMessage(), e2);
                }
                throw th;
            }
        } catch (Exception e3) {
            atomicInteger.incrementAndGet();
            log.error("handle delay message fail : {} , {}", e3.getMessage(), Objects.isNull(delayMessage2) ? delayMessage : delayMessage2);
            handleConsumeException(delayMessageConfig.getTableName(), delayMessage, delayMessageConfig);
            try {
                Iterator<DelayMessageSchedulerInterceptor> it4 = this.interceptors.iterator();
                while (it4.hasNext()) {
                    it4.next().handleMessageCompleted(delayMessage, delayMessageConfig, e3);
                }
            } catch (Exception e4) {
                log.error(e4.getMessage(), e4);
            }
        }
    }

    private void handleConsumeException(String str, DelayMessage delayMessage, DelayMessageConfig delayMessageConfig) {
        try {
            if (delayMessage.getScanTimes().intValue() >= (delayMessageConfig.getMaxScanTimes() <= 0 ? this.schedulerProperties.getMaxScanTimes() : delayMessageConfig.getMaxScanTimes())) {
                this.delayMessageService.updateStatus(str, delayMessage.getId(), MessageStatus.FAIL.ordinal());
                log.info("更新延时消息表[{}]的消息状态为处理失败：{}", str, delayMessage.getId());
            } else {
                this.delayMessageService.updateStatus(str, delayMessage.getId(), MessageStatus.WAITING.ordinal());
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    private DelayMessageConfig getDelayMessageConfig(Map<String, DelayMessageConfig> map, DelayMessage delayMessage) {
        return this.schedulerProperties.isUseConfigCache() ? this.delayMessageConfigCache.queryConfigByMessageKey(delayMessage.getMessageKey()) : map.get(delayMessage.getMessageKey());
    }

    private DelayMessageRef createDelayMessageRef(DelayMessage delayMessage, DelayMessageConfig delayMessageConfig) {
        DelayMessageRef delayMessageRef = new DelayMessageRef();
        if (Objects.nonNull(delayMessage.getPayload())) {
            delayMessageRef.setPayload(delayMessage.getPayload());
            delayMessageRef.setUseCache(true);
        } else {
            delayMessageRef.setPayload(delayMessage.getDataId());
            delayMessageRef.setUseCache(false);
        }
        delayMessageRef.setDelay(delayMessage.getDelay());
        delayMessageRef.setTraceId(delayMessage.getTraceId());
        delayMessageRef.setDelayMessageId(delayMessage.getId());
        delayMessageRef.setTableName(delayMessageConfig.getTableName());
        delayMessageRef.setMessageKey(delayMessage.getMessageKey());
        delayMessageRef.setMessageType(delayMessageConfig.getMessageType());
        delayMessageRef.setDataId(delayMessage.getDataId());
        delayMessageRef.setMaxScanTimes(delayMessageConfig.getMaxScanTimes());
        delayMessageRef.setScanTimes(delayMessage.getScanTimes().intValue());
        delayMessageRef.setNoticeAddress(delayMessageConfig.getNoticeAddress());
        delayMessageRef.setNoticeType(delayMessageConfig.getNoticeType());
        return delayMessageRef;
    }

    private List<DelayMessage> scanExpiredDelayMessages(String str, @Nullable Date date, int i) {
        long minExpireTime = Objects.isNull(date) ? this.optimizer.getMinExpireTime(str) : date.getTime();
        long currentTimeMillis = System.currentTimeMillis();
        this.interceptors.forEach(delayMessageSchedulerInterceptor -> {
            delayMessageSchedulerInterceptor.scheduleBefore(str, i);
        });
        List<DelayMessage> scanExpiredMessage = this.delayMessageService.scanExpiredMessage(str, minExpireTime, i);
        this.interceptors.forEach(delayMessageSchedulerInterceptor2 -> {
            delayMessageSchedulerInterceptor2.scanCompleted(str, i, minExpireTime, currentTimeMillis, scanExpiredMessage);
        });
        return scanExpiredMessage;
    }
}
