package com.github.joekerouac.async.task.service;

import com.github.joekerouac.async.task.AsyncTaskService;
import com.github.joekerouac.async.task.Const;
import com.github.joekerouac.async.task.db.TransUtil;
import com.github.joekerouac.async.task.entity.AsyncTask;
import com.github.joekerouac.async.task.impl.AsyncTaskRepositoryImpl;
import com.github.joekerouac.async.task.impl.MonitorServiceWrapper;
import com.github.joekerouac.async.task.model.AsyncServiceConfig;
import com.github.joekerouac.async.task.model.ExecStatus;
import com.github.joekerouac.async.task.model.TaskFinishCode;
import com.github.joekerouac.async.task.model.TransStrategy;
import com.github.joekerouac.async.task.spi.AbstractAsyncTaskProcessor;
import com.github.joekerouac.async.task.spi.AsyncTaskRepository;
import com.github.joekerouac.async.task.spi.IDGenerator;
import com.github.joekerouac.async.task.spi.MonitorService;
import com.github.joekerouac.async.task.spi.TransactionCallback;
import com.github.joekerouac.async.task.spi.TransactionHook;
import com.github.joekerouac.common.tools.constant.ExceptionProviderConst;
import com.github.joekerouac.common.tools.log.Logger;
import com.github.joekerouac.common.tools.log.LoggerFactory;
import com.github.joekerouac.common.tools.string.StringUtils;
import com.github.joekerouac.common.tools.util.Assert;
import java.time.LocalDateTime;
import java.util.Collections;
import javax.validation.constraints.NotNull;

