/*
 * Decompiled with CFR 0.152.
 */
package code.ponfee.commons.concurrent;

import com.google.common.base.Stopwatch;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultithreadExecutors {
    private static final Logger LOG = LoggerFactory.getLogger(MultithreadExecutors.class);

    public static void execute(int parallelism, Runnable command, int execSeconds, Executor executor) {
        Stopwatch watch = Stopwatch.createStarted();
        AtomicBoolean flag = new AtomicBoolean(true);
        CompletableFuture[] futures = (CompletableFuture[])IntStream.range(0, parallelism).mapToObj(i -> () -> {
            while (flag.get() && !Thread.currentThread().isInterrupted()) {
                command.run();
            }
        }).map(runnable -> CompletableFuture.runAsync(runnable, executor)).toArray(CompletableFuture[]::new);
        try {
            Thread.sleep((long)execSeconds * 1000L);
            flag.set(false);
            CompletableFuture.allOf(futures).join();
        }
        catch (InterruptedException e) {
            flag.set(false);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        finally {
            LOG.info("multi thread exec async duration: {}", (Object)watch.stop());
        }
    }

    public static void execute(Runnable command, int parallelism, Executor executor) {
        Stopwatch watch = Stopwatch.createStarted();
        CompletableFuture[] futures = (CompletableFuture[])IntStream.range(0, parallelism).mapToObj(i -> CompletableFuture.runAsync(command, executor)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        LOG.info("multi thread run async duration: {}", (Object)watch.stop());
    }

    public static <U> List<U> execute(Supplier<U> supplier, int parallelism) {
        Stopwatch watch = Stopwatch.createStarted();
        List result = IntStream.range(0, parallelism).mapToObj(i -> CompletableFuture.supplyAsync(supplier)).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
        LOG.info("multi thread call async duration: {}", (Object)watch.stop());
        return result;
    }

    public static <T> void execute(Collection<T> coll, Consumer<T> action, Executor executor) {
        Stopwatch watch = Stopwatch.createStarted();
        coll.stream().map(e -> CompletableFuture.runAsync(() -> action.accept(e), executor)).collect(Collectors.toList()).forEach(CompletableFuture::join);
        LOG.info("multi thread run async duration: {}", (Object)watch.stop());
    }

    public static <T, U> List<U> execute(Collection<T> coll, Function<T, U> mapper, Executor executor) {
        Stopwatch watch = Stopwatch.createStarted();
        List result = coll.stream().map(e -> CompletableFuture.supplyAsync(() -> mapper.apply(e), executor)).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
        LOG.info("multi thread call async duration: {}", (Object)watch.stop());
        return result;
    }

    public static <T, R> List<R> execute(Collection<T> data, Function<T, R> action, int dataSizeThreshold, Executor executor) {
        if (CollectionUtils.isEmpty(data)) {
            return Collections.emptyList();
        }
        if (dataSizeThreshold < 1 || data.size() < dataSizeThreshold) {
            return data.stream().map(action).collect(Collectors.toList());
        }
        ExecutorCompletionService service = new ExecutorCompletionService(executor);
        data.forEach(e -> service.submit(() -> action.apply(e)));
        return MultithreadExecutors.join(service, data.size());
    }

    public static <T> void execute(Collection<T> data, Consumer<T> action, int dataSizeThreshold, Executor executor) {
        if (CollectionUtils.isEmpty(data)) {
            return;
        }
        if (dataSizeThreshold < 1 || data.size() < dataSizeThreshold) {
            data.forEach(action);
            return;
        }
        ExecutorCompletionService service = new ExecutorCompletionService(executor);
        data.forEach(e -> service.submit(() -> action.accept(e), null));
        MultithreadExecutors.joinDiscard(service, data.size());
    }

    public static <T> List<T> join(CompletionService<T> service, int count) {
        ArrayList result = new ArrayList(count);
        MultithreadExecutors.join(service, count, result::add);
        return result;
    }

    public static <T> void joinDiscard(CompletionService<T> service, int count) {
        MultithreadExecutors.join(service, count, t -> {});
    }

    public static <T> void join(CompletionService<T> service, int count, Consumer<T> accept) {
        try {
            while (count-- > 0) {
                Future<T> future = service.take();
                accept.accept(future.get());
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}

