/*
 * Decompiled with CFR 0.152.
 */
package cn.zhxu.toys.concurrent;

import cn.zhxu.toys.concurrent.ParallelScheduler;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelTaskScheduler
implements ParallelScheduler,
ExecutorService {
    private static final Logger log = LoggerFactory.getLogger(ParallelTaskScheduler.class);
    private int batchSize = 20;
    private final ThreadPoolExecutor exePool;
    private final AtomicInteger totalTasks;
    private boolean verbose;

    public ParallelTaskScheduler() {
        this(100);
    }

    public ParallelTaskScheduler(int corePoolSize) {
        this(new ScheduledThreadPoolExecutor(corePoolSize));
    }

    public ParallelTaskScheduler(ThreadPoolExecutor exePool) {
        this.exePool = exePool;
        this.totalTasks = new AtomicInteger(0);
    }

    @Override
    public <T> void schedule(int concurrency, ParallelScheduler.TaskProvider<T> provider, ParallelScheduler.TaskExecutor<T> executor) {
        this.schedule(concurrency, provider, executor, null);
    }

    @Override
    public <T> void schedule(int concurrency, ParallelScheduler.TaskProvider<T> provider, ParallelScheduler.TaskExecutor<T> executor, ParallelScheduler.Identify<T> identify) {
        int page = 0;
        LinkedList futures = new LinkedList();
        List<T> tasks = provider.getTaskList(page, this.batchSize);
        int size = tasks.size();
        long maxId = 0L;
        while (size > 0) {
            Queue taskQueue;
            Stream<Object> taskStream = tasks.stream();
            if (maxId > 0L) {
                long finalMaxId = maxId;
                taskStream = taskStream.filter(t -> identify.id(t) > finalMaxId);
            }
            if ((taskQueue = this.toQueue(taskStream)).size() > 0) {
                if (identify != null) {
                    Optional maxOpt = taskQueue.stream().max((t1, t2) -> (int)(identify.id(t1) - identify.id(t2)));
                    maxId = identify.id(maxOpt.get());
                }
                this.submitTasks(concurrency, taskQueue, executor, futures);
            }
            if (size < this.batchSize) break;
            tasks = provider.getTaskList(++page, this.batchSize);
            size = tasks.size();
        }
        this.waitDone(futures);
    }

    @Override
    public <T> Future<?> asyncSchedule(int concurrency, ParallelScheduler.TaskProvider<T> provider, ParallelScheduler.TaskExecutor<T> executor) {
        return this.asyncSchedule(concurrency, provider, executor, null);
    }

    @Override
    public <T> Future<?> asyncSchedule(int concurrency, ParallelScheduler.TaskProvider<T> provider, ParallelScheduler.TaskExecutor<T> executor, ParallelScheduler.Identify<T> identify) {
        return this.exePool.submit(() -> this.schedule(concurrency, provider, executor, identify));
    }

    @Override
    public void execute(Runnable command) {
        this.exePool.execute(command);
    }

    @Override
    public void shutdown() {
        this.exePool.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return this.exePool.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return this.exePool.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return this.exePool.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return this.exePool.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return this.exePool.submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return this.exePool.submit(task, result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return this.exePool.submit(task);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return this.exePool.invokeAll(tasks);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return this.exePool.invokeAll(tasks, timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return this.exePool.invokeAny(tasks);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return this.exePool.invokeAny(tasks, timeout, unit);
    }

    private <T> Queue<T> toQueue(Stream<T> taskStream) {
        final LinkedList taskQueue = new LinkedList();
        taskStream.forEach(new Consumer<T>(){

            @Override
            public void accept(T t) {
                taskQueue.add(t);
            }
        });
        return taskQueue;
    }

    protected <T> void submitTasks(int concurrency, Queue<T> tasks, final ParallelScheduler.TaskExecutor<T> executor, List<Future<?>> futures) {
        while (tasks.size() > 0) {
            while (futures.size() < concurrency && tasks.size() > 0) {
                final T task = tasks.poll();
                futures.add(this.exePool.submit(new Runnable(){

                    @Override
                    public void run() {
                        int ac;
                        int tc;
                        if (ParallelTaskScheduler.this.verbose) {
                            tc = ParallelTaskScheduler.this.totalTasks.incrementAndGet();
                            ac = ParallelTaskScheduler.this.exePool.getActiveCount();
                            log.info("\u5e73\u884c\u673a\uff1a" + tc + ", " + ac);
                        }
                        executor.execute(task);
                        if (ParallelTaskScheduler.this.verbose) {
                            tc = ParallelTaskScheduler.this.totalTasks.decrementAndGet();
                            ac = ParallelTaskScheduler.this.exePool.getActiveCount();
                            log.info("\u5e73\u884c\u673a\uff1a" + tc + ", " + ac);
                        }
                    }
                }));
            }
            if (tasks.size() <= 0) continue;
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
            for (int i = futures.size() - 1; i >= 0; --i) {
                Future<?> f = futures.get(i);
                if (!f.isDone()) continue;
                futures.remove(i);
            }
        }
    }

    private void waitDone(List<Future<?>> futures) {
        futures.forEach(new Consumer<Future<?>>(){

            @Override
            public void accept(Future<?> f) {
                try {
                    f.get();
                }
                catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e.getMessage(), e);
                }
            }
        });
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public void setVerbose(boolean verbose) {
        this.verbose = verbose;
    }
}