/* loaded from: input_file:com/github/joekerouac/async/task/service/AsyncTaskServiceImpl.class */
public class AsyncTaskServiceImpl implements AsyncTaskService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncTaskServiceImpl.class.getName());
    private final AsyncServiceConfig config;
    private final AsyncTaskProcessorEngine engine;
    private volatile boolean start = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.github.joekerouac.async.task.service.AsyncTaskServiceImpl$2, reason: invalid class name */
    /* loaded from: input_file:com/github/joekerouac/async/task/service/AsyncTaskServiceImpl$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$github$joekerouac$async$task$model$TransStrategy = new int[TransStrategy.values().length];

        static {
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$TransStrategy[TransStrategy.REQUIRED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$TransStrategy[TransStrategy.SUPPORTS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$TransStrategy[TransStrategy.MANDATORY.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$TransStrategy[TransStrategy.REQUIRES_NEW.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$TransStrategy[TransStrategy.NOT_SUPPORTED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$TransStrategy[TransStrategy.NEVER.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    public AsyncTaskServiceImpl(@NotNull AsyncServiceConfig asyncServiceConfig) {
        Assert.notNull(asyncServiceConfig, "config不能为null", ExceptionProviderConst.IllegalArgumentExceptionProvider);
        Assert.assertTrue((asyncServiceConfig.getRepository() == null && asyncServiceConfig.getConnectionSelector() == null) ? false : true, "仓储服务repository和链接选择器connectionSelector不能同时为空", ExceptionProviderConst.IllegalArgumentExceptionProvider);
        Const.VALIDATION_SERVICE.validate(asyncServiceConfig);
        int cacheQueueSize = asyncServiceConfig.getCacheQueueSize();
        int loadThreshold = asyncServiceConfig.getLoadThreshold();
        Assert.assertTrue(loadThreshold < cacheQueueSize || (loadThreshold == 0 && cacheQueueSize == 0), StringUtils.format("触发捞取任务的队列长度阈值应该小于缓存队列的长度，当前触发捞取任务的队列长度为：[{}],当前缓存队列长度为：[{}]", new Object[]{Integer.valueOf(loadThreshold), Integer.valueOf(cacheQueueSize)}), ExceptionProviderConst.IllegalArgumentExceptionProvider);
        MonitorService monitorService = asyncServiceConfig.getMonitorService();
        monitorService = monitorService instanceof MonitorServiceWrapper ? monitorService : new MonitorServiceWrapper(monitorService);
        AsyncTaskRepository repository = asyncServiceConfig.getRepository();
        AsyncTaskRepository asyncTaskRepositoryImpl = repository != null ? repository : new AsyncTaskRepositoryImpl(asyncServiceConfig.getConnectionSelector());
        AsyncServiceConfig asyncServiceConfig2 = new AsyncServiceConfig();
        asyncServiceConfig2.setRepository(asyncTaskRepositoryImpl);
        asyncServiceConfig2.setConnectionSelector(asyncServiceConfig.getConnectionSelector());
        asyncServiceConfig2.setCacheQueueSize(asyncServiceConfig.getCacheQueueSize());
        asyncServiceConfig2.setLoadThreshold(asyncServiceConfig.getLoadThreshold());
        asyncServiceConfig2.setLoadInterval(asyncServiceConfig.getLoadInterval());
        asyncServiceConfig2.setMonitorInterval(asyncServiceConfig.getMonitorInterval());
        asyncServiceConfig2.setThreadPoolConfig(asyncServiceConfig.getThreadPoolConfig());
        asyncServiceConfig2.setIdGenerator(asyncServiceConfig.getIdGenerator());
        asyncServiceConfig2.setProcessors(asyncServiceConfig.getProcessors());
        asyncServiceConfig2.setTransactionHook(asyncServiceConfig.getTransactionHook());
        asyncServiceConfig2.setMonitorService(monitorService);
        this.config = asyncServiceConfig2;
        this.engine = new AsyncTaskProcessorEngine(asyncServiceConfig2);
        if (asyncServiceConfig.isAutoClear()) {
            Thread thread = new Thread(new TaskClearRunner(asyncServiceConfig.getRepository(), asyncServiceConfig.getFinishTaskReserve()), StringUtils.format("异步任务自动清理线程，清理执行完成超过 [{}] 小时的任务", new Object[]{Integer.valueOf(asyncServiceConfig.getFinishTaskReserve())}));
            thread.setDaemon(true);
            thread.start();
        }
    }

    @Override // com.github.joekerouac.async.task.AsyncTaskService
    public void start() {
        synchronized (this.config) {
            if (this.start) {
                LOGGER.warn("当前异步任务服务已经启动，请勿重复调用启动方法", new Object[0]);
            } else {
                this.engine.start();
                this.start = true;
            }
        }
    }

    @Override // com.github.joekerouac.async.task.AsyncTaskService
    public void stop() {
        synchronized (this.config) {
            if (this.start) {
                this.engine.stop();
                this.start = false;
            } else {
                LOGGER.warn("当前异步任务服务已经关闭，请勿重复调用关闭方法", new Object[0]);
            }
        }
    }

    @Override // com.github.joekerouac.async.task.AsyncTaskService
    public void addProcessor(AbstractAsyncTaskProcessor<?> abstractAsyncTaskProcessor) {
        this.engine.addProcessor(abstractAsyncTaskProcessor);
    }

    @Override // com.github.joekerouac.async.task.AsyncTaskService
    public <T, P extends AbstractAsyncTaskProcessor<T>> P removeProcessor(String str) {
        return (P) this.engine.removeProcessor(str);
    }

    @Override // com.github.joekerouac.async.task.AsyncTaskService
    public <T, P extends AbstractAsyncTaskProcessor<T>> P getProcessor(String str) {
        return (P) this.engine.getProcessor(str);
    }

    @Override // com.github.joekerouac.async.task.AsyncTaskService
    public void addTask(String str, Object obj, int i, LocalDateTime localDateTime, String str2, TransStrategy transStrategy) {
        addTaskInternal(str, obj, i, localDateTime, str2, transStrategy, ExecStatus.READY);
    }

    @Override // com.github.joekerouac.async.task.AsyncTaskService
    public void addTaskWithWait(String str, Object obj, int i, LocalDateTime localDateTime, String str2, TransStrategy transStrategy) {
        addTaskInternal(str, obj, i, localDateTime, str2, transStrategy, ExecStatus.WAIT);
    }

    @Override // com.github.joekerouac.async.task.AsyncTaskService
    public void notifyTask(String str, TransStrategy transStrategy) {
        AsyncTask selectByRequestId = this.config.getRepository().selectByRequestId(str);
        if (selectByRequestId == null || selectByRequestId.getStatus() != ExecStatus.WAIT) {
            return;
        }
        TransUtil.run(transStrategy, () -> {
            if (this.config.getRepository().casUpdate(str, ExecStatus.WAIT, ExecStatus.READY, Const.IP) > 0) {
                selectByRequestId.setStatus(ExecStatus.READY);
                addTaskToEngine(selectByRequestId, transStrategy);
            }
        });
    }

    private void addTaskInternal(String str, Object obj, int i, LocalDateTime localDateTime, String str2, TransStrategy transStrategy, ExecStatus execStatus) {
        Assert.assertTrue(this.start, "当前服务还未启动，请先启动后调用", ExceptionProviderConst.IllegalStateExceptionProvider);
        AbstractAsyncTaskProcessor processor = this.engine.getProcessor(str2);
        Assert.notNull(processor, StringUtils.format("指定的任务处理器 [{}] 不存在", new Object[]{str2}), ExceptionProviderConst.IllegalArgumentExceptionProvider);
        IDGenerator idGenerator = this.config.getIdGenerator();
        String generateId = idGenerator.generateId();
        Assert.notBlank(generateId, StringUtils.format("ID生成器 [{}] 生成的ID为空", new Object[]{idGenerator}), ExceptionProviderConst.IllegalStateExceptionProvider);
        String serialize = processor.serialize(obj);
        AsyncTaskRepository repository = this.config.getRepository();
        AsyncTask asyncTask = new AsyncTask();
        asyncTask.setId(generateId);
        asyncTask.setRequestId(str);
        asyncTask.setTask(serialize);
        asyncTask.setMaxRetry(i);
        asyncTask.setExecTime(localDateTime);
        asyncTask.setProcessor(str2);
        asyncTask.setRetry(0);
        asyncTask.setStatus(execStatus);
        asyncTask.setTaskFinishCode(TaskFinishCode.NONE);
        asyncTask.setCreateIp(Const.IP);
        asyncTask.setExecIp(Const.IP);
        TransUtil.run(transStrategy, () -> {
            if (repository.save(asyncTask)) {
                addTaskToEngine(asyncTask, transStrategy);
            } else {
                this.config.getMonitorService().duplicateTask(str, obj);
            }
        });
    }

    private void addTaskToEngine(AsyncTask asyncTask, TransStrategy transStrategy) {
        boolean z;
        TransactionHook transactionHook = this.config.getTransactionHook();
        final Runnable runnable = () -> {
            this.engine.addTask(Collections.singletonList(asyncTask));
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("将任务[{}]添加到内存队列中", new Object[]{asyncTask});
            }
        };
        if (transactionHook == null || !transactionHook.isActualTransactionActive()) {
            runnable.run();
            return;
        }
        switch (AnonymousClass2.$SwitchMap$com$github$joekerouac$async$task$model$TransStrategy[transStrategy.ordinal()]) {
            case TransactionCallback.STATUS_ROLLED_BACK /* 1 */:
            case TransactionCallback.STATUS_UNKNOWN /* 2 */:
            case 3:
                z = true;
                break;
            case 4:
            case 5:
            case AsyncTaskService.MAX_RETRY /* 6 */:
                z = false;
                break;
            default:
                throw new UnsupportedOperationException(StringUtils.format("不支持的事务策略：[{}]", new Object[]{transStrategy}));
        }
        if (z) {
            transactionHook.registerCallback(new TransactionCallback() { // from class: com.github.joekerouac.async.task.service.AsyncTaskServiceImpl.1
                @Override // com.github.joekerouac.async.task.spi.TransactionCallback
                public void afterCommit() throws RuntimeException {
                    runnable.run();
                }
            });
        } else {
            runnable.run();
        }
    }
}
