/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.scheduler.worker.base;

import cn.ponfee.scheduler.common.base.exception.Throwables;
import cn.ponfee.scheduler.common.base.model.Result;
import cn.ponfee.scheduler.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.scheduler.common.concurrent.Threads;
import cn.ponfee.scheduler.common.util.ObjectUtils;
import cn.ponfee.scheduler.core.base.SupervisorService;
import cn.ponfee.scheduler.core.enums.ExecuteState;
import cn.ponfee.scheduler.core.enums.Operations;
import cn.ponfee.scheduler.core.exception.CancelTaskException;
import cn.ponfee.scheduler.core.exception.PauseTaskException;
import cn.ponfee.scheduler.core.handle.Checkpoint;
import cn.ponfee.scheduler.core.handle.JobHandler;
import cn.ponfee.scheduler.core.handle.JobHandlerUtils;
import cn.ponfee.scheduler.core.handle.TaskExecutor;
import cn.ponfee.scheduler.core.model.SchedJob;
import cn.ponfee.scheduler.core.model.SchedTask;
import cn.ponfee.scheduler.core.param.ExecuteParam;
import cn.ponfee.scheduler.core.param.TaskWorker;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

public class WorkerThreadPool
extends Thread
implements AutoCloseable {
    /** Logger shared by the pool and its nested classes. */
    private static final Logger LOG = LoggerFactory.getLogger(WorkerThreadPool.class);
    /** Maximum length, in characters, of an error message reported for a task. */
    private static final int ERROR_MSG_MAX_LENGTH = 2048;
    /**
     * Executor on which {@link #submit} runs non-TRIGGER (stop) operations.
     * It is static, i.e. shared by every pool instance, and is shut down by {@link #close()}.
     */
    private static final ThreadPoolExecutor STOP_TASK_POOL = ThreadPoolExecutors.create((int)1, (int)10, (long)300L, (int)50, (RejectedExecutionHandler)ThreadPoolExecutors.ALWAYS_CALLER_RUNS);
    /** Supervisor used to report task and instance state changes. */
    private final SupervisorService supervisorClient;
    /** Upper bound for the number of worker threads counted by {@link #workerThreadCounter}. */
    private final int maximumPoolSize;
    /** Idle time, in seconds, handed to every created worker thread. */
    private final long keepAliveTimeSeconds;
    /** Worker threads that currently have a task assigned, keyed by task id. */
    private final ActiveThreadPool activePool = new ActiveThreadPool();
    /** Worker threads waiting for a task. */
    private final LinkedBlockingDeque<WorkerThread> idlePool = new LinkedBlockingDeque<>();
    /** TRIGGER params accepted by {@link #submit} and not yet dispatched by {@link #run()}. */
    private final LinkedBlockingDeque<ExecuteParam> taskQueue = new LinkedBlockingDeque<>();
    /** Number of worker threads currently accounted to this pool. */
    private final AtomicInteger workerThreadCounter = new AtomicInteger(0);
    /** Set once by {@link #close()}; checked by submit, stop and the dispatch loop. */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public WorkerThreadPool(int maximumPoolSize, long keepAliveTimeSeconds, SupervisorService supervisorClient) {
        Assert.isTrue((maximumPoolSize > 0 ? 1 : 0) != 0, (String)"Maximum pool size must be positive number.");
        Assert.isTrue((keepAliveTimeSeconds > 0L ? 1 : 0) != 0, (String)"Keep alive time seconds must be positive number.");
        this.maximumPoolSize = maximumPoolSize;
        this.keepAliveTimeSeconds = keepAliveTimeSeconds;
        this.supervisorClient = supervisorClient;
        super.setDaemon(true);
        super.setName(this.getClass().getSimpleName());
    }

    /**
     * Accepts an execution parameter.
     * A TRIGGER param is appended to the task queue; any other operation is
     * handed to the stop executor and processed by {@code stop}.
     *
     * @param param the parameter to process
     * @return {@code false} if the pool is closed or the queue rejected the param;
     *         {@code true} otherwise (always {@code true} for a non-TRIGGER operation on an open pool)
     */
    public boolean submit(ExecuteParam param) {
        if (closed.get()) {
            return false;
        }
        boolean isTrigger = param.operation() == Operations.TRIGGER;
        if (!isTrigger) {
            STOP_TASK_POOL.execute(() -> stop(param));
            return true;
        }
        return taskQueue.offerLast(param);
    }

    /**
     * Handles a stop request (any operation other than TRIGGER) for one task.
     *
     * <p>If the task is currently assigned to an active worker thread, its
     * execution param is interrupted, the worker thread is stopped, and the task
     * is reported to the supervisor with the operation's target state. The report
     * is sent even when interrupting or stopping throws. If no stoppable task is
     * found, the request itself is reported with the same target state.
     *
     * @param stopParam the stop request
     * @throws IllegalArgumentException if the operation is {@code null} or TRIGGER
     */
    private void stop(ExecuteParam stopParam) {
        Operations ops = stopParam.operation();
        Assert.isTrue(ops != null && ops != Operations.TRIGGER, () -> "Invalid stop operation: " + ops);
        if (this.closed.get()) {
            return;
        }

        long taskId = stopParam.getTaskId();
        Pair<WorkerThread, ExecuteParam> pair = this.activePool.stopTask(taskId, ops);
        if (pair == null) {
            LOG.info("Not found stoppable task {} | {}", taskId, ops);
            try {
                WorkerThreadPool.terminateTask0(this.supervisorClient, stopParam, ops, ops.targetState(), null);
            } catch (Exception e) {
                // Pass the exception as the last argument so the failure cause is logged.
                LOG.error("Abort stopped task occur error: {} | {}", taskId, ops, e);
            }
            return;
        }

        WorkerThread workerThread = pair.getLeft();
        ExecuteParam param = pair.getRight();
        LOG.info("Stop task: {} | {} | {}", taskId, ops, workerThread.getName());
        try {
            param.interrupt();
            this.stopWorkerThread(workerThread, true);
        } finally {
            // Always report the final state, whether or not stopping the thread succeeded.
            try {
                WorkerThreadPool.terminateTask(this.supervisorClient, param, ops, ops.targetState(), null);
            } catch (Exception e) {
                LOG.error("Normal stop task occur error: {} | {} | {}", taskId, ops, workerThread.getName(), e);
                Threads.interruptIfNecessary(e);
            }
        }
    }

    /**
     * Shuts the pool down. Only the first call runs the shutdown sequence; any
     * later call logs a warning with the current stack trace and returns.
     *
     * <p>Each step is passed through {@code Throwables.caught}, presumably so that
     * a failing step does not prevent the following ones - confirm against Throwables.
     */
    @Override
    public void close() {
        // The compare-and-set lets exactly one caller proceed.
        if (!this.closed.compareAndSet(false, true)) {
            LOG.warn("Repeat call close method\n{}", (Object)ObjectUtils.getStackTrace());
            return;
        }
        LOG.info("Close worker thread pool start...");
        // Flag every active worker as stopped and interrupt its executing param.
        Throwables.caught(this.activePool::stopPool);
        // Flag every idle worker as stopped.
        this.idlePool.forEach(e -> Throwables.caught(e::toStop));
        // Drop the params that were queued but never dispatched.
        Throwables.caught(this.taskQueue::clear);
        // Stop this dispatcher thread itself.
        Throwables.caught(() -> Threads.stopThread((Thread)this, (int)0, (long)0L, (long)200L));
        // Stop the idle worker threads, then forget them.
        this.idlePool.forEach(e -> Throwables.caught(() -> this.stopWorkerThread((WorkerThread)e, true)));
        Throwables.caught(this.idlePool::clear);
        // Stop the active worker threads and report their tasks (see ActiveThreadPool#closePool).
        Throwables.caught(this.activePool::closePool);
        this.workerThreadCounter.set(0);
        // NOTE(review): STOP_TASK_POOL is static, so this shuts it down for every pool instance - confirm intended.
        Throwables.caught(() -> ThreadPoolExecutors.shutdown((ExecutorService)STOP_TASK_POOL, (int)1));
        LOG.info("Close worker thread pool end.");
    }

    /**
     * Dispatch loop: until the pool is closed, takes the next queued param,
     * obtains a worker thread (an idle one, a newly created one, or - when the
     * pool is at its maximum - the next one that becomes idle) and assigns the
     * param to it through the active pool.
     *
     * <p>An interruption or any unexpected exception ends the loop; the pool is
     * closed on the way out in every case.
     */
    @Override
    public void run() {
        try {
            while (!closed.get()) {
                ExecuteParam param = taskQueue.takeFirst();

                WorkerThread workerThread = idlePool.pollFirst();
                if (workerThread == null) {
                    workerThread = createWorkerThreadIfNecessary();
                }
                if (workerThread == null) {
                    // Pool is at its maximum size: block until a worker is returned.
                    workerThread = idlePool.takeFirst();
                }

                if (workerThread.isStopped()) {
                    LOG.info("Worker thread already stopped.");
                    // Re-queue the param at the head and retire the dead worker.
                    taskQueue.putFirst(param);
                    stopWorkerThread(workerThread, true);
                    continue;
                }

                try {
                    activePool.doExecute(workerThread, param);
                } catch (InterruptedException e) {
                    // The param is discarded; the outer handler ends the loop.
                    LOG.error("Do execute occur thread interrupted.", e);
                    stopWorkerThread(workerThread, true);
                    throw e;
                } catch (BrokenThreadException e) {
                    // The worker is unusable: retry the param with another worker.
                    LOG.error("Do execute broken thread.", e);
                    taskQueue.putFirst(param);
                    stopWorkerThread(workerThread, true);
                } catch (IllegalTaskException e) {
                    // The param is discarded; the worker is still usable.
                    LOG.error(e.getMessage());
                    idlePool.putFirst(workerThread);
                } catch (DuplicateTaskException e) {
                    LOG.error(e.getMessage());
                    try {
                        WorkerThreadPool.terminateTask(supervisorClient, param, Operations.TRIGGER, ExecuteState.VERIFY_FAILED, WorkerThreadPool.toErrorMsg(e));
                    } catch (Exception ex) {
                        LOG.error("Cancel duplicate task occur error: " + param, ex);
                    }
                    idlePool.putFirst(workerThread);
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Thread pool running interrupted.", e);
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            LOG.error("Thread pool running occur error.", e);
        }
        this.close();
    }

    /**
     * Returns a one-line snapshot of the pool counters: configured maximum,
     * accounted worker threads, active workers, idle workers and queued params.
     */
    @Override
    public String toString() {
        final String template = "maximum-pool-size=%d, total-count=%d, active-count=%d, idle-count=%d, task-count=%d";
        int total = workerThreadCounter.get();
        int active = activePool.size();
        int idle = idlePool.size();
        int queued = taskQueue.size();
        return String.format(template, maximumPoolSize, total, active, idle, queued);
    }

    /**
     * Moves a worker thread from the active pool back to the head of the idle pool.
     *
     * @param workerThread the worker that finished its task
     * @return {@code true} if the worker is idle again; {@code false} if it was not
     *         registered as active, or if returning it was interrupted (the worker is
     *         then flagged as stopped and the interrupt status is restored)
     */
    private boolean returnWorkerThread(WorkerThread workerThread) {
        ExecuteParam released = activePool.removeThread(workerThread);
        if (released == null) {
            LOG.warn("Return thread failed, because not found: {}", workerThread.getName());
            return false;
        }

        boolean returned;
        try {
            idlePool.putFirst(workerThread);
            returned = true;
        } catch (InterruptedException e) {
            LOG.error("Return thread to idle pool interrupted.", e);
            Thread.currentThread().interrupt();
            stopWorkerThread(workerThread, false);
            returned = false;
        }
        return returned;
    }

    /**
     * Retires a worker thread: flags it as stopped, removes it from the active
     * pool or, failing that, from the idle pool, and decrements the thread counter.
     * A worker found in neither pool is only logged.
     *
     * @param workerThread the worker to remove
     */
    private void removeWorkerThread(WorkerThread workerThread) {
        workerThread.toStop();

        // Short-circuit: the idle pool is only searched when the worker was not active.
        boolean found = activePool.removeThread(workerThread) != null
            || idlePool.remove(workerThread);
        if (!found) {
            LOG.warn("Not found removable thread: {}", workerThread.getName());
        }

        stopWorkerThread(workerThread, false);
    }

    /**
     * Decrements the worker counter and stops the given worker thread.
     *
     * @param workerThread the worker to stop
     * @param doStop       {@code true} to call {@code doStop(0, 0L, 200L)} on the worker;
     *                     {@code false} to only flag it as stopped
     */
    private void stopWorkerThread(WorkerThread workerThread, boolean doStop) {
        workerThreadCounter.decrementAndGet();
        if (!doStop) {
            workerThread.toStop();
            return;
        }
        LOG.info("Worker thread death: {}", workerThread.getName());
        workerThread.doStop(0, 0L, 200L);
    }

    /**
     * Creates a new worker thread if the counter is below the maximum pool size.
     * The slot is reserved with a compare-and-set on the counter before the
     * thread is constructed; a lost race is retried.
     *
     * @return the new worker thread, or {@code null} if the pool is at its maximum
     */
    private WorkerThread createWorkerThreadIfNecessary() {
        for (;;) {
            int current = workerThreadCounter.get();
            if (current >= maximumPoolSize) {
                return null;
            }
            int reserved = current + 1;
            if (workerThreadCounter.compareAndSet(current, reserved)) {
                WorkerThread created = new WorkerThread(this, supervisorClient, keepAliveTimeSeconds);
                LOG.info("Created worker thread, current size: {}", reserved);
                return created;
            }
        }
    }

    /**
     * Renders an exception as an error message: the root cause stack trace,
     * cut to at most {@code ERROR_MSG_MAX_LENGTH} characters.
     *
     * @param e the exception, may be {@code null}
     * @return the message, or {@code null} if {@code e} is {@code null}
     */
    private static String toErrorMsg(Exception e) {
        if (e == null) {
            return null;
        }
        String stackTrace = Throwables.getRootCauseStackTrace(e);
        return stackTrace.length() <= ERROR_MSG_MAX_LENGTH
            ? stackTrace
            : stackTrace.substring(0, ERROR_MSG_MAX_LENGTH);
    }

    /**
     * Reports the end of a task to the supervisor, at most once per param.
     * Nothing is done when the param has no operation left; otherwise the
     * operation is swapped from {@code ops} to {@code null}, and only the caller
     * that wins this swap sends the report.
     *
     * @param supervisorClient supervisor to notify
     * @param param            the task's execution param
     * @param ops              operation the param is expected to hold, must not be {@code null}
     * @param toState          state to report
     * @param errorMsg         error message to report, may be {@code null}
     * @throws Exception propagated from the supervisor call
     */
    private static void terminateTask(SupervisorService supervisorClient, ExecuteParam param, Operations ops, ExecuteState toState, String errorMsg) throws Exception {
        Assert.notNull(ops, "Terminate task current operation cannot null.");
        if (param.operation() == null) {
            return;
        }
        if (param.updateOperation(ops, null)) {
            terminateTask0(supervisorClient, param, ops, toState, errorMsg);
            return;
        }
        LOG.warn("Clear execution param operation conflict: {} | {} | {}", param, ops, toState);
    }

    /**
     * Sends the supervisor call that matches the operation: TRIGGER terminates
     * the executing task, PAUSE pauses it, every other operation cancels it.
     * A {@code false} answer from the supervisor is logged as an error.
     *
     * @param supervisorClient supervisor to notify
     * @param param            the task's execution param
     * @param ops              operation that selects the supervisor call
     * @param toState          state to report (not used for PAUSE)
     * @param errorMsg         error message to report, may be {@code null}
     * @throws Exception propagated from the supervisor call
     */
    private static void terminateTask0(SupervisorService supervisorClient, ExecuteParam param, Operations ops, ExecuteState toState, String errorMsg) throws Exception {
        final boolean accepted;
        switch (ops) {
            case TRIGGER:
                accepted = supervisorClient.terminateExecutingTask(param, toState, errorMsg);
                break;
            case PAUSE:
                accepted = supervisorClient.pauseExecutingTask(param, errorMsg);
                break;
            default:
                accepted = supervisorClient.cancelExecutingTask(param, toState, errorMsg);
                break;
        }
        if (accepted) {
            return;
        }
        LOG.error("Terminate sched task failed: {} | {} | {} | {}", param, ops, toState, errorMsg);
    }

    /**
     * Ends the current task and then its whole instance. The param's operation is
     * first swapped from TRIGGER to {@code ops}; if that fails the call is only
     * logged. A failure while reporting the task is logged and does not prevent
     * the instance call.
     *
     * @param supervisorClient supervisor to notify
     * @param param            the task's execution param
     * @param ops              PAUSE or EXCEPTION_CANCEL
     * @param errorMsg         error message to report, may be {@code null}
     * @throws UnsupportedOperationException for any other operation (after the task was reported)
     * @throws Exception propagated from the supervisor instance call
     */
    private static void terminateInstance(SupervisorService supervisorClient, ExecuteParam param, Operations ops, String errorMsg) throws Exception {
        boolean claimed = param.updateOperation(Operations.TRIGGER, ops);
        if (!claimed) {
            LOG.error("Terminate sched instance conflicted: {} | {} | {}", param, ops, errorMsg);
            return;
        }

        LOG.info("Terminate the sched instance {}", param);
        try {
            terminateTask(supervisorClient, param, ops, ops.targetState(), errorMsg);
        } catch (Exception e) {
            LOG.error("Terminate sched instance task error: " + param, e);
        }

        final boolean accepted;
        switch (ops) {
            case PAUSE:
                accepted = supervisorClient.pauseInstance(param.getInstanceId());
                break;
            case EXCEPTION_CANCEL:
                accepted = supervisorClient.cancelInstance(param.getInstanceId(), ops);
                break;
            default:
                throw new UnsupportedOperationException("Terminate sched instance unsupported operation: " + ops);
        }
        if (accepted) {
            return;
        }
        LOG.error("Terminate sched instance failed: {} | {} | {}", param, ops, errorMsg);
    }

    private static class WorkerThread
    extends Thread {
        /** Sequence used to give each worker thread a unique name suffix. */
        private static final AtomicInteger NAMED_SEQ = new AtomicInteger(1);
        /** Sequence used to name the helper threads created for timed task execution. */
        private static final AtomicInteger FUTURE_TASK_NAMED_SEQ = new AtomicInteger(1);
        /** Pool that owns this worker; the worker returns to or removes itself from it. */
        private final WorkerThreadPool threadPool;
        /** Supervisor used to load tasks and report their outcome. */
        private final SupervisorService supervisorClient;
        /** Maximum idle wait for the next param, in nanoseconds. */
        private final long keepAliveTime;
        /** Hand-off point through which the pool passes one param at a time to this thread. */
        private final BlockingQueue<ExecuteParam> workQueue = new SynchronousQueue<>();
        /** Stop flag checked by the run loop. */
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        /** Param currently assigned to this worker, or {@code null} when none is. */
        private final AtomicReference<ExecuteParam> executingParam = new AtomicReference<>();

        public WorkerThread(WorkerThreadPool threadPool, SupervisorService supervisorClient, long keepAliveTimeSeconds) {
            this.threadPool = threadPool;
            this.supervisorClient = supervisorClient;
            this.keepAliveTime = TimeUnit.SECONDS.toNanos(keepAliveTimeSeconds);
            super.setDaemon(true);
            super.setName(this.getClass().getSimpleName() + "-" + NAMED_SEQ.getAndIncrement());
            super.start();
        }

        /**
         * Hands a param over to this worker thread, waiting at most 100 ms for the
         * thread to accept it.
         *
         * @param param the param to run
         * @throws BrokenThreadException if the worker is stopped or did not accept the param in time
         * @throws InterruptedException  if the caller is interrupted while waiting
         */
        public final void execute(ExecuteParam param) throws InterruptedException {
            boolean unusable = stopped.get() || isStopped();
            if (unusable) {
                throw new BrokenThreadException("Worker thread already stopped: " + super.getName());
            }
            boolean accepted = workQueue.offer(param, 100L, TimeUnit.MILLISECONDS);
            if (!accepted) {
                throw new BrokenThreadException("Put to worker thread queue timeout: " + super.getName());
            }
        }

        /** Raises the stop flag; the run loop exits the next time it checks the flag. */
        public final void toStop() {
            stopped.set(true);
        }

        /**
         * Raises the stop flag and then stops the underlying thread through
         * {@code Threads.stopThread}.
         *
         * <p>The flag is raised unconditionally by {@link #toStop()}. It must not be
         * tested with a compare-and-set afterwards: that check could never succeed,
         * so the thread would never actually be stopped. Calling this method after
         * {@code toStop()} is a normal sequence (the pool's close does exactly that).
         *
         * @param sleepCount  forwarded to {@code Threads.stopThread}
         * @param sleepMillis forwarded to {@code Threads.stopThread}
         * @param joinMillis  forwarded to {@code Threads.stopThread}
         * @return the result of {@code Threads.stopThread}
         */
        public final boolean doStop(int sleepCount, long sleepMillis, long joinMillis) {
            this.toStop();
            return Threads.stopThread(this, sleepCount, sleepMillis, joinMillis);
        }

        /**
         * Atomically replaces the assigned param.
         *
         * @param expect the param expected to be assigned now ({@code null} for none)
         * @param update the new param ({@code null} to clear)
         * @return {@code true} if the current value was {@code expect} and has been replaced
         */
        public final boolean updateExecuteParam(ExecuteParam expect, ExecuteParam update) {
            final boolean swapped = executingParam.compareAndSet(expect, update);
            return swapped;
        }

        /** Returns the param currently assigned to this worker, or {@code null} if there is none. */
        public final ExecuteParam executingParam() {
            final ExecuteParam current = executingParam.get();
            return current;
        }

        /** Returns {@code true} while a param is assigned to this worker. */
        public final boolean isExecuting() {
            return executingParam.get() != null;
        }

        /** Returns what {@code Threads.isStopped} reports for this thread. */
        public final boolean isStopped() {
            final boolean threadStopped = Threads.isStopped(this);
            return threadStopped;
        }

        /**
         * Worker loop: waits for a param on the hand-off queue, runs it, and returns
         * this thread to the pool's idle set. The loop ends, and the thread removes
         * itself from the pool, when the stop flag is raised, the thread is
         * interrupted, or no param arrives within the keep-alive time.
         */
        @Override
        public void run() {
            while (!this.stopped.get()) {
                ExecuteParam executeParam;
                // Interrupted while not blocked: leave the pool and exit.
                if (super.isInterrupted()) {
                    LOG.warn("Worker boss thread interrupted.");
                    this.threadPool.removeWorkerThread(this);
                    return;
                }
                try {
                    // Wait for the next param, at most for the keep-alive time.
                    executeParam = this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS);
                }
                catch (InterruptedException e) {
                    LOG.error("Poll execution param block interrupted.", (Throwable)e);
                    this.threadPool.removeWorkerThread(this);
                    // Restore the interrupt status before exiting.
                    Thread.currentThread().interrupt();
                    return;
                }
                // A null result means the wait timed out: the idle worker retires.
                if (executeParam == null) {
                    LOG.info("Worker thread exit, idle wait timeout.");
                    this.threadPool.removeWorkerThread(this);
                    return;
                }
                try {
                    this.runTask(executeParam);
                }
                catch (Exception e) {
                    // Anything escaping runTask is reported as an execute exception.
                    try {
                        WorkerThreadPool.terminateTask(this.supervisorClient, executeParam, Operations.TRIGGER, ExecuteState.EXECUTE_EXCEPTION, WorkerThreadPool.toErrorMsg(e));
                    }
                    catch (Exception ex) {
                        LOG.error("Worker thread terminate failed: " + executeParam, (Throwable)ex);
                    }
                    // An interruption additionally ends this worker.
                    if (e instanceof InterruptedException) {
                        LOG.error("Worker thread execute interrupted: " + executeParam, (Throwable)e);
                        this.threadPool.removeWorkerThread(this);
                        Thread.currentThread().interrupt();
                        return;
                    }
                    LOG.error("Worker thread execute failed: " + executeParam, (Throwable)e);
                }
                // Go idle again; if the pool did not take the thread back, exit.
                if (this.threadPool.returnWorkerThread(this)) continue;
                return;
            }
            // Stop flag raised: leave the pool.
            this.threadPool.removeWorkerThread(this);
        }

        /**
         * Runs one task on this worker thread and reports its outcome to the supervisor.
         *
         * <p>Stages, each ending the method early when it fails:
         * <ol>
         *   <li>load the task and its job, require the task to be in WAITING state and
         *       start it; if this stage throws, the task's worker assignment is reset;</li>
         *   <li>instantiate the job handler (failure is reported as INSTANCE_FAILED);</li>
         *   <li>verify (VERIFY_FAILED) and init (INIT_EXCEPTION) the handler;</li>
         *   <li>execute the handler and report FINISHED, EXECUTE_FAILED, EXECUTE_TIMEOUT or
         *       EXECUTE_EXCEPTION, or pause/cancel the whole instance when the handler
         *       throws {@code PauseTaskException}/{@code CancelTaskException};</li>
         *   <li>destroy the handler, whatever the outcome of the execution.</li>
         * </ol>
         *
         * <p>When the job has a positive execute timeout the handler runs in a
         * {@link FutureTask} on a separate daemon thread, so its failures arrive wrapped
         * in an {@link ExecutionException}. The wrapper is removed before the failure is
         * classified; otherwise a pause or cancel request thrown by the handler would be
         * reported as a plain execute exception on the timeout path only.
         *
         * @param param the execution param of the task to run
         * @throws Exception propagated from the supervisor calls made while reporting the outcome
         */
        private void runTask(ExecuteParam param) throws Exception {
            SchedTask task;
            SchedJob schedJob;
            try {
                task = this.supervisorClient.getTask(param.getTaskId());
                if (task == null) {
                    LOG.error("Sched task not found {}", param);
                    return;
                }
                ExecuteState fromState = ExecuteState.of(task.getExecuteState());
                if (fromState != ExecuteState.WAITING) {
                    LOG.warn("Task state not executable: {} | {} | {}", task.getTaskId(), fromState, param.operation());
                    return;
                }
                schedJob = this.supervisorClient.getJob(param.getJobId());
                if (schedJob == null) {
                    LOG.error("Sched job not found {}", param);
                    return;
                }
                if (!this.supervisorClient.startTask(param)) {
                    LOG.warn("Task start conflicted {}", param);
                    return;
                }
            } catch (Exception e) {
                LOG.warn("Start task fail: " + param, e);
                try {
                    // Clear the worker recorded for the task (empty worker string).
                    this.supervisorClient.updateTaskWorker(Collections.singletonList(new TaskWorker(Long.valueOf(param.getTaskId()), "")));
                } catch (Exception ex) {
                    LOG.error("Reset task worker occur error: " + param, ex);
                }
                return;
            }

            JobHandler taskExecutor;
            try {
                taskExecutor = JobHandlerUtils.newInstance(schedJob.getJobHandler());
            } catch (Exception e) {
                LOG.error("Load job handler error: " + param, e);
                WorkerThreadPool.terminateTask(this.supervisorClient, param, Operations.TRIGGER, ExecuteState.INSTANCE_FAILED, WorkerThreadPool.toErrorMsg(e));
                return;
            }
            taskExecutor.task(task);
            param.taskExecutor((TaskExecutor) taskExecutor);

            try {
                taskExecutor.verify();
            } catch (Exception e) {
                LOG.error("Task verify failed: " + param, e);
                WorkerThreadPool.terminateTask(this.supervisorClient, param, Operations.TRIGGER, ExecuteState.VERIFY_FAILED, WorkerThreadPool.toErrorMsg(e));
                return;
            }

            try {
                taskExecutor.init();
                LOG.info("Initiated sched task {}", param);
            } catch (Exception e) {
                LOG.error("Task init error: " + param, e);
                WorkerThreadPool.terminateTask(this.supervisorClient, param, Operations.TRIGGER, ExecuteState.INIT_EXCEPTION, WorkerThreadPool.toErrorMsg(e));
                return;
            }

            try {
                Result result;
                if (schedJob.getExecuteTimeout() > 0) {
                    FutureTask<Result> futureTask = new FutureTask<Result>(() -> this.lambda$runTask$0((TaskExecutor) taskExecutor));
                    Thread futureTaskThread = new Thread(futureTask);
                    futureTaskThread.setDaemon(true);
                    futureTaskThread.setName(this.getClass().getSimpleName() + "#FutureTaskThread-" + FUTURE_TASK_NAMED_SEQ.getAndIncrement());
                    futureTaskThread.start();
                    try {
                        result = futureTask.get(schedJob.getExecuteTimeout().intValue(), TimeUnit.MILLISECONDS);
                    } catch (ExecutionException e) {
                        // Rethrow the handler's own exception so the catch clauses below
                        // classify it exactly as on the untimed path.
                        Throwable cause = e.getCause();
                        if (cause instanceof Exception) {
                            throw (Exception) cause;
                        }
                        throw e;
                    } finally {
                        Threads.stopThread(futureTaskThread, 0, 0L, 0L);
                    }
                } else {
                    result = taskExecutor.execute((Checkpoint) this.supervisorClient);
                }

                LOG.info("Executed sched task {}", param);
                if (result.isSuccess()) {
                    LOG.info("Task executed finished {}", param);
                    WorkerThreadPool.terminateTask(this.supervisorClient, param, Operations.TRIGGER, ExecuteState.FINISHED, null);
                } else {
                    LOG.error("Task executed failed {} | {}", param, result);
                    WorkerThreadPool.terminateTask(this.supervisorClient, param, Operations.TRIGGER, ExecuteState.EXECUTE_FAILED, result.getMsg());
                }
            } catch (TimeoutException e) {
                LOG.error("Task execute timeout: " + param, e);
                WorkerThreadPool.terminateTask(this.supervisorClient, param, Operations.TRIGGER, ExecuteState.EXECUTE_TIMEOUT, WorkerThreadPool.toErrorMsg(e));
            } catch (PauseTaskException e) {
                LOG.error("PauseTaskException, do pause: " + param, e);
                WorkerThreadPool.terminateInstance(this.supervisorClient, param, Operations.PAUSE, WorkerThreadPool.toErrorMsg(e));
            } catch (CancelTaskException e) {
                LOG.error("CancelTaskException, do manual cancel: " + param, e);
                WorkerThreadPool.terminateInstance(this.supervisorClient, param, Operations.EXCEPTION_CANCEL, WorkerThreadPool.toErrorMsg(e));
            } catch (Exception e) {
                LOG.error("Task execute occur error: " + param, e);
                WorkerThreadPool.terminateTask(this.supervisorClient, param, Operations.TRIGGER, ExecuteState.EXECUTE_EXCEPTION, WorkerThreadPool.toErrorMsg(e));
            } finally {
                try {
                    taskExecutor.destroy();
                    LOG.info("Destroyed sched task: {}", param);
                } catch (Exception e) {
                    LOG.error("Task destroy error: " + param, e);
                }
            }
        }

        /**
         * Executes the given task executor, passing the supervisor client as its checkpoint.
         * The name and the {@code synthetic} marker are decompiler output: this is the body
         * of a lambda that the decompiler emitted as a regular method.
         *
         * @param taskExecutor the executor to run
         * @return the executor's result
         * @throws Exception whatever the executor throws
         */
        private /* synthetic */ Result lambda$runTask$0(TaskExecutor taskExecutor) throws Exception {
            return taskExecutor.execute((Checkpoint)this.supervisorClient);
        }
    }

    /**
     * Signals that a worker thread cannot take a param: it is already stopped, did not
     * accept the hand-off in time, or already has another param assigned. The dispatch
     * loop reacts by re-queueing the param and stopping that worker thread.
     */
    private static class BrokenThreadException
    extends RuntimeException {
        private static final long serialVersionUID = 3475868254991118684L;

        /**
         * @param message description of why the worker thread is unusable
         */
        public BrokenThreadException(String message) {
            super(message);
        }
    }

    /**
     * Signals that the active pool already holds a different execution param for the
     * same task id. The dispatch loop reacts by reporting the new param to the
     * supervisor as VERIFY_FAILED and returning the worker thread to the idle pool.
     */
    private static class DuplicateTaskException
    extends RuntimeException {
        private static final long serialVersionUID = -5210570253941551115L;

        /**
         * @param message description of the conflicting params
         */
        public DuplicateTaskException(String message) {
            super(message);
        }
    }

    /**
     * Signals a param that must not be executed: it is {@code null}, its operation is
     * not TRIGGER, or the same param is already executing. The dispatch loop reacts by
     * discarding the param and returning the worker thread to the idle pool.
     */
    private static class IllegalTaskException
    extends RuntimeException {
        private static final long serialVersionUID = -1273937229826200274L;

        /**
         * @param message description of why the param was rejected
         */
        public IllegalTaskException(String message) {
            super(message);
        }
    }

    private class ActiveThreadPool {
        /**
         * Worker threads that currently have a task assigned, keyed by task id.
         * A plain HashMap: the mutating methods of this class are {@code synchronized}.
         */
        private final Map<Long, WorkerThread> pool = new HashMap<Long, WorkerThread>();

        /** Only the enclosing pool creates the instance. */
        private ActiveThreadPool() {
        }

        /**
         * Assigns a TRIGGER param to a worker thread and registers the thread under
         * the param's task id. If handing the param over fails, the assignment on the
         * worker is cleared again before the exception is rethrown.
         *
         * @param workerThread the worker that should run the param
         * @param param        the param to run
         * @throws IllegalTaskException   if the param is {@code null}, is not a TRIGGER, or is already executing
         * @throws DuplicateTaskException if a different param with the same task id is executing
         * @throws BrokenThreadException  if the worker already has a param or does not accept this one
         * @throws InterruptedException   if interrupted while handing the param over
         */
        synchronized void doExecute(WorkerThread workerThread, ExecuteParam param) throws InterruptedException {
            if (param == null || param.operation() != Operations.TRIGGER) {
                throw new IllegalTaskException("Invalid execute param: " + param);
            }

            WorkerThread occupant = pool.get(param.getTaskId());
            if (occupant != null) {
                if (param.same(occupant.executingParam())) {
                    throw new IllegalTaskException("Task repeat execute: " + param);
                }
                throw new DuplicateTaskException("Task id duplicate: " + param + ", " + occupant.executingParam());
            }

            boolean assigned = workerThread.updateExecuteParam(null, param);
            if (!assigned) {
                throw new BrokenThreadException("Execute worker thread conflict: " + workerThread.getName() + ", " + workerThread.executingParam());
            }

            try {
                workerThread.execute(param);
            } catch (Exception e) {
                // Undo the assignment so the worker does not appear busy.
                workerThread.updateExecuteParam(param, null);
                throw e;
            }
            pool.put(param.getTaskId(), workerThread);
        }

        /**
         * Detaches the task with the given id from its worker thread so it can be stopped.
         * The param's operation is swapped from TRIGGER to {@code ops}, the worker's
         * assignment is cleared and the worker is removed from this pool.
         *
         * @param taskId id of the task to stop
         * @param ops    the stop operation to set on the executing param
         * @return the worker and its former param, or {@code null} if the task is not
         *         active here or one of the swaps did not succeed
         */
        synchronized Pair<WorkerThread, ExecuteParam> stopTask(long taskId, Operations ops) {
            WorkerThread thread = pool.get(taskId);
            if (thread == null) {
                return null;
            }
            ExecuteParam param = thread.executingParam();
            if (param == null) {
                return null;
            }
            if (!param.updateOperation(Operations.TRIGGER, ops)) {
                return null;
            }
            if (!thread.updateExecuteParam(param, null)) {
                LOG.error("Stop task clear execute param failed: {}", param);
                return null;
            }

            pool.remove(taskId);
            LOG.info("Removed active pool worker thread: {} | {}", thread.getName(), param);
            return Pair.of(thread, param);
        }

        /**
         * Clears the worker thread's assignment and removes the thread from this pool.
         *
         * @param workerThread the worker to remove
         * @return the param the worker was running, or {@code null} if it had none or
         *         its assignment changed concurrently
         * @throws IllegalArgumentException if the pool held no thread, or a different
         *         thread, for that param's task id
         */
        synchronized ExecuteParam removeThread(WorkerThread workerThread) {
            ExecuteParam param = workerThread.executingParam();
            if (param == null) {
                return null;
            }
            if (!workerThread.updateExecuteParam(param, null)) {
                LOG.error("Remove thread clear execute param failed: {}", (Object)param);
                return null;
            }
            WorkerThread removed = this.pool.remove(param.getTaskId());
            // The message must tolerate a missing entry: "removed" is null in that case,
            // and dereferencing it would replace the assertion failure with an NPE.
            Assert.isTrue(workerThread == removed, () -> "Inconsistent worker thread: " + param.getTaskId() + ", " + workerThread.getName() + ", " + (removed == null ? null : removed.getName()));
            return param;
        }

        /**
         * Flags every registered worker thread as stopped and interrupts the param it
         * is executing. The threads stay registered.
         */
        synchronized void stopPool() {
            for (WorkerThread workerThread : pool.values()) {
                workerThread.toStop();
                workerThread.executingParam().interrupt();
            }
        }

        /**
         * Stops every registered worker thread, reports its task to the supervisor
         * with the PAUSE operation and that operation's target state, and empties the
         * pool. The entries are processed through a parallel stream while this
         * object's monitor is held.
         */
        synchronized void closePool() {
            this.pool.entrySet().parallelStream().forEach(entry -> {
                WorkerThread workerThread = (WorkerThread)entry.getValue();
                ExecuteParam param = workerThread.executingParam();
                Operations ops = Operations.PAUSE;
                // Claim the param by swapping its operation from TRIGGER to PAUSE.
                boolean success = param.updateOperation(Operations.TRIGGER, ops);
                try {
                    WorkerThreadPool.this.stopWorkerThread(workerThread, true);
                }
                catch (Exception e) {
                    LOG.error("Stop worker thread occur error on thread pool close: " + param + " | " + workerThread, (Throwable)e);
                }
                // Only the party that won the swap reports the task.
                if (success) {
                    try {
                        WorkerThreadPool.terminateTask(WorkerThreadPool.this.supervisorClient, param, ops, ops.targetState(), null);
                    }
                    catch (Exception e) {
                        LOG.error("Terminate task failed on thread pool close: " + param, (Throwable)e);
                    }
                } else {
                    LOG.error("Change execution param ops failed on thread pool close: {} | {}", (Object)param, (Object)ops);
                }
                // Clear the worker's assignment in either case.
                workerThread.updateExecuteParam(param, null);
            });
            this.pool.clear();
        }

        /**
         * Returns the number of registered worker threads. Unlike the other methods of
         * this class it is not {@code synchronized}.
         */
        int size() {
            final int activeCount = pool.size();
            return activeCount;
        }
    }
}

