/*
 * Decompiled with CFR 0.152.
 */
package org.iplass.mtp.impl.async.rdb.workers;

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.iplass.mtp.async.ExceptionHandlingMode;
import org.iplass.mtp.async.TaskTimeoutException;
import org.iplass.mtp.impl.async.AsyncTaskContextImpl;
import org.iplass.mtp.impl.async.ExceptionHandleable;
import org.iplass.mtp.impl.async.rdb.Queue;
import org.iplass.mtp.impl.async.rdb.QueueConfig;
import org.iplass.mtp.impl.async.rdb.Task;
import org.iplass.mtp.impl.async.rdb.Worker;
import org.iplass.mtp.impl.async.rdb.WorkerConfig;
import org.iplass.mtp.impl.async.rdb.workers.WorkerState;
import org.iplass.mtp.impl.core.Executable;
import org.iplass.mtp.impl.core.ExecuteContext;
import org.iplass.mtp.impl.core.TenantContext;
import org.iplass.mtp.impl.core.TenantContextService;
import org.iplass.mtp.impl.core.config.ServerEnv;
import org.iplass.mtp.impl.rdb.connection.ResourceHolder;
import org.iplass.mtp.spi.ServiceRegistry;
import org.iplass.mtp.transaction.Transaction;
import org.iplass.mtp.transaction.TransactionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class LocalWorker
implements Worker {
    private static Logger logger = LoggerFactory.getLogger(LocalWorker.class);
    private static Logger mtpLogger = LoggerFactory.getLogger((String)"mtp.async.rdb");
    private static Logger fatalLogger = LoggerFactory.getLogger((String)"mtp.fatal.async.rdb");
    private static Random rand = new Random(ServerEnv.getInstance().getServerId().hashCode());
    protected final QueueConfig queueConfig;
    protected final WorkerConfig workerConfig;
    protected final int workerId;
    protected final Queue queue;
    protected volatile WorkerState state;
    private ScheduledExecutorService queuePoller;
    protected AtomicInteger counter = new AtomicInteger();

    public LocalWorker(Queue queue, int workerId) {
        this.queue = queue;
        this.queueConfig = queue.getConfig();
        this.workerConfig = queue.getConfig().getWorker();
        this.workerId = workerId;
        this.state = WorkerState.STOPPED;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start() {
        if (this.state != WorkerState.STOPPED) {
            logger.warn(this.queueConfig.getName() + "'s worker:" + this.workerId + " is not stopped. so can not (re)start worker...");
            return;
        }
        LocalWorker localWorker = this;
        synchronized (localWorker) {
            if (this.state == WorkerState.STOPPED) {
                SecurityManager s = System.getSecurityManager();
                final ThreadGroup group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
                this.queuePoller = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(group, r, LocalWorker.this.queueConfig.getName() + "-" + LocalWorker.this.workerId + "-poller-" + LocalWorker.this.counter.incrementAndGet(), 0L);
                        if (t.isDaemon()) {
                            t.setDaemon(false);
                        }
                        if (t.getPriority() != 5) {
                            t.setPriority(5);
                        }
                        return t;
                    }
                });
                this.startImpl();
                this.state = WorkerState.STARTED;
                long delay = (long)((double)this.workerConfig.getPollingInterval() * rand.nextDouble());
                this.queuePoller.schedule(new PollingTask(false), delay, TimeUnit.MILLISECONDS);
                mtpLogger.info("worker(id:" + this.workerId + ") of queue:" + this.queue.getName() + " is started.");
            } else {
                logger.debug(this.queueConfig.getName() + "'s worker:" + this.workerId + " is already stared. may be another thread call start().");
            }
        }
    }

    protected abstract void startImpl();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() {
        if (this.state != WorkerState.STARTED) {
            logger.warn(this.queueConfig.getName() + "'s worker:" + this.workerId + " is not stared.");
            return;
        }
        LocalWorker localWorker = this;
        synchronized (localWorker) {
            if (this.state == WorkerState.STARTED) {
                this.state = WorkerState.STOPPING;
                this.queuePoller.shutdown();
                try {
                    boolean isOk = this.queuePoller.awaitTermination((long)((double)this.workerConfig.getExecutionTimeout() * 1.3), TimeUnit.MILLISECONDS);
                    if (!isOk) {
                        logger.error(this.queueConfig.getName() + "'s worker:" + this.workerId + " stop process timeout. may be illegal state....");
                    }
                }
                catch (InterruptedException e) {
                    logger.error(this.queueConfig.getName() + "'s worker:" + this.workerId + " stop process Interrupted. may be illegal state....", (Throwable)e);
                }
                this.stopImpl();
                this.state = WorkerState.STOPPED;
                mtpLogger.info("worker(id:" + this.workerId + ") of queue:" + this.queue.getName() + " is stopped.");
            } else {
                logger.warn(this.queueConfig.getName() + "'s worker:" + this.workerId + " is not stared. may be another thread call stop().");
            }
        }
    }

    protected abstract void stopImpl();

    protected abstract Future<Void> doTaskAndStatusUpdate(Task var1);

    @Override
    public void wakeup() {
        if (this.state == WorkerState.STARTED) {
            this.queuePoller.schedule(new PollingTask(true), 0L, TimeUnit.MILLISECONDS);
        }
    }

    private class PollingTask
    implements Runnable {
        private final boolean oneshot;

        public PollingTask(boolean oneshot) {
            this.oneshot = oneshot;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                ResourceHolder.init();
                if (!ExecuteContext.isInited()) {
                    TenantContextService tcs = ServiceRegistry.getRegistry().getService(TenantContextService.class);
                    TenantContext tContext = tcs.getSharedTenantContext();
                    ExecuteContext econtext = new ExecuteContext(tContext);
                    ExecuteContext.initContext(econtext);
                }
                try {
                    while (true) {
                        if (LocalWorker.this.state == WorkerState.STARTED) {
                            final Task task = Transaction.required(t -> LocalWorker.this.queue.poll(LocalWorker.this.workerId, LocalWorker.this.workerConfig.isLocal()));
                            if (task == null) {
                                return;
                            }
                            TenantContext taskTC = ServiceRegistry.getRegistry().getService(TenantContextService.class).getTenantContext(task.getTenantId());
                            ExecuteContext.executeAs(taskTC, new Executable<Void>(){

                                @Override
                                public Void execute() {
                                    if (logger.isDebugEnabled()) {
                                        logger.debug("polled task:" + task);
                                    }
                                    Future<Void> ft = LocalWorker.this.doTaskAndStatusUpdate(task);
                                    try {
                                        ft.get(LocalWorker.this.workerConfig.getExecutionTimeout(), TimeUnit.MILLISECONDS);
                                    }
                                    catch (InterruptedException e) {
                                        mtpLogger.error("queue:" + LocalWorker.this.queueConfig.getName() + "'s queuePoller is interrupted on task execution wait...", (Throwable)e);
                                        Thread.currentThread().interrupt();
                                    }
                                    catch (Error | RuntimeException | ExecutionException | TimeoutException e) {
                                        String msg = e instanceof TimeoutException ? "timeout" : "failed";
                                        switch (task.getExceptionHandlingMode()) {
                                            case RESTART: {
                                                mtpLogger.error("queue:" + LocalWorker.this.queueConfig.getName() + "'s task:" + task.getTaskId() + "(tenantId:" + task.getTenantId() + ") is " + msg + ". re-run after a while.", e);
                                                if (ft.cancel(true)) break;
                                                logger.debug("task process cant cancel.may be completed." + task);
                                                break;
                                            }
                                            case ABORT_LOG_FATAL: 
                                            case ABORT: {
                                                if (task.getExceptionHandlingMode() == ExceptionHandlingMode.ABORT_LOG_FATAL || e instanceof Error) {
                                                    fatalLogger.error("queue:" + LocalWorker.this.queueConfig.getName() + "'s task:" + task.getTaskId() + "(tenantId:" + task.getTenantId() + ") " + msg + ", so abort task.", e);
                                                } else {
                                                    mtpLogger.error("queue:" + LocalWorker.this.queueConfig.getName() + "'s task:" + task.getTaskId() + "(tenantId:" + task.getTenantId() + ") is " + msg + ", so abort task.", e);
                                                }
                                                if (ft.cancel(true)) {
                                                    Transaction.required(transaction -> {
                                                        final Throwable t = e instanceof TimeoutException ? new TaskTimeoutException(e) : e;
                                                        if (LocalWorker.this.queue.taskAbort(task, true, t, false, false)) {
                                                            if (!(task.getCallable().getActual() instanceof ExceptionHandleable)) return;
                                                            AsyncTaskContextImpl asyncTaskContext = new AsyncTaskContextImpl(task.getTaskId(), LocalWorker.this.queue.getName());
                                                            try {
                                                                ExecuteContext.getCurrentContext().setAttribute("mtp.async.AsyncTaskContext", asyncTaskContext, false);
                                                                if (e instanceof TimeoutException) {
                                                                    ((ExceptionHandleable)((Object)task.getCallable().getActual())).timeouted();
                                                                    return;
                                                                }
                                                                ((ExceptionHandleable)((Object)task.getCallable().getActual())).aborted(t);
                                                                return;
                                                            }
                                                            catch (Throwable ee) {
                                                                fatalLogger.error("ExceptionHandleable's aborted()/timeouted() call failed(queue:" + LocalWorker.this.queue.getConfig().getName() + ", task:" + task.getTaskId() + ",tenantId:" + task.getTenantId() + ")", ee);
                                                                if (!(t instanceof TaskTimeoutException) || !transaction.isRollbackOnly()) return;
                                                                transaction.addTransactionListener(new TransactionListener(){

                                                                    @Override
                                                                    public void afterRollback(Transaction tt) {
                                                                        LocalWorker.this.queue.taskAbort(task, true, t, false, true);
                                                                    }
                                                                });
                                                                return;
                                                            }
                                                            finally {
                                                                ExecuteContext.getCurrentContext().removeAttribute("mtp.async.AsyncTaskContext");
                                                            }
                                                        } else {
                                                            mtpLogger.warn("task status cant update to [abort]. may be another process update status." + task);
                                                        }
                                                    });
                                                    break;
                                                }
                                                mtpLogger.warn("task process cant cancel. may be completed." + task);
                                                break;
                                            }
                                        }
                                    }
                                    return null;
                                }
                            });
                            continue;
                        }
                        break;
                    }
                }
                catch (RuntimeException e) {
                    logger.error("exception while polling...", (Throwable)e);
                }
                catch (Error e) {
                    fatalLogger.error("fatal Error while polling...", (Throwable)e);
                }
                finally {
                    if (LocalWorker.this.state == WorkerState.STARTED && !this.oneshot) {
                        LocalWorker.this.queuePoller.schedule(this, LocalWorker.this.workerConfig.getPollingInterval(), TimeUnit.MILLISECONDS);
                    }
                }
            }
            finally {
                ExecuteContext.finContext();
                ResourceHolder.fin();
            }
        }
    }
}

