/*
 * Decompiled with CFR 0.152.
 */
package org.iplass.mtp.impl.async.rdb;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.iplass.mtp.async.AsyncTaskFuture;
import org.iplass.mtp.async.AsyncTaskOption;
import org.iplass.mtp.async.TaskStatus;
import org.iplass.mtp.entity.EntityConcurrentUpdateException;
import org.iplass.mtp.impl.async.AsyncTaskContextImpl;
import org.iplass.mtp.impl.async.AsyncTaskRuntimeException;
import org.iplass.mtp.impl.async.AsyncTaskService;
import org.iplass.mtp.impl.async.ExceptionHandleable;
import org.iplass.mtp.impl.async.rdb.CallableInput;
import org.iplass.mtp.impl.async.rdb.Queue;
import org.iplass.mtp.impl.async.rdb.RdbQueueService;
import org.iplass.mtp.impl.async.rdb.Task;
import org.iplass.mtp.impl.async.rdb.TaskSubmitListener;
import org.iplass.mtp.impl.async.rdb.Worker;
import org.iplass.mtp.impl.async.rdb.workers.LocalWorker;
import org.iplass.mtp.impl.auth.AuthContextHolder;
import org.iplass.mtp.impl.core.ExecuteContext;
import org.iplass.mtp.spi.Config;
import org.iplass.mtp.transaction.Transaction;
import org.iplass.mtp.transaction.TransactionListener;
import org.iplass.mtp.transaction.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RdbAsyncTaskService
extends AsyncTaskService {
    private static Logger fatalLogger = LoggerFactory.getLogger((String)"mtp.fatal.async.rdb");
    private long defaultGetResultTimeoutMillis = 60000L;
    private long initialGetResultIntervalMillis = 100L;
    private RdbQueueService queueService;

    public long getDefaultGetResultTimeoutMillis() {
        return this.defaultGetResultTimeoutMillis;
    }

    @Override
    public void init(Config config) {
        this.queueService = config.getDependentService(RdbQueueService.class);
        if (config.getValue("defaultGetResultTimeoutMillis") != null) {
            this.defaultGetResultTimeoutMillis = Long.parseLong(config.getValue("defaultGetResultTimeoutMillis"));
        }
    }

    @Override
    public void destroy() {
    }

    public RdbQueueService getQueueService() {
        return this.queueService;
    }

    @Override
    public <V> Future<V> execute(Callable<V> task) {
        return this.execute(task, new AsyncTaskOption(), true);
    }

    @Override
    public <V> AsyncTaskFuture<V> execute(Callable<V> task, AsyncTaskOption option, boolean inheritAuthContext) {
        if (!(task instanceof Serializable)) {
            throw new AsyncTaskRuntimeException("AsyncTask's callable must implements Serializable:" + task.getClass());
        }
        Queue q = this.queueService.getQueue(option.getQueue());
        if (q == null) {
            throw new AsyncTaskRuntimeException("queue:" + option.getQueue() + " not defined.");
        }
        Task newTask = null;
        AuthContextHolder ach = AuthContextHolder.getAuthContext();
        newTask = ach.isSecuredAction() && inheritAuthContext ? new Task(new CallableInput<V>(task, ach.getUserContext(), ach.isPrivileged()), option.getGroupingKey(), option.getExceptionHandlingMode(), option.isReturnResult(), option.getExecutionTime()) : new Task(new CallableInput<V>(task, null, false), option.getGroupingKey(), option.getExceptionHandlingMode(), option.isReturnResult(), option.getExecutionTime());
        WorkerWakeUpCaller callback = null;
        if (q.getConfig().getWorker().isWakeupOnSubmit()) {
            callback = new WorkerWakeUpCaller();
        }
        q.submit(newTask, option.getStartMode(), callback);
        return new RdbAsyncTaskFuture(newTask, q, this.defaultGetResultTimeoutMillis, this.initialGetResultIntervalMillis);
    }

    @Override
    public <V> AsyncTaskFuture<V> getResult(long taskId, String queueName) {
        Queue q = this.queueService.getQueue(queueName);
        if (q == null) {
            throw new AsyncTaskRuntimeException("queue:" + queueName + " not defined.");
        }
        Task task = new Task();
        task.setTenantId(ExecuteContext.getCurrentContext().getClientTenantId());
        task.setTaskId(taskId);
        task.setStatus(TaskStatus.UNKNOWN);
        return new RdbAsyncTaskFuture(task, q, this.defaultGetResultTimeoutMillis, this.initialGetResultIntervalMillis);
    }

    private static class RdbAsyncTaskFuture<V>
    implements AsyncTaskFuture<V> {
        private Task task;
        private Queue q;
        private long defaultGetResultTimeoutMillis;
        private long initialGetResultIntervalMillis;

        private RdbAsyncTaskFuture(Task task, Queue q, long defaultGetResultTimeoutMillis, long initialGetResultIntervalMillis) {
            this.task = task;
            this.q = q;
            this.defaultGetResultTimeoutMillis = defaultGetResultTimeoutMillis;
            this.initialGetResultIntervalMillis = initialGetResultIntervalMillis;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public boolean cancel(final boolean mayInterruptIfRunning) {
            try {
                final Task reload = this.q.peek(this.task.getTaskId(), true, true);
                if (this.q.taskAbort(reload, mayInterruptIfRunning, null, true, true)) {
                    if (reload != null && reload.getCallable().getActual() instanceof ExceptionHandleable) {
                        AsyncTaskContextImpl asyncTaskContext = new AsyncTaskContextImpl(this.task.getTaskId(), this.q.getName());
                        try {
                            ExecuteContext.getCurrentContext().setAttribute("mtp.async.AsyncTaskContext", asyncTaskContext, false);
                            ((ExceptionHandleable)((Object)reload.getCallable().getActual())).canceled();
                        }
                        catch (Throwable e) {
                            fatalLogger.warn("ExceptionHandleable's canceled() call failed.(queue:" + this.q.getConfig().getName() + ", task:" + reload.getTaskId() + ",tenantId:" + reload.getTenantId() + ")", e);
                            Transaction tran = Transaction.getCurrent();
                            if (tran != null && tran.getStatus() == TransactionStatus.ACTIVE && tran.isRollbackOnly()) {
                                tran.addTransactionListener(new TransactionListener(){

                                    @Override
                                    public void afterRollback(Transaction tt) {
                                        q.taskAbort(reload, mayInterruptIfRunning, null, true, true);
                                    }
                                });
                            }
                        }
                        finally {
                            ExecuteContext.getCurrentContext().removeAttribute("mtp.async.AsyncTaskContext");
                        }
                    }
                    return true;
                }
                return false;
            }
            catch (EntityConcurrentUpdateException e) {
                return false;
            }
        }

        private void reload() {
            Task reload;
            if (this.task.getStatus() != TaskStatus.COMPLETED && this.task.getStatus() != TaskStatus.ABORTED && (reload = this.q.peek(this.task.getTaskId(), false, true)) != null) {
                this.task = reload;
            }
        }

        @Override
        public boolean isCancelled() {
            this.reload();
            return this.task.getStatus() == TaskStatus.ABORTED;
        }

        @Override
        public boolean isDone() {
            this.reload();
            return this.task.getStatus() == TaskStatus.COMPLETED || this.task.getStatus() == TaskStatus.ABORTED || this.task.getStatus() == TaskStatus.RETURNED;
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            try {
                return this.get(-1L, null);
            }
            catch (TimeoutException e) {
                throw new ExecutionException(e);
            }
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            if (this.task.getStatus() != TaskStatus.COMPLETED && this.task.getStatus() != TaskStatus.ABORTED) {
                if (timeout < 0L) {
                    timeout = this.defaultGetResultTimeoutMillis;
                    unit = TimeUnit.MILLISECONDS;
                }
                long remainTime = unit.toMillis(timeout);
                long interval = this.initialGetResultIntervalMillis;
                Task getTask = null;
                while (remainTime >= 0L) {
                    try {
                        getTask = this.q.pullResult(this.task.getTaskId());
                    }
                    catch (Exception e) {
                        throw new ExecutionException(e);
                    }
                    if (getTask != null) break;
                    Thread.sleep(interval);
                    remainTime -= interval;
                    interval *= 2L;
                }
                if (getTask == null) {
                    throw new TimeoutException("RdbAsyncTask.get operation timeout.queueName:" + this.q.getName() + ", taskId:" + this.task.getTaskId());
                }
                this.task = getTask;
            }
            Object res = this.task.getResult();
            if (this.task.getStatus() == TaskStatus.ABORTED && res instanceof Throwable) {
                throw new ExecutionException((Throwable)res);
            }
            return (V)res;
        }

        @Override
        public long getTaskId() {
            return this.task.getTaskId();
        }

        @Override
        public TaskStatus getStatus() {
            this.reload();
            return this.task.getStatus();
        }

        @Override
        public String getQueueName() {
            return this.q.getName();
        }
    }

    private static class WorkerWakeUpCaller
    implements TaskSubmitListener {
        private static final String WID_LIST = "mtp.async.rdb.workerWakeUpCaller";
        private static Random rand = new Random(System.currentTimeMillis());
        private Task task;
        private Queue queue;

        private WorkerWakeUpCaller() {
        }

        @Override
        public void setContext(Task task, Queue queue) {
            this.task = task;
            this.queue = queue;
        }

        @Override
        public void afterCommit(Transaction t) {
            int workerId = this.queue.resolveActualWorkerId(this.task.getVirtualWorkerId());
            ArrayList<Integer> widList = (ArrayList<Integer>)t.getAttribute(WID_LIST);
            if (widList == null) {
                widList = new ArrayList<Integer>();
                t.setAttribute(WID_LIST, widList);
            }
            if (!widList.contains(workerId)) {
                Worker w = null;
                if (workerId == -1) {
                    widList.add(workerId);
                    workerId = rand.nextInt(this.queue.getConfig().getWorker().getActualWorkerSize());
                    for (int i = 0; i < this.queue.getConfig().getWorker().getActualWorkerSize(); ++i) {
                        int cwid = (workerId + i) % this.queue.getConfig().getWorker().getActualWorkerSize();
                        Worker cw = this.queue.getWorker(cwid);
                        if (!(cw instanceof LocalWorker)) continue;
                        w = cw;
                        workerId = cwid;
                        break;
                    }
                    if (w == null) {
                        w = this.queue.getWorker(workerId);
                    }
                } else {
                    w = this.queue.getWorker(workerId);
                }
                w.wakeup();
                widList.add(workerId);
            }
        }

        @Override
        public void afterRollback(Transaction t) {
        }
    }
}

