/*
 * Decompiled with CFR 0.152.
 */
package org.iplass.mtp.impl.async.rdb.workers;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.iplass.mtp.impl.async.rdb.Queue;
import org.iplass.mtp.impl.async.rdb.RdbQueueService;
import org.iplass.mtp.impl.async.rdb.Task;
import org.iplass.mtp.impl.async.rdb.workers.LocalWorker;
import org.iplass.mtp.impl.async.rdb.workers.WorkerCallable;
import org.iplass.mtp.impl.core.ExecuteContext;
import org.iplass.mtp.impl.core.TenantContext;
import org.iplass.mtp.impl.core.TenantContextService;
import org.iplass.mtp.impl.rdb.connection.ResourceHolder;
import org.iplass.mtp.spi.ServiceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessWorker
extends LocalWorker {
    private static Logger logger = LoggerFactory.getLogger(ProcessWorker.class);
    private static Logger fatalLogger = LoggerFactory.getLogger((String)"mtp.fatal.async.rdb.processworker");
    private ExecutorService executor;

    public ProcessWorker(Queue queue, int workerId) {
        super(queue, workerId);
    }

    private Process createProcess(Task task) throws IOException {
        ArrayList<String> command = new ArrayList<String>();
        command.add(this.queue.getConfig().getWorker().getJavaCommand());
        if (this.queue.getConfig().getWorker().getVmArgs() != null) {
            command.addAll(this.queue.getConfig().getWorker().getVmArgs());
        }
        command.add("-Dmtp.async.rdb.worker.process=true");
        command.add(Main.class.getName());
        ProcessBuilder.Redirect redirect = null;
        if (this.queue.getConfig().getWorker().getRedirectFile() == null) {
            redirect = ProcessBuilder.Redirect.INHERIT;
        } else {
            File f = new File(this.queue.getConfig().getWorker().getRedirectFile());
            if (!f.exists()) {
                f.createNewFile();
            }
            redirect = ProcessBuilder.Redirect.appendTo(f);
        }
        return new ProcessBuilder(command).redirectError(redirect).redirectOutput(redirect).start();
    }

    @Override
    protected void startImpl() {
        SecurityManager s = System.getSecurityManager();
        final ThreadGroup group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r, ProcessWorker.this.queueConfig.getName() + "-" + ProcessWorker.this.workerId + "-processWatcher-" + ProcessWorker.this.counter.incrementAndGet(), 0L);
                if (t.isDaemon()) {
                    t.setDaemon(false);
                }
                if (t.getPriority() != 5) {
                    t.setPriority(5);
                }
                return t;
            }
        });
    }

    @Override
    protected void stopImpl() {
        this.executor.shutdown();
        try {
            boolean isOk = this.executor.awaitTermination((long)((double)this.queue.getConfig().getWorker().getExecutionTimeout() * 1.3), TimeUnit.MILLISECONDS);
            if (!isOk) {
                logger.error(this.queueConfig.getName() + "'s processWatcher:" + this.workerId + " stop process timeout( at ProcessWorker). may be illegal state....");
            }
        }
        catch (InterruptedException e) {
            logger.error(this.queueConfig.getName() + "'s processWatcher:" + this.workerId + " stop process Interrupted( at ProcessWorker). may be illegal state....", (Throwable)e);
        }
    }

    @Override
    protected Future<Void> doTaskAndStatusUpdate(final Task task) {
        return this.executor.submit(new Callable<Void>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Void call() throws Exception {
                boolean isDone = false;
                Process process = ProcessWorker.this.createProcess(task);
                try (OutputStream os = process.getOutputStream();
                     ObjectOutputStream oos = new ObjectOutputStream(new BufferedOutputStream(os));){
                    oos.writeObject(task);
                    oos.flush();
                    int exitCode = process.waitFor();
                    if (exitCode != 0) {
                        logger.warn("process exited un-normal code:" + exitCode + ", re-run or abort after a while.");
                    }
                    isDone = true;
                }
                finally {
                    process.destroy();
                    if (!isDone) {
                        logger.warn("process exited illegally, maybe cancel called.");
                    }
                }
                return null;
            }
        });
    }

    public static class Main {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void main(String[] args) throws Exception {
            boolean isSuccess = false;
            InputStream is = System.in;
            ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(is));
            Task task = (Task)ois.readObject();
            try {
                Queue queue;
                ResourceHolder.init();
                if (!ExecuteContext.isInited()) {
                    TenantContextService tcs = ServiceRegistry.getRegistry().getService(TenantContextService.class);
                    TenantContext tContext = new TenantContext(tcs.getSharedTenantId(), null, null, true);
                    ExecuteContext econtext = new ExecuteContext(tContext);
                    ExecuteContext.initContext(econtext);
                }
                if ((queue = ServiceRegistry.getRegistry().getService(RdbQueueService.class).getQueueById(task.getQueueId())) == null) {
                    throw new IllegalArgumentException("queueId:" + task.getQueueId() + " not found");
                }
                WorkerCallable wc = new WorkerCallable(task, queue, queue.getConfig().getWorker().isTrace(), false);
                wc.call();
                isSuccess = true;
            }
            catch (Throwable t) {
                logger.error("error occured while task processing:" + String.valueOf(task), t);
                if (t instanceof Error) {
                    fatalLogger.error("fatal error occured while task processing:" + String.valueOf(task), t);
                }
            }
            finally {
                ExecuteContext.finContext();
                ResourceHolder.fin();
                ServiceRegistry.getRegistry().destroyAllService();
                if (isSuccess) {
                    System.exit(0);
                } else {
                    System.exit(1);
                }
            }
        }
    }
}

