package com.github.kancyframework.delay.message.scheduler.xxljob;

import com.github.kancyframework.delay.message.config.DelayMessageTaskExecutor;
import com.github.kancyframework.delay.message.scheduler.DelayMessageScheduler;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import net.dreamlu.mica.core.utils.$;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

@JobHandler("DelayMessageTableJob")
/* loaded from: input_file:com/github/kancyframework/delay/message/scheduler/xxljob/DelayMessageTableJob.class */
public class DelayMessageTableJob extends IJobHandler {

    @Autowired
    private DelayMessageTaskExecutor delayMessageTaskExecutor;

    @Autowired
    private DelayMessageScheduler delayMessageScheduler;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/kancyframework/delay/message/scheduler/xxljob/DelayMessageTableJob$JobParam.class */
    public static class JobParam {
        private int limit;
        private String table;
        private String minScanExpiredTime;

        JobParam() {
        }

        public static JobParam parse(String str) {
            return (JobParam) $.readJson(str, JobParam.class);
        }

        public Set<String> getTables() {
            checkParam();
            return StringUtils.commaDelimitedListToSet(this.table);
        }

        public void checkParam() {
            Assert.state(this.limit > 0, "param limit must greater than zero.");
            Assert.hasText(this.table, "param table is empty.");
            if (StringUtils.hasText(this.minScanExpiredTime)) {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                simpleDateFormat.setLenient(false);
                try {
                    simpleDateFormat.parse(this.minScanExpiredTime);
                } catch (ParseException e) {
                    throw new IllegalArgumentException("The date format is incorrect, please use the “yyyy-MM-dd HH:mm:ss” format.");
                }
            }
        }

        public int getLimit() {
            return this.limit;
        }

        public void setLimit(int i) {
            this.limit = i;
        }

        public String getTable() {
            return this.table;
        }

        public void setTable(String str) {
            this.table = str;
        }

        public String getMinScanExpiredTime() {
            return this.minScanExpiredTime;
        }

        public void setMinScanExpiredTime(String str) {
            this.minScanExpiredTime = str;
        }

        public Date convertAndGetMinScanExpiredTime() {
            if (StringUtils.hasText(this.minScanExpiredTime)) {
                return $.parseDate(this.minScanExpiredTime, "yyyy-MM-dd HH:mm:ss");
            }
            return null;
        }
    }

    @XxlJob("DelayMessageJob")
    public ReturnT<String> executeJob(String str) throws Exception {
        return execute(str);
    }

    public ReturnT<String> execute(String str) throws Exception {
        XxlJobLogger.log(String.format("接收到[DelayMessageTableJob]的调度指令：%s", str), new Object[0]);
        JobParam parse = JobParam.parse(str);
        Set<String> tables = parse.getTables();
        if (tables.size() > 1) {
            CountDownLatch countDownLatch = new CountDownLatch(tables.size());
            for (String str2 : tables) {
                this.delayMessageTaskExecutor.execute(() -> {
                    try {
                        try {
                            XxlJobLogger.log(String.format("=====>开始消费[%s]表的延时消息", str2), new Object[0]);
                            XxlJobLogger.log(String.format("消费[%s]表的延时消息数量：%s", str2, Integer.valueOf(this.delayMessageScheduler.schedule(str2, parse.getLimit(), parse.convertAndGetMinScanExpiredTime()))), new Object[0]);
                            countDownLatch.countDown();
                            XxlJobLogger.log(String.format("=====>结束消费[%s]表的延时消息", str2), new Object[0]);
                        } catch (Exception e) {
                            XxlJobLogger.log(String.format("消费[%s]表的延时消息失败：%s", str2, e.getMessage()), new Object[0]);
                            countDownLatch.countDown();
                            XxlJobLogger.log(String.format("=====>结束消费[%s]表的延时消息", str2), new Object[0]);
                        }
                    } catch (Throwable th) {
                        countDownLatch.countDown();
                        XxlJobLogger.log(String.format("=====>结束消费[%s]表的延时消息", str2), new Object[0]);
                        throw th;
                    }
                });
            }
            countDownLatch.await();
        } else {
            XxlJobLogger.log(String.format("=====>开始消费[%s]表的延时消息", parse.getTable()), new Object[0]);
            try {
                XxlJobLogger.log(String.format("消费[%s]表的延时消息数量：%s", parse.getTable(), Integer.valueOf(this.delayMessageScheduler.schedule(parse.getTable(), parse.getLimit(), parse.convertAndGetMinScanExpiredTime()))), new Object[0]);
            } catch (Exception e) {
                XxlJobLogger.log(String.format("消费[%s]表的延时消息失败：%s", parse.getTable(), e.getMessage()), new Object[0]);
            }
            XxlJobLogger.log(String.format("=====>结束消费[%s]表的延时消息", parse.getTable()), new Object[0]);
        }
        XxlJobLogger.log("成功执行[DelayMessageTableJob]的调度", new Object[0]);
        return ReturnT.SUCCESS;
    }
}
