/*
 * Decompiled with CFR 0.152.
 */
package net.anwiba.commons.process.queue;

import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import net.anwiba.commons.logging.ILevel;
import net.anwiba.commons.logging.ILogger;
import net.anwiba.commons.process.IProcessIdentfier;
import net.anwiba.commons.process.cancel.ICancelerListener;
import net.anwiba.commons.process.queue.ICancelableRunnable;
import net.anwiba.commons.process.queue.IRunnable;
import net.anwiba.commons.process.queue.IWorkQueue;

public class WorkQueue
implements IWorkQueue {
    private final ScheduledThreadPoolExecutor executor;

    /**
     * Builds a work queue backed by a {@link ScheduledThreadPoolExecutor}.
     *
     * <p>Worker threads come from {@link Executors#defaultThreadFactory()} and are then renamed to
     * {@code <queueName>-T-<n>} (n counts up from 0) and given the requested priority and daemon flag.
     * Every submitted runnable is wrapped in a {@link ProcessScheduledFuture}, and scheduling, start and
     * completion of a task are reported to an {@link ExecutionHandler}, which also serves as the
     * executor's rejection handler.
     *
     * @param logger      logger handed to the {@link ExecutionHandler}
     * @param queueName   name used as thread-name prefix and in log messages
     * @param threadCount core pool size of the executor
     * @param asDaemon    whether worker threads are daemon threads
     * @param priority    priority assigned to each worker thread
     * @return a new work queue wrapping the configured executor
     */
    public static IWorkQueue create(ILogger logger, final String queueName, int threadCount, final boolean asDaemon, final int priority) {
        final AtomicInteger threadNumber = new AtomicInteger();
        final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
        final ThreadFactory namingFactory = work -> {
            final Thread worker = defaultFactory.newThread(work);
            worker.setName(queueName + "-T-" + threadNumber.getAndIncrement());
            worker.setPriority(priority);
            worker.setDaemon(asDaemon);
            return worker;
        };
        final ExecutionHandler handler = new ExecutionHandler(logger, queueName);
        final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(threadCount, namingFactory, handler) {

            @Override
            public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
                // super.schedule(..) returns the task produced by decorateTask(..) below.
                final ProcessScheduledFuture<?> scheduled = (ProcessScheduledFuture<?>) super.schedule(command, delay, unit);
                handler.execute(scheduled, this.getPoolSize(), this.getActiveCount(), this.getQueue().size(), QueueState.get(this));
                return scheduled;
            }

            @Override
            protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
                // Only IRunnable instances are expected here; anything else fails with a ClassCastException.
                return new ProcessScheduledFuture<>(task, (IRunnable) runnable);
            }

            @Override
            protected void beforeExecute(Thread thread, Runnable runnable) {
                super.beforeExecute(thread, runnable);
                final ProcessScheduledFuture<?> starting = (ProcessScheduledFuture<?>) runnable;
                handler.beforeExecute(thread, starting, this.getPoolSize(), this.getActiveCount(), this.getQueue().size(), QueueState.get(this));
            }

            @Override
            protected void afterExecute(Runnable runnable, Throwable throwable) {
                super.afterExecute(runnable, throwable);
                final ProcessScheduledFuture<?> finished = (ProcessScheduledFuture<?>) runnable;
                handler.afterExecute(finished, throwable, this.getPoolSize(), this.getActiveCount(), this.getQueue().size(), QueueState.get(this));
            }
        };
        return new WorkQueue(pool);
    }

    /**
     * Creates a work queue that delegates all scheduling to the given executor.
     *
     * <p>NOTE(review): {@code execute}, {@code remove} and {@code cancel} cast the executor's tasks to
     * {@link ProcessScheduledFuture}, so the executor presumably has to decorate its tasks the way
     * {@code create(..)} does; confirm before passing a plain executor.
     *
     * @param executor the executor that runs the queued processes
     */
    public WorkQueue(ScheduledThreadPoolExecutor executor) {
        this.executor = executor;
    }

    /**
     * Schedules the given runnable on the executor and returns a handle for cancelling it.
     *
     * <p>The runnable's own delay (in milliseconds) is used as the initial delay. A periodic runnable is
     * scheduled at a fixed rate, reusing that delay as the period; any other runnable is scheduled once.
     *
     * @param runnable the process to schedule
     * @return a handle exposing the process identifier and cancellation of the scheduled task
     * @throws IllegalStateException if the executor rejects the task
     */
    @Override
    public ICancelableRunnable execute(IRunnable runnable) throws IllegalStateException {
        final ProcessScheduledFuture<?> future;
        try {
            final long millis = runnable.getDelay(TimeUnit.MILLISECONDS);
            final ScheduledFuture<?> scheduled = runnable.isPeriodic()
                    ? this.executor.scheduleAtFixedRate(runnable, millis, millis, TimeUnit.MILLISECONDS)
                    : this.executor.schedule(runnable, millis, TimeUnit.MILLISECONDS);
            future = (ProcessScheduledFuture<?>) scheduled;
        }
        catch (RejectedExecutionException rejected) {
            throw new IllegalStateException(rejected);
        }
        return new ICancelableRunnable() {

            @Override
            public IProcessIdentfier getIdentifier() {
                return future.getIdentifier();
            }

            @Override
            public boolean isCanceled() {
                return future.isCancelled();
            }

            @Override
            public boolean isCancel() {
                // Despite the query-style name this actively requests cancellation of the task.
                return future.cancel(true);
            }
        };
    }

    /**
     * Shuts the queue down by delegating to {@link ScheduledThreadPoolExecutor#shutdown()}.
     */
    @Override
    public void shutdown() {
        this.executor.shutdown();
    }

    /**
     * Blocks until the executor has terminated or the timeout elapses, whichever comes first.
     *
     * <p>This method only waits; it does not request a shutdown itself. If the waiting thread is
     * interrupted, the executor is stopped via {@code shutdownNow()} and the thread's interrupt status
     * is restored so that callers further up the stack can still observe the interruption.
     *
     * @param timeout maximum time to wait, in seconds
     */
    @Override
    public void waitForWorkQueueFinished(long timeout) {
        try {
            this.executor.awaitTermination(timeout, TimeUnit.SECONDS);
        }
        catch (InterruptedException interrupted) {
            this.executor.shutdownNow();
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes the first task in the executor's queue whose process identifier equals the given one.
     *
     * <p>Only tasks currently held in the executor's queue are inspected. Queue entries that are not
     * {@link ProcessScheduledFuture} instances are skipped instead of causing a
     * {@link ClassCastException}. Nothing happens if no queued task matches.
     *
     * @param identifier identifier of the process to remove
     */
    @Override
    public void remove(IProcessIdentfier identifier) {
        // Work on an array snapshot so the queue can be modified while we walk over its entries.
        for (Runnable queued : this.executor.getQueue().toArray(new Runnable[0])) {
            if (!(queued instanceof ProcessScheduledFuture)) {
                continue;
            }
            ProcessScheduledFuture<?> future = (ProcessScheduledFuture<?>) queued;
            if (future.getIdentifier().equals(identifier)) {
                this.executor.getQueue().remove(future);
                return;
            }
        }
    }

    /**
     * Cancels the first task in the executor's queue whose process identifier equals the given one.
     *
     * <p>Only tasks currently held in the executor's queue are inspected. Queue entries that are not
     * {@link ProcessScheduledFuture} instances are skipped instead of causing a
     * {@link ClassCastException}. Nothing happens if no queued task matches.
     *
     * @param identifier identifier of the process to cancel
     */
    @Override
    public void cancel(IProcessIdentfier identifier) {
        // Work on an array snapshot; cancelling may cause the executor to alter its queue.
        for (Runnable queued : this.executor.getQueue().toArray(new Runnable[0])) {
            if (!(queued instanceof ProcessScheduledFuture)) {
                continue;
            }
            ProcessScheduledFuture<?> future = (ProcessScheduledFuture<?>) queued;
            if (future.getIdentifier().equals(identifier)) {
                future.cancel(true);
                return;
            }
        }
    }

    /**
     * Reports the life cycle of queued processes at DEBUG level and converts executor rejections into
     * {@link RejectedExecutionException}s carrying the same descriptive message.
     */
    public static final class ExecutionHandler
    implements RejectedExecutionHandler {
        private final ILogger logger;
        private final String queueName;

        /**
         * @param logger    logger that receives all messages
         * @param queueName queue name included in every message
         */
        public ExecutionHandler(ILogger logger, String queueName) {
            this.logger = logger;
            this.queueName = queueName;
        }

        /**
         * Logs that a process has been handed to the queue for (delayed) execution.
         *
         * <p>The message previously said "finished", duplicating the text of
         * {@link #afterExecute}; it now states that the process was scheduled.
         */
        public void execute(ProcessScheduledFuture<?> future, int poolSize, int activeCount, int queueSize, QueueState queueState) {
            String message = "Process " + future.getIdentifier().toString() + " scheduled in queue " + this.queueName + this.describeState(poolSize, activeCount, queueSize, queueState);
            this.logger.log(ILevel.DEBUG, message);
        }

        /**
         * Logs that a process is about to run on the given worker thread.
         */
        public void beforeExecute(Thread thread, ProcessScheduledFuture<?> future, int poolSize, int activeCount, int queueSize, QueueState queueState) {
            String message = "Process " + future.getIdentifier().toString() + " started from queue " + this.queueName + ", task '" + thread.getName() + "'" + this.describeState(poolSize, activeCount, queueSize, queueState);
            this.logger.log(ILevel.DEBUG, message);
        }

        /**
         * Logs that a process has finished; the throwable (possibly {@code null}) is passed to the logger as is.
         */
        public void afterExecute(ProcessScheduledFuture<?> future, Throwable throwable, int poolSize, int activeCount, int queueSize, QueueState queueState) {
            String message = "Process " + future.getIdentifier().toString() + " finished from queue " + this.queueName + this.describeState(poolSize, activeCount, queueSize, queueState);
            this.logger.log(ILevel.DEBUG, message, throwable);
        }

        /**
         * Returns the identifier of the runnable as text, or {@code "-"} when the runnable is {@code null}.
         */
        public String getIdentifierString(IRunnable runnable) {
            if (runnable == null) {
                return "-";
            }
            return runnable.getIdentifier().toString();
        }

        /**
         * Logs the rejection and signals it to the submitter.
         *
         * <p>The message names the process by its identifier, like every other message of this handler,
         * rather than by the future's default {@code toString()}.
         *
         * @throws RejectedExecutionException always, with the logged message
         */
        private void rejectedExecution(ProcessScheduledFuture<?> future, int poolSize, int activeCount, int queueSize, QueueState queueState) {
            String message = "Process " + future.getIdentifier().toString() + " rejected from queue " + this.queueName + this.describeState(poolSize, activeCount, queueSize, queueState);
            this.logger.log(ILevel.DEBUG, message);
            throw new RejectedExecutionException(message);
        }

        /**
         * Rejection callback of the executor; expects the rejected runnable to be a {@link ProcessScheduledFuture}.
         */
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
            this.rejectedExecution((ProcessScheduledFuture<?>) runnable, executor.getPoolSize(), executor.getActiveCount(), executor.getQueue().size(), QueueState.get(executor));
        }

        /** Builds the message suffix describing queue state and pool figures, shared by all messages. */
        private String describeState(int poolSize, int activeCount, int queueSize, QueueState queueState) {
            return ", queue state is '" + queueState.name() + "', pool size : " + poolSize + ", active processes: " + activeCount + ", queued processes: " + queueSize;
        }
    }

    /**
     * Scheduled future that pairs the executor's own task with the {@link IRunnable} it was created for,
     * so that the process identifier stays reachable and cancellation is propagated in both directions.
     *
     * @param <V> result type of the wrapped task
     */
    public static final class ProcessScheduledFuture<V>
    implements RunnableScheduledFuture<V> {
        private final RunnableScheduledFuture<V> task;
        private final IRunnable runnable;

        /**
         * @param task     the executor's task that actually runs the work
         * @param runnable the process the task was created for
         */
        public ProcessScheduledFuture(RunnableScheduledFuture<V> task, IRunnable runnable) {
            this.task = task;
            this.runnable = runnable;
        }

        /**
         * Returns the identifier of the wrapped process.
         */
        public IProcessIdentfier getIdentifier() {
            return this.runnable.getIdentifier();
        }

        /**
         * Runs the wrapped task. While it runs, a listener registered on the process cancels the task
         * as soon as the process reports a cancellation; the listener is always removed afterwards.
         */
        @Override
        public void run() {
            final RunnableScheduledFuture<V> delegate = this.task;
            ICancelerListener listener = new ICancelerListener(){

                @Override
                public void canceled() {
                    if (delegate.isCancelled()) {
                        return;
                    }
                    delegate.cancel(true);
                }
            };
            try {
                this.runnable.addCancelerListener(listener);
                this.task.run();
            }
            finally {
                this.runnable.removeCancelerListener(listener);
            }
        }

        /**
         * Cancels both the process and the executor's task, skipping whichever is already cancelled.
         *
         * @return {@code true} if both were already cancelled or the task is (now) cancelled,
         *         otherwise the result of cancelling the task
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (this.isCancelled()) {
                return true;
            }
            if (!this.runnable.isCancelled()) {
                this.runnable.cancel(mayInterruptIfRunning);
            }
            if (this.task.isCancelled()) {
                return true;
            }
            return this.task.cancel(mayInterruptIfRunning);
        }

        /**
         * Returns {@code true} only when both the process and the executor's task are cancelled.
         */
        @Override
        public boolean isCancelled() {
            return this.runnable.isCancelled() && this.task.isCancelled();
        }

        @Override
        public boolean isDone() {
            return this.task.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return this.task.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return this.task.get(timeout, unit);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return this.task.getDelay(unit);
        }

        /**
         * Orders this future relative to another delayed element.
         *
         * <p>The executor's queue holds these wrappers, not the inner tasks. Comparing the inner tasks
         * with each other lets them use their own ordering (including their tie-breaking for equal
         * trigger times) instead of falling back to a comparison of two separately sampled delays,
         * and guarantees that comparing a future with itself yields 0.
         */
        @Override
        public int compareTo(Delayed o) {
            if (o == this) {
                return 0;
            }
            if (o instanceof ProcessScheduledFuture) {
                return this.task.compareTo(((ProcessScheduledFuture<?>) o).task);
            }
            return this.task.compareTo(o);
        }

        /**
         * Reports the periodicity declared by the process itself.
         */
        @Override
        public boolean isPeriodic() {
            return this.runnable.isPeriodic();
        }
    }

    /**
     * Coarse life-cycle state of a {@link ThreadPoolExecutor}, used in log messages.
     */
    public enum QueueState {
        ACTIVE,
        SHUTDOWN,
        TERMINATING,
        TERMINATED;

        /**
         * Derives the state of the given executor.
         *
         * <p>The most advanced state is tested first, because {@code isShutdown()} stays {@code true}
         * through termination and would otherwise hide the later states. In the JDK implementation
         * {@code isTerminating()} is already {@code true} for a shut-down pool that has not yet
         * terminated, so {@link #SHUTDOWN} is only reported for executors whose predicates are
         * overridden to behave differently.
         *
         * @param executor the executor to inspect
         * @return {@link #TERMINATED}, {@link #TERMINATING}, {@link #SHUTDOWN} or {@link #ACTIVE}
         */
        public static QueueState get(ThreadPoolExecutor executor) {
            if (executor.isTerminated()) {
                return TERMINATED;
            }
            if (executor.isTerminating()) {
                return TERMINATING;
            }
            if (executor.isShutdown()) {
                return SHUTDOWN;
            }
            return ACTIVE;
        }
    }
}

