/*
 * Decompiled with CFR 0.152.
 */
package org.iplass.mtp.impl.async.rdb;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.iplass.mtp.async.ExceptionHandlingMode;
import org.iplass.mtp.async.StartMode;
import org.iplass.mtp.async.TaskStatus;
import org.iplass.mtp.async.TaskTimeoutException;
import org.iplass.mtp.impl.async.AsyncTaskContextImpl;
import org.iplass.mtp.impl.async.AsyncTaskRuntimeException;
import org.iplass.mtp.impl.async.ExceptionHandleable;
import org.iplass.mtp.impl.async.rdb.DefaultWorkerFactory;
import org.iplass.mtp.impl.async.rdb.QueueConfig;
import org.iplass.mtp.impl.async.rdb.Task;
import org.iplass.mtp.impl.async.rdb.TaskDao;
import org.iplass.mtp.impl.async.rdb.TaskSearchCondition;
import org.iplass.mtp.impl.async.rdb.TaskSubmitListener;
import org.iplass.mtp.impl.async.rdb.Worker;
import org.iplass.mtp.impl.async.rdb.WorkerFactory;
import org.iplass.mtp.impl.core.Executable;
import org.iplass.mtp.impl.core.ExecuteContext;
import org.iplass.mtp.impl.core.TenantContextService;
import org.iplass.mtp.impl.core.config.ServerEnv;
import org.iplass.mtp.impl.counter.CounterService;
import org.iplass.mtp.impl.rdb.adapter.RdbAdapter;
import org.iplass.mtp.spi.ServiceRegistry;
import org.iplass.mtp.transaction.Transaction;
import org.iplass.mtp.transaction.TransactionListener;
import org.iplass.mtp.transaction.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A single asynchronous task queue whose tasks are persisted through {@link TaskDao}.
 *
 * <p>The queue owns a fixed array of {@link Worker}s (one per "actual" worker configured in
 * {@link QueueConfig}) and a table that assigns every "virtual" worker id to exactly one actual
 * worker ({@code virtualId % actualWorkerSize}). Tasks are given a virtual worker id on
 * {@link #submit(Task, StartMode, TaskSubmitListener)} and are later picked up by
 * {@link #poll(int, boolean)}.
 *
 * <p>Only {@link #poll(int, boolean)} is {@code synchronized}; the other methods perform no
 * locking of their own inside this class.
 */
public class Queue {
    /** Prefix of the transaction attribute that remembers task ids submitted with AFTER_COMMIT. */
    private static final String KEY_NEVER_START = "mtp.asynctask.neverstartasynctask";
    private static final Logger mtpLogger = LoggerFactory.getLogger("mtp.asynctask");
    private static final Logger fatalLogger = LoggerFactory.getLogger("mtp.fatal.asynctask");
    private static final Logger logger = LoggerFactory.getLogger(Queue.class);

    private final QueueConfig config;
    private final TaskDao dao;
    private final CounterService counter;
    private final CounterService counterForGroup;
    /** {@code workerIdMap[actualWorkerId]} lists the virtual worker ids served by that worker. */
    private final int[][] workerIdMap;
    private final Worker[] workers;

    /**
     * Creates a queue whose workers are produced by a {@link DefaultWorkerFactory}.
     *
     * @param config          queue configuration
     * @param counter         counter used to number tasks
     * @param counterForGroup counter used to number grouped tasks when strict sequence is enabled
     * @param rdb             adapter handed to the {@link TaskDao}
     */
    public Queue(QueueConfig config, CounterService counter, CounterService counterForGroup, RdbAdapter rdb) {
        this(config, counter, counterForGroup, rdb, new DefaultWorkerFactory());
    }

    /**
     * Creates a queue, builds the virtual-to-actual worker table and instantiates the workers.
     *
     * @param config          queue configuration
     * @param counter         counter used to number tasks
     * @param counterForGroup counter used to number grouped tasks when strict sequence is enabled
     * @param rdb             adapter handed to the {@link TaskDao}
     * @param workerFactory   factory asked once per actual worker id to create the worker
     */
    public Queue(QueueConfig config, CounterService counter, CounterService counterForGroup, RdbAdapter rdb, WorkerFactory workerFactory) {
        this.config = config;
        this.counter = counter;
        this.counterForGroup = counterForGroup;
        this.dao = new TaskDao(rdb);

        int actualSize = config.getWorker().getActualWorkerSize();
        this.workerIdMap = new int[actualSize][];
        if (actualSize > 0) {
            int virtualSize = config.getWorker().getVirtualWorkerSize();
            int remainder = virtualSize % actualSize;
            int baseLength = virtualSize / actualSize;
            // The first `remainder` workers take one extra virtual id so that all ids are covered.
            for (int w = 0; w < actualSize; w++) {
                this.workerIdMap[w] = new int[w < remainder ? baseLength + 1 : baseLength];
            }
            // Round-robin: virtual id v belongs to worker (v % actualSize), at slot (v / actualSize).
            for (int v = 0; v < virtualSize; v++) {
                this.workerIdMap[v % actualSize][v / actualSize] = v;
            }
        }

        this.workers = new Worker[actualSize];
        for (int w = 0; w < this.workers.length; w++) {
            this.workers[w] = workerFactory.createWorker(this, w);
        }
    }

    /** Calls {@link Worker#start()} on every worker of this queue. */
    public void startWorker() {
        for (Worker worker : workers) {
            worker.start();
        }
    }

    /** Calls {@link Worker#stop()} on every worker of this queue. */
    public void stopWorker() {
        for (Worker worker : workers) {
            worker.stop();
        }
    }

    /**
     * @return the configuration this queue was created with
     */
    public QueueConfig getConfig() {
        return config;
    }

    /**
     * @param workerId actual worker id (index into the worker array)
     * @return the worker created for that id
     */
    public Worker getWorker(int workerId) {
        return workers[workerId];
    }

    /**
     * Assigns the task's virtual worker id.
     *
     * <p>Grouped tasks are hashed by tenant id and grouping key, so the same group always lands
     * on the same virtual worker. Ungrouped tasks are hashed by tenant id and task id when
     * {@code selectWorkerOnSubmit} is enabled, otherwise they get {@code -1}.
     */
    private void genVirtualWorkerId(Task task) {
        if (task.getGroupingKey() != null) {
            task.setVirtualWorkerId(toVirtualWorkerId(task.getTenantId(), task.getGroupingKey().hashCode()));
            return;
        }
        if (!config.isSelectWorkerOnSubmit()) {
            task.setVirtualWorkerId(-1);
            return;
        }
        long taskId = task.getTaskId();
        task.setVirtualWorkerId(toVirtualWorkerId(task.getTenantId(), (int) (taskId ^ (taskId >>> 32))));
    }

    /**
     * Combines the tenant id and a key hash into a virtual worker id in
     * {@code [0, virtualWorkerSize)}.
     */
    private int toVirtualWorkerId(int tenantId, int keyHash) {
        final int prime = 31;
        int hash = 1;
        hash = prime * hash + tenantId;
        hash = prime * hash + keyHash;
        // Math.abs(Integer.MIN_VALUE) is still negative; map that single value to 0 so the id can
        // never become negative. All other hashes keep the mapping they always had.
        int nonNegative = hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
        return nonNegative % config.getWorker().getVirtualWorkerSize();
    }

    /**
     * Initialises the task (tenant, queue, status, version, server id, task id, virtual worker id)
     * and stores it.
     *
     * <ul>
     * <li>{@code IMMEDIATELY}: the insert runs inside {@code Transaction.requiresNew}; the
     *     callback, if any, is registered as a listener of that transaction.</li>
     * <li>{@code AFTER_COMMIT}: the insert runs in the caller's context; if a callback is given
     *     it is registered on the current transaction and the task id is remembered so that
     *     {@link #pullResult(long)} can reject it within the same transaction.</li>
     * </ul>
     * Other modes store nothing.
     *
     * @param task     task to submit; mutated by this method
     * @param mode     start mode, must not be {@code null}
     * @param callback optional listener, may be {@code null}
     * @throws IllegalStateException if mode is AFTER_COMMIT, a callback is given and there is no
     *                               active current transaction
     */
    public void submit(Task task, StartMode mode, TaskSubmitListener callback) {
        ExecuteContext exec = ExecuteContext.getCurrentContext();
        task.setTenantId(exec.getClientTenantId());
        task.setQueueId(config.getId());
        // A visible time in the past is pulled up to "now".
        long now = System.currentTimeMillis();
        if (now > task.getVisibleTime()) {
            task.setVisibleTime(now);
        }
        task.setStatus(TaskStatus.SUBMITTED);
        task.setVersion(0L);
        task.setServerId(ServerEnv.getInstance().getServerId());

        boolean useGroupCounter = task.getGroupingKey() != null && config.isStrictSequence();
        CounterService idSource = useGroupCounter ? counterForGroup : counter;
        task.setTaskId(idSource.increment(exec.getClientTenantId(), config.getName(), 0L));
        genVirtualWorkerId(task);

        switch (mode) {
            case IMMEDIATELY: {
                Transaction.requiresNew(t -> {
                    dao.insert(task);
                    if (callback != null) {
                        callback.setContext(task, this);
                        t.addTransactionListener(callback);
                    }
                });
                break;
            }
            case AFTER_COMMIT: {
                dao.insert(task);
                if (callback != null) {
                    Transaction current = Transaction.getCurrent();
                    // getCurrent() is treated as nullable elsewhere in this class; report the
                    // missing transaction with the intended exception instead of an NPE.
                    if (current == null || current.getStatus() != TransactionStatus.ACTIVE) {
                        throw new IllegalStateException("Queue#submit() call must in Transaction");
                    }
                    callback.setContext(task, this);
                    current.addTransactionListener(callback);
                    addNeverStartId(getName(), task.getTaskId(), current);
                }
                break;
            }
            default:
                break;
        }
    }

    /** Records, on the transaction, a task id that was submitted with AFTER_COMMIT. */
    @SuppressWarnings("unchecked")
    private void addNeverStartId(String queueName, long taskId, Transaction t) {
        String key = KEY_NEVER_START + "." + queueName;
        // Only this method stores the attribute, and it always stores a List<Long>.
        List<Long> ids = (List<Long>) t.getAttribute(key);
        if (ids == null) {
            ids = new ArrayList<Long>();
            t.setAttribute(key, ids);
        }
        ids.add(taskId);
    }

    /** Tells whether the task id was recorded on the transaction by {@link #addNeverStartId}. */
    private boolean isNeverStartId(String queueName, long taskId, Transaction t) {
        List<?> ids = (List<?>) t.getAttribute(KEY_NEVER_START + "." + queueName);
        return ids != null && ids.contains(taskId);
    }

    /**
     * Maps a virtual worker id to the actual worker id that serves it.
     *
     * @param virtualWorkerId virtual worker id, or {@code -1}
     * @return {@code -1} if the input is {@code -1}, otherwise
     *         {@code virtualWorkerId % actualWorkerSize}
     */
    public int resolveActualWorkerId(int virtualWorkerId) {
        if (virtualWorkerId == -1) {
            return -1;
        }
        return virtualWorkerId % config.getWorker().getActualWorkerSize();
    }

    /** Returns this server's id when {@code localOnly} is set, otherwise {@code null}. */
    private String getServerId(boolean localOnly) {
        return localOnly ? ServerEnv.getInstance().getServerId() : null;
    }

    /**
     * Picks the next task for the given worker and marks it as EXECUTING.
     *
     * <p>Candidates come from {@code TaskDao.searchForPoll}. A grouped candidate is skipped when
     * {@code TaskDao.countPreExecuting} reports a positive count for its group. A candidate that
     * is already EXECUTING is handled as a timed-out task according to its exception handling
     * mode: RESTART increments the retry count and re-runs it, ABORT / ABORT_LOG_FATAL abort it
     * and move on to the next candidate.
     *
     * @param myWorkerId actual worker id of the caller
     * @param localOnly  when {@code true}, this server's id is passed to the DAO queries
     * @return the task to execute, or {@code null} if none is available
     * @throws IllegalArgumentException if {@code myWorkerId} is not a valid actual worker id
     */
    public synchronized Task poll(int myWorkerId, boolean localOnly) {
        if (myWorkerId < 0 || myWorkerId >= workerIdMap.length) {
            throw new IllegalArgumentException("requested workerId:" + myWorkerId + " is out of range.ActualWorkerSize is " + config.getWorker().getActualWorkerSize());
        }
        List<Task> candidates = dao.searchForPoll(config.getId(), workerIdMap[myWorkerId], System.currentTimeMillis(), getServerId(localOnly), config.getWorker().getMaxRetryCount());
        if (candidates.isEmpty()) {
            return null;
        }

        // Groups already known to be blocked, so the count query is not repeated for them.
        HashSet<String> blockedGroups = new HashSet<String>();
        for (Task candidate : candidates) {
            String groupingKey = candidate.getGroupingKey();
            if (groupingKey != null) {
                if (blockedGroups.contains(groupingKey)) {
                    continue;
                }
                if (dao.countPreExecuting(ExecuteContext.getCurrentContext().getClientTenantId(), config.getId(), groupingKey, candidate.getTaskId(), getServerId(localOnly)) > 0) {
                    blockedGroups.add(groupingKey);
                    continue;
                }
            }

            if (candidate.getStatus() == TaskStatus.EXECUTING) {
                switch (candidate.getExceptionHandlingMode()) {
                    case RESTART:
                        mtpLogger.warn("queue:" + getName() + "'s task:" + candidate.getTaskId() + "(tenantId:" + candidate.getTenantId() + ") is timeout. so re-run task.");
                        candidate.setRetryCount(candidate.getRetryCount() + 1);
                        break;
                    case ABORT:
                    case ABORT_LOG_FATAL:
                        abortTimedOutTask(candidate);
                        continue;
                    default:
                        break;
                }
            }

            candidate.setStatus(TaskStatus.EXECUTING);
            candidate.setVisibleTime(System.currentTimeMillis() + config.getWorker().getExecutionTimeout() + config.getWorker().getRestartDelay());
            dao.update(candidate);
            return dao.load(candidate.getTenantId(), candidate.getQueueId(), candidate.getTaskId(), true, false, false);
        }
        return null;
    }

    /**
     * Aborts a timed-out task found by {@link #poll(int, boolean)} and, when its callable is an
     * {@link ExceptionHandleable}, invokes {@code timeouted()} in the task's tenant context.
     */
    private void abortTimedOutTask(Task timedOut) {
        final Task loaded = dao.load(timedOut.getTenantId(), timedOut.getQueueId(), timedOut.getTaskId(), true, false, true);
        if (loaded == null) {
            // The row is gone (load returns null for missing tasks); nothing left to abort.
            return;
        }
        if (!taskAbort(loaded, true, new TaskTimeoutException(), false, true)) {
            return;
        }

        String abortMessage = "queue:" + getName() + "'s task:" + timedOut.getTaskId() + "(tenantId:" + timedOut.getTenantId() + ") is timeout.so abort task.";
        mtpLogger.error(abortMessage);
        if (timedOut.getExceptionHandlingMode() == ExceptionHandlingMode.ABORT_LOG_FATAL) {
            fatalLogger.error(abortMessage);
        }

        try {
            if (!(loaded.getCallable().getActual() instanceof ExceptionHandleable)) {
                return;
            }
            ExecuteContext.executeAs(ServiceRegistry.getRegistry().getService(TenantContextService.class).getTenantContext(timedOut.getTenantId()), new Executable<Void>() {

                @Override
                public Void execute() {
                    AsyncTaskContextImpl asyncTaskContext = new AsyncTaskContextImpl(loaded.getTaskId(), Queue.this.getName());
                    ExecuteContext ec = ExecuteContext.getCurrentContext();
                    ec.setAttribute("mtp.async.AsyncTaskContext", asyncTaskContext, false);
                    try {
                        if (loaded.getCallable().getTraceId() != null) {
                            ec.mdcPut("traceId", loaded.getCallable().getTraceId());
                        }
                        ((ExceptionHandleable) ((Object) loaded.getCallable().getActual())).timeouted();
                    } finally {
                        ec.mdcPut("traceId", null);
                    }
                    return null;
                }
            });
        } catch (Throwable ee) {
            fatalLogger.error("ExceptionHandleable's timeouted() call failed(queue:" + getConfig().getName() + ", task:" + loaded.getTaskId() + ",tenantId:" + loaded.getTenantId() + ")", ee);
            Transaction tran = Transaction.getCurrent();
            if (tran == null || tran.getStatus() != TransactionStatus.ACTIVE || !tran.isRollbackOnly()) {
                return;
            }
            // The failed handler marked the transaction rollback-only, which would also undo the
            // abort above; repeat the abort once the rollback has happened.
            tran.addTransactionListener(new TransactionListener() {

                @Override
                public void afterRollback(Transaction tt) {
                    Queue.this.taskAbort(loaded, true, new TaskTimeoutException(), false, true);
                }
            });
        }
    }

    /**
     * Finishes a successfully executed task: a task that returns a result is updated to
     * RETURNED, any other task is moved to the history as COMPLETED.
     *
     * @param task the executed task
     */
    public void taskFinish(Task task) {
        if (!task.isReturnResult()) {
            dao.moveToHistory(task, TaskStatus.COMPLETED);
            return;
        }
        task.setStatus(TaskStatus.RETURNED);
        dao.update(task);
    }

    /**
     * Aborts a task that is still SUBMITTED or EXECUTING.
     *
     * <p>When a cause is given it is stored as the task result and the task is updated to
     * ABORTED. If the cause cannot be serialized, the exception raised by the serialization
     * check is stored instead. The task is moved to the history when {@code callCancel} is set
     * or the task does not return a result.
     *
     * @param task                  task to abort (identified by tenant, queue and task id)
     * @param mayInterruptIfRunning not used by this method
     * @param cause                 reason of the abort, may be {@code null}
     * @param callCancel            forces the move to the history
     * @param withTSCheck           when {@code true}, abort only if the stored update time equals
     *                              the update time of {@code task}
     * @return {@code true} if the task was aborted, {@code false} if it was not found, failed the
     *         update-time check or was in another status
     */
    public boolean taskAbort(Task task, boolean mayInterruptIfRunning, Throwable cause, boolean callCancel, boolean withTSCheck) {
        Task loaded = dao.load(task.getTenantId(), task.getQueueId(), task.getTaskId(), true, false, true);
        if (loaded == null) {
            return false;
        }
        if (withTSCheck && !loaded.getUpdateTime().equals(task.getUpdateTime())) {
            return false;
        }
        switch (loaded.getStatus()) {
            case SUBMITTED:
            case EXECUTING:
                break;
            default:
                return false;
        }

        if (cause != null) {
            Throwable storedCause = cause;
            try {
                checkSerialize(cause);
            } catch (Exception e) {
                logger.debug("Can't serialize abort cause exception instance:" + cause);
                e.addSuppressed(new NotSerializableException(cause.toString()));
                storedCause = e;
            }
            loaded.setResult(storedCause);
            loaded.setStatus(TaskStatus.ABORTED);
            dao.update(loaded);
            loaded = dao.load(task.getTenantId(), task.getQueueId(), task.getTaskId(), true, false, true);
        }
        if (callCancel || !loaded.isReturnResult()) {
            dao.moveToHistory(loaded, TaskStatus.ABORTED);
        }
        return true;
    }

    /**
     * Verifies that the object can be written with Java serialization.
     *
     * @throws NotSerializableException if the object (or part of its graph) is not serializable
     * @throws RuntimeException         wrapping any other {@link IOException}
     */
    private void checkSerialize(Object obj) throws NotSerializableException {
        if (obj == null) {
            return;
        }
        try (ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(byteOut)) {
            out.writeObject(obj);
        } catch (NotSerializableException e) {
            throw e;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the result of a finished task and moves the task to the history.
     *
     * @param taskId id of the task
     * @return the task with status COMPLETED or ABORTED once it has finished; a placeholder task
     *         with status UNKNOWN if the task is not in this queue; {@code null} if the task has
     *         not finished yet
     * @throws AsyncTaskRuntimeException if the task was submitted with AFTER_COMMIT inside the
     *                                   still-active current transaction
     */
    public Task pullResult(long taskId) {
        Transaction current = Transaction.getCurrent();
        if (current != null && current.getStatus() == TransactionStatus.ACTIVE && isNeverStartId(getName(), taskId, current)) {
            throw new AsyncTaskRuntimeException("never start async task, because AsyncTaskStartMode is AFTER_COMMIT");
        }

        Task task = dao.load(ExecuteContext.getCurrentContext().getClientTenantId(), config.getId(), taskId, false, false, false);
        if (task == null) {
            return unknownTask(taskId);
        }
        if (task.getStatus() != TaskStatus.RETURNED && task.getStatus() != TaskStatus.ABORTED) {
            return null;
        }

        // Second load with different flags (presumably with binary data and a row lock - the
        // flag meanings are defined by TaskDao#load).
        task = dao.load(ExecuteContext.getCurrentContext().getClientTenantId(), config.getId(), taskId, true, false, true);
        if (task == null) {
            // The task disappeared between the two loads; report it like any task not found.
            return unknownTask(taskId);
        }
        if (task.getStatus() == TaskStatus.RETURNED) {
            dao.moveToHistory(task, TaskStatus.COMPLETED);
            task.setStatus(TaskStatus.COMPLETED);
            return task;
        }
        if (task.getStatus() == TaskStatus.ABORTED) {
            dao.moveToHistory(task, TaskStatus.ABORTED);
            task.setStatus(TaskStatus.ABORTED);
            return task;
        }
        return null;
    }

    /** Builds the placeholder returned for a task id that is not present in this queue. */
    private Task unknownTask(long taskId) {
        Task unknown = new Task();
        unknown.setQueueId(config.getId());
        unknown.setTaskId(taskId);
        unknown.setStatus(TaskStatus.UNKNOWN);
        return unknown;
    }

    /**
     * Loads a task of the current client tenant without changing it.
     *
     * @param taskId      id of the task
     * @param withBinary  passed through to {@code TaskDao.load}
     * @param withHistory passed through to {@code TaskDao.load}
     * @return the loaded task, or whatever {@code TaskDao.load} returns when it is not found
     */
    public Task peek(long taskId, boolean withBinary, boolean withHistory) {
        return dao.load(ExecuteContext.getCurrentContext().getClientTenantId(), config.getId(), taskId, withBinary, withHistory, false);
    }

    /**
     * @return the queue name taken from the configuration
     */
    public String getName() {
        return config.getName();
    }

    /**
     * Searches tasks of this queue.
     *
     * @param cond search condition; its queue id is overwritten with this queue's id
     * @return the tasks found by {@code TaskDao.search}
     */
    public List<Task> search(TaskSearchCondition cond) {
        cond.setQueueId(config.getId());
        return dao.search(cond);
    }

    /**
     * Moves RETURNED tasks whose result was not fetched within the configured
     * {@code resultRemainingTime} to the history. Each task is pulled in its own new transaction
     * and a warning is logged for every task actually moved.
     */
    public void moveNoGetResultTaskToHistory() {
        TaskSearchCondition cond = new TaskSearchCondition();
        cond.setStatus(TaskStatus.RETURNED);
        cond.setUpdateDate(new Timestamp(System.currentTimeMillis() - config.getResultRemainingTime()));

        for (Task expired : search(cond)) {
            final long expiredId = expired.getTaskId();
            Task pulled = Transaction.requiresNew(tran -> {
                return pullResult(expiredId);
            });
            if (pulled != null && pulled.getStatus() != TaskStatus.UNKNOWN) {
                mtpLogger.warn("async task(queue:" + getName() + ",tenantId:" + pulled.getTenantId() + ",taskId:" + pulled.getTaskId() + ") waiting get result timeout. so move to history");
            }
        }
    }

    /**
     * Deletes a task of the current client tenant from the queue. If the task cannot be loaded
     * a warning is logged and nothing is deleted.
     *
     * @param taskId id of the task to delete
     */
    public void forceDelete(long taskId) {
        Task target = dao.load(ExecuteContext.getCurrentContext().getClientTenantId(), config.getId(), taskId, false, false, false);
        if (target == null) {
            mtpLogger.warn("probably async task(queue:" + getName() + ",taskId:" + taskId + ") has been moved to the history. so skip force delete");
            return;
        }
        dao.delete(target);
    }
}

