package com.github.joekerouac.async.task.service;

import com.github.joekerouac.async.task.Const;
import com.github.joekerouac.async.task.entity.AsyncTask;
import com.github.joekerouac.async.task.model.AsyncServiceConfig;
import com.github.joekerouac.async.task.model.AsyncThreadPoolConfig;
import com.github.joekerouac.async.task.model.ExecResult;
import com.github.joekerouac.async.task.model.ExecStatus;
import com.github.joekerouac.async.task.model.TaskFinishCode;
import com.github.joekerouac.async.task.spi.AbstractAsyncTaskProcessor;
import com.github.joekerouac.async.task.spi.AsyncTaskRepository;
import com.github.joekerouac.async.task.spi.MonitorService;
import com.github.joekerouac.async.task.spi.TransactionCallback;
import com.github.joekerouac.common.tools.collection.CollectionUtil;
import com.github.joekerouac.common.tools.collection.Pair;
import com.github.joekerouac.common.tools.constant.ExceptionProviderConst;
import com.github.joekerouac.common.tools.lock.LockTaskUtil;
import com.github.joekerouac.common.tools.log.Logger;
import com.github.joekerouac.common.tools.log.LoggerFactory;
import com.github.joekerouac.common.tools.scheduler.SchedulerTask;
import com.github.joekerouac.common.tools.scheduler.SimpleSchedulerTask;
import com.github.joekerouac.common.tools.string.StringUtils;
import com.github.joekerouac.common.tools.util.Assert;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/* loaded from: input_file:com/github/joekerouac/async/task/service/AsyncTaskProcessorEngine.class */
class AsyncTaskProcessorEngine {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncTaskProcessorEngine.class.getName());

    /** Thread-name prefix used for worker threads when the thread-pool config does not provide one. */
    private static final String DEFAULT_THREAD_NAME = "async-worker";

    /**
     * NOTE(review): matches the 300-second look-ahead the task loader applies when it queries the repository for
     * ready tasks; presumably this is that window expressed in seconds - confirm.
     */
    private static final int MAX_TIME = 300;

    /** Engine configuration supplied to the constructor. */
    private final AsyncServiceConfig config;

    /** Scheduled job that pulls ready tasks from the repository into {@link #queue}; created by {@code start()}. */
    private SchedulerTask loadTask;

    /** {@link System#currentTimeMillis()} value recorded the last time a repository load returned no tasks. */
    private volatile long lastEmptyLoad;

    /** Worker threads created by {@code start()}; {@code null} until then. */
    private Thread[] workerThreads;

    /** {@code true} between {@code start()} and {@code stop()}; polled by every background loop. */
    private volatile boolean start = false;

    /** Registered processors keyed by the task type name they handle. */
    private final Map<String, AbstractAsyncTaskProcessor<?>> processors = new ConcurrentHashMap<>();

    /**
     * In-memory queue of (request ID, execution time) pairs, earliest execution time first.
     *
     * <p>The ordering compares the execution times directly and breaks ties on the request ID. Comparing only the
     * time would make the set treat two different tasks that are due at the same instant as duplicates and silently
     * drop one of them, and computing the order by narrowing a millisecond difference to {@code int} overflows once
     * two entries are more than roughly 24.8 days apart. Request IDs are assumed to be non-null.
     */
    private final NavigableSet<Pair<String, LocalDateTime>> queue = new TreeSet<>((left, right) -> {
        int byExecTime = left.getValue().compareTo(right.getValue());
        return byExecTime != 0 ? byExecTime : left.getKey().compareTo(right.getKey());
    });

    /** Guards {@link #queue}; the write lock also owns {@link #condition}. */
    private final ReadWriteLock queueLock = new ReentrantReadWriteLock();

    /** Signalled when the head of the queue may have changed so that waiting workers re-check it. */
    private final Condition condition = this.queueLock.writeLock().newCondition();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.github.joekerouac.async.task.service.AsyncTaskProcessorEngine$1, reason: invalid class name */
    /* loaded from: input_file:com/github/joekerouac/async/task/service/AsyncTaskProcessorEngine$1.class */
    /**
     * Lookup table that maps each {@link ExecResult} constant's ordinal to a small switch case number
     * (SUCCESS = 1, WAIT = 2, RETRY = 3, ERROR = 4). Entries that are never assigned stay at 0.
     *
     * <p>The markers above indicate this was recovered from a synthetic class ({@code AsyncTaskProcessorEngine$1});
     * it is read by the {@code switch} in {@code scheduler()}.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Sized by the number of ExecResult constants available at class-initialization time.
        static final /* synthetic */ int[] $SwitchMap$com$github$joekerouac$async$task$model$ExecResult = new int[ExecResult.values().length];

        static {
            // Each constant is registered in its own try block so that one missing constant
            // (NoSuchFieldError) does not prevent the remaining constants from being mapped.
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$ExecResult[ExecResult.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$ExecResult[ExecResult.WAIT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$ExecResult[ExecResult.RETRY.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$ExecResult[ExecResult.ERROR.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public AsyncTaskProcessorEngine(AsyncServiceConfig asyncServiceConfig) {
        this.config = asyncServiceConfig;
        if (asyncServiceConfig.getProcessors() == null || asyncServiceConfig.getProcessors().isEmpty()) {
            return;
        }
        Iterator<AbstractAsyncTaskProcessor<?>> it = asyncServiceConfig.getProcessors().iterator();
        while (it.hasNext()) {
            addProcessor(it.next());
        }
    }

    /**
     * Registers a processor under every task type name it reports via {@code processors()}. A processor that was
     * previously registered for the same name is replaced and the replacement is logged as a warning.
     *
     * @param abstractAsyncTaskProcessor processor to register; must be non-null and report at least one task type
     */
    public void addProcessor(AbstractAsyncTaskProcessor<?> abstractAsyncTaskProcessor) {
        Assert.notNull(abstractAsyncTaskProcessor, "待添加的异步任务处理器不能为空", ExceptionProviderConst.IllegalArgumentExceptionProvider);
        boolean hasTaskTypes = !CollectionUtil.isEmpty(abstractAsyncTaskProcessor.processors());
        String noTaskTypesMessage = StringUtils.format("处理器可以处理的任务类型不能为空， [{}]", new Object[]{abstractAsyncTaskProcessor});
        Assert.assertTrue(hasTaskTypes, noTaskTypesMessage, ExceptionProviderConst.IllegalArgumentExceptionProvider);

        for (String taskType : abstractAsyncTaskProcessor.processors()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("注册处理器 [{}:{}]", new Object[]{taskType, abstractAsyncTaskProcessor});
            }
            AbstractAsyncTaskProcessor<?> replaced = this.processors.put(taskType, abstractAsyncTaskProcessor);
            if (replaced == null) {
                continue;
            }
            LOGGER.warn("异步任务处理器[{}]发生变更，使用 [{}] 替换 [{}]", new Object[]{taskType, abstractAsyncTaskProcessor, replaced});
        }
    }

    /**
     * Unregisters the processor for the given task type name.
     *
     * @param str task type name
     * @return the processor that was registered under that name, or {@code null} if there was none; the cast to the
     *         caller's expected type is unchecked
     */
    @SuppressWarnings("unchecked")
    public <T, P extends AbstractAsyncTaskProcessor<T>> P removeProcessor(String str) {
        AbstractAsyncTaskProcessor<?> removed = this.processors.remove(str);
        return (P) removed;
    }

    /**
     * Looks up the processor registered for the given task type name.
     *
     * @param str task type name
     * @return the registered processor, or {@code null} if there is none; the cast to the caller's expected type is
     *         unchecked
     */
    @SuppressWarnings("unchecked")
    public <T, P extends AbstractAsyncTaskProcessor<T>> P getProcessor(String str) {
        AbstractAsyncTaskProcessor<?> registered = this.processors.get(str);
        return (P) registered;
    }

    /**
     * Adds tasks to the in-memory queue under the queue write lock.
     *
     * <p>Entries the queue already considers present are skipped with an info log. If the queue then exceeds the
     * configured cache size, the entries at the tail of the ordering are discarded. Waiting workers are signalled
     * when the queue was empty beforehand or when its head now has an earlier execution time than before.
     *
     * @param collection tasks to enqueue; {@code null} or empty is a no-op
     */
    public void addTask(Collection<AsyncTask> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        LockTaskUtil.runWithLock(this.queueLock.writeLock(), () -> {
            // Remember the head before inserting so we can tell whether the earliest deadline moved forward.
            LocalDateTime previousHead = null;
            if (!this.queue.isEmpty()) {
                previousHead = this.queue.first().getValue();
            }

            for (AsyncTask task : collection) {
                boolean added = this.queue.add(new Pair<>(task.getRequestId(), task.getExecTime()));
                if (!added) {
                    LOGGER.info("任务 [{}] 已经在队列中了，忽略该任务", new Object[]{task});
                }
            }

            // Trim from the tail until the queue fits the configured cache size again.
            for (int excess = this.queue.size() - this.config.getCacheQueueSize(); excess > 0; excess--) {
                this.queue.pollLast();
            }

            if (previousHead == null || this.queue.first().getValue().isBefore(previousHead)) {
                this.condition.signalAll();
            }
        });
    }

    /**
     * Starts the engine: schedules the repository loader, starts the daemon monitor thread and starts one non-daemon
     * worker thread per configured core pool slot.
     *
     * <p>Calling this while the engine is already running is ignored. Without that guard a second call would start
     * another loader, monitor and worker set and overwrite {@code loadTask}/{@code workerThreads}, leaving the first
     * set unreachable for {@link #stop()}.
     */
    public synchronized void start() {
        if (this.start) {
            LOGGER.warn("异步任务引擎已经启动，忽略重复的启动请求", new Object[0]);
            return;
        }
        LOGGER.info("异步任务引擎准备启动...", new Object[0]);
        this.start = true;

        AsyncTaskRepository repository = this.config.getRepository();
        this.loadTask = new SimpleSchedulerTask(() -> {
            loadTasks(repository);
        }, "task-load", true);
        this.loadTask.setFixedDelay(this.config.getLoadInterval());
        this.loadTask.start();

        Thread monitorThread = new Thread(this::monitorLoop, "monitor");
        monitorThread.setDaemon(true);
        monitorThread.start();

        AsyncThreadPoolConfig threadPoolConfig = this.config.getThreadPoolConfig();
        this.workerThreads = new Thread[threadPoolConfig.getCorePoolSize()];
        for (int i = 0; i < this.workerThreads.length; i++) {
            String threadName = StringUtils.getOrDefault(threadPoolConfig.getThreadName(), DEFAULT_THREAD_NAME) + "-" + i;
            Thread worker = new Thread(() -> {
                workerLoop(threadPoolConfig);
            }, threadName);
            worker.setDaemon(false);
            worker.start();
            this.workerThreads[i] = worker;
        }
        LOGGER.info("异步任务引擎启动成功...", new Object[0]);
    }

    /**
     * One run of the loader job: fetches READY tasks due within the next {@code MAX_TIME} seconds that are not
     * already queued and enqueues them. The run is skipped while the last empty load is more recent than the
     * configured load interval.
     */
    private void loadTasks(AsyncTaskRepository repository) {
        LocalDateTime now = LocalDateTime.now();
        long sinceLastEmptyLoad = System.currentTimeMillis() - this.lastEmptyLoad;
        if (sinceLastEmptyLoad < this.config.getLoadInterval()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("当前距离上次空捞取的时间间隔为 [{}ms] ，小于系统配置的最小空捞取间隔 [{}ms]，跳过", new Object[]{Long.valueOf(sinceLastEmptyLoad), Long.valueOf(this.config.getLoadInterval())});
            }
            return;
        }

        // Snapshot the queued request IDs so the repository query can exclude them.
        List<String> queuedIds = LockTaskUtil.runWithLock(this.queueLock.readLock(), () -> {
            return this.queue.stream().map(Pair::getKey).collect(Collectors.toList());
        });

        int cacheQueueSize = this.config.getCacheQueueSize();
        // Ask for twice the free capacity plus a small margin, but never more than the cache can hold.
        int pageSize = Math.min(((cacheQueueSize - queuedIds.size()) * 2) + 5, cacheQueueSize);
        List<AsyncTask> loaded = repository.selectPage(ExecStatus.READY, now.plusSeconds(MAX_TIME), queuedIds, 0, pageSize);
        if (loaded.isEmpty()) {
            this.lastEmptyLoad = System.currentTimeMillis();
        } else {
            addTask(loaded);
        }
    }

    /** Body of the monitor thread: reports the queue size to the monitor service once per monitor interval. */
    private void monitorLoop() {
        while (this.start) {
            try {
                Thread.sleep(this.config.getMonitorInterval());
                LockTaskUtil.runWithLock(this.queueLock.readLock(), () -> {
                    this.config.getMonitorService().monitor(this.queue.size());
                });
            } catch (Throwable failure) {
                // Interrupts are ignored silently; the loop condition decides when to exit.
                if (!(failure instanceof InterruptedException)) {
                    LOGGER.info(failure, "监听线程异常", new Object[0]);
                }
            }
        }
    }

    /** Body of a worker thread: runs {@code scheduler()} repeatedly until the engine is stopped. */
    private void workerLoop(AsyncThreadPoolConfig threadPoolConfig) {
        Thread currentThread = Thread.currentThread();
        ClassLoader configuredLoader = threadPoolConfig.getDefaultContextClassLoader();
        ClassLoader contextLoader = configuredLoader == null ? AsyncTaskProcessorEngine.class.getClassLoader() : configuredLoader;
        while (this.start) {
            // Reset before every task in case the previous task replaced the context class loader.
            currentThread.setContextClassLoader(contextLoader);
            try {
                scheduler();
            } catch (InterruptedException e) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("异步任务工作线程收到中断消息，忽略该消息", new Object[0]);
                }
            } catch (Throwable failure) {
                this.config.getMonitorService().uncaughtException(currentThread, failure);
            }
        }
    }

    /**
     * Stops the engine: clears the running flag, stops the loader job, interrupts the worker threads and empties the
     * in-memory queue.
     *
     * <p>Safe to call on an engine that was never started; in that case there is no loader or worker to shut down
     * and only the queue is cleared.
     */
    public synchronized void stop() {
        LOGGER.info("异步任务引擎准备关闭...", new Object[0]);
        this.start = false;

        // Both fields are only assigned by start(), so they are null if the engine never ran.
        if (this.loadTask != null) {
            this.loadTask.stop();
        }
        if (this.workerThreads != null) {
            for (Thread worker : this.workerThreads) {
                if (worker != null) {
                    worker.interrupt();
                }
            }
        }

        LockTaskUtil.runWithLock(this.queueLock.writeLock(), this.queue::clear);
        LOGGER.info("异步任务引擎关闭成功...", new Object[0]);
    }

    /**
     * Takes the next due task from the queue, claims it in the repository and runs it through its processor.
     *
     * <p>Steps:
     * <ol>
     * <li>Block in {@code take()} for a request ID; {@code null} means the engine is stopping.</li>
     * <li>Move the task from READY to RUNNING with a compare-and-set update, retrying while the row is still READY
     * and giving up if it no longer is (or no longer exists).</li>
     * <li>If the stored execution time is still in the future, put the task back to READY and re-queue it.</li>
     * <li>Deserialize the payload. Only a failure of this step is reported as a deserialization error.</li>
     * <li>Run the processor and act on the {@link ExecResult}; a thrown exception is treated as RETRY and a
     * {@code null} result as SUCCESS.</li>
     * </ol>
     *
     * <p>Failures after deserialization (for example from {@code finishTask} or the repository) propagate to the
     * caller instead of being recorded as a deserialization error with the task marked finished.
     *
     * @throws InterruptedException declared for the worker loop's interrupt handling
     */
    private void scheduler() throws InterruptedException {
        AsyncTaskRepository repository = this.config.getRepository();
        MonitorService monitorService = this.config.getMonitorService();
        String queuedRequestId = take();
        if (queuedRequestId == null) {
            LOGGER.info("系统关闭，停止调度", new Object[0]);
            return;
        }

        // Claim the task. A failed CAS is retried only while the row still reports READY.
        while (repository.casUpdate(queuedRequestId, ExecStatus.READY, ExecStatus.RUNNING, Const.IP) <= 0) {
            AsyncTask current = repository.selectByRequestId(queuedRequestId);
            if (current == null) {
                LOGGER.warn("任务 [{}] 不存在，无法处理，跳过", new Object[]{queuedRequestId});
                return;
            }
            if (current.getStatus() != ExecStatus.READY) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("任务 [{}] 已经在其他机器处理了，无需重复处理", new Object[]{current});
                }
                return;
            }
        }

        AsyncTask task = repository.selectByRequestId(queuedRequestId);
        LocalDateTime now = LocalDateTime.now();
        if (task.getExecTime().isAfter(now)) {
            // The stored execution time is later than the queued one: release the claim and re-queue.
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("任务 [{}] 未到执行时间，不执行，跳过执行, 当前时间：[{}]", new Object[]{task, now});
            }
            task.setStatus(ExecStatus.READY);
            repository.update(queuedRequestId, ExecStatus.READY, null, null, null, null);
            addTask(Collections.singletonList(task));
            return;
        }

        // The registry stores processors with a wildcard type; the payload type is only known to the processor
        // itself, so it is handled as Object from here on.
        @SuppressWarnings("unchecked")
        AbstractAsyncTaskProcessor<Object> processor = (AbstractAsyncTaskProcessor<Object>) this.processors.get(task.getProcessor());
        String requestId = task.getRequestId();
        if (processor == null) {
            monitorService.noProcessor(requestId, task.getTask(), task.getProcessor());
            repository.update(queuedRequestId, ExecStatus.FINISH, TaskFinishCode.NO_PROCESSOR, null, null, Const.IP);
            return;
        }

        Map<String, Object> cache = new HashMap<>();
        Object context;
        try {
            context = processor.deserialize(requestId, task.getTask(), cache);
        } catch (Throwable deserializationFailure) {
            monitorService.deserializationError(requestId, task.getTask(), processor, deserializationFailure);
            repository.update(queuedRequestId, ExecStatus.FINISH, TaskFinishCode.DESERIALIZATION_ERROR, null, null, Const.IP);
            return;
        }

        ExecResult result;
        Throwable failure = null;
        try {
            ExecResult returned = processor.process(requestId, context, cache);
            result = returned == null ? ExecResult.SUCCESS : returned;
        } catch (Throwable processFailure) {
            result = ExecResult.RETRY;
            failure = processFailure;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(failure, "任务执行结果：[{}:{}:{}]", new Object[]{requestId, result, context});
        }

        switch (result) {
            case SUCCESS:
                finishTask(repository, processor, requestId, context, TaskFinishCode.SUCCESS, null, cache);
                return;
            case WAIT:
                repository.update(requestId, ExecStatus.WAIT, null, null, null, Const.IP);
                return;
            case RETRY: {
                int retry = task.getRetry() + 1;
                boolean retryExhausted = retry > task.getMaxRetry();
                // The processor is only consulted when there is an exception and retries remain.
                boolean refusedByProcessor = !retryExhausted && failure != null && !processor.canRetry(requestId, context, failure, cache);
                if (retryExhausted || refusedByProcessor) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(failure, "任务不可重试, [{}:{}:{}]", new Object[]{requestId, Boolean.valueOf(retryExhausted), context});
                    }
                    TaskFinishCode finishCode = retryExhausted ? TaskFinishCode.RETRY_OVERFLOW : TaskFinishCode.CANNOT_RETRY;
                    monitorService.processError(requestId, finishCode, context, processor, failure);
                    finishTask(repository, processor, requestId, context, finishCode, failure, cache);
                    return;
                }

                long delayMillis = Math.max(processor.nextExecTimeInterval(requestId, retry, context, cache), 0L);
                LocalDateTime nextExecTime = LocalDateTime.now().plus(delayMillis, ChronoUnit.MILLIS);
                task.setStatus(ExecStatus.READY);
                task.setExecTime(nextExecTime);
                task.setRetry(retry);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(failure, "任务重试, [{}:{}:{}]", new Object[]{requestId, nextExecTime, context});
                }
                monitorService.processRetry(requestId, context, processor, failure, nextExecTime);
                repository.update(queuedRequestId, ExecStatus.READY, null, nextExecTime, Integer.valueOf(retry), Const.IP);
                addTask(Collections.singletonList(task));
                return;
            }
            case ERROR:
                finishTask(repository, processor, requestId, context, TaskFinishCode.USER_ERROR, null, cache);
                return;
            default:
                throw new IllegalStateException(StringUtils.format("不支持的结果状态： [{}]", new Object[]{result}));
        }
    }

    /**
     * Runs the processor's {@code afterProcess} callback and then marks the task FINISH with the given finish code.
     * If either step throws a {@link RuntimeException} or {@link Error}, a warning is logged and the same throwable
     * is rethrown.
     *
     * @param repository repository used to persist the final state
     * @param processor processor whose completion callback is invoked
     * @param requestId request ID of the task
     * @param context deserialized task payload
     * @param finishCode finish code to record
     * @param failure exception associated with the outcome, or {@code null}
     * @param cache per-execution map handed to the processor callbacks
     */
    private void finishTask(AsyncTaskRepository repository, AbstractAsyncTaskProcessor<Object> processor, String requestId, Object context, TaskFinishCode finishCode, Throwable failure, Map<String, Object> cache) {
        try {
            processor.afterProcess(requestId, context, finishCode, failure, cache);
            repository.update(requestId, ExecStatus.FINISH, finishCode, null, null, Const.IP);
        } catch (Error | RuntimeException callbackFailure) {
            Object[] logArguments = {requestId, finishCode, context};
            LOGGER.warn(callbackFailure, "任务 [{}:{}:{}] 的回调执行异常，该异常将导致异步任务被重新执行", logArguments);
            throw callbackFailure;
        }
    }

    /**
     * Blocks, holding the queue write lock, until the head of the queue is due and returns its request ID.
     *
     * <p>While nothing is due the thread waits on the queue condition: for the time remaining until the head's
     * execution time, or 5000 ms when the queue is empty. After removing the head, the loader job is triggered if the
     * queue has dropped below the configured load threshold. Interrupts during the wait are swallowed; the loop exits
     * only through the running flag.
     *
     * @return the request ID of the due task, or {@code null} once the engine is no longer running
     */
    private String take() {
        return LockTaskUtil.runWithLock(this.queueLock.writeLock(), () -> {
            while (this.start) {
                Pair<String, LocalDateTime> head = this.queue.isEmpty() ? null : this.queue.first();
                long waitMillis = 5000;
                if (head != null) {
                    waitMillis = ChronoUnit.MILLIS.between(LocalDateTime.now(), head.getValue());
                    if (waitMillis <= 0) {
                        this.queue.pollFirst();
                        if (this.queue.size() < this.config.getLoadThreshold()) {
                            this.loadTask.scheduler();
                        }
                        return head.getKey();
                    }
                }

                try {
                    boolean signalled = this.condition.await(waitMillis, TimeUnit.MILLISECONDS);
                    if (!signalled && LOGGER.isDebugEnabled()) {
                        LOGGER.debug("唤醒等待超时，自动唤醒检查", new Object[0]);
                    }
                } catch (InterruptedException e) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("队列取任务线程等待被打断", new Object[0]);
                    }
                }
            }
            return null;
        });
    }
}
