/*
 * Decompiled with CFR 0.152.
 */
package org.ujorm.tools.thread;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.ujorm.tools.Assert;

/**
 * Runs one job per input parameter asynchronously and exposes the results as a stream.
 *
 * <p>Instances are created by the {@code forParams(...)} factories and configured through the
 * fluent setters. The parameter stream is consumed by the first call to {@link #run(Function)}
 * or {@link #runToStream(Function)}, so one instance can execute a single batch only.
 *
 * @param <P> type of the parameter passed to each job
 */
public class MultiJob<P> {
    /** Job parameters; a stream, hence usable only once. */
    @Nonnull
    protected final Stream<P> params;
    /** Maximum time to wait for each individual job result (one hour unless changed). */
    @Nonnull
    protected Duration timeout = Duration.ofHours(1L);
    /** Executor running the jobs; {@code null} selects the default executor of {@link CompletableFuture}. */
    @Nullable
    protected Executor executor;

    /**
     * Creates a job runner for the given parameters.
     *
     * @param params required stream of job parameters (checked by {@code Assert.notNull})
     */
    protected MultiJob(@Nonnull Stream<P> params) {
        this.params = Assert.notNull(params, "The {} is required", "params");
    }

    /**
     * Sets the maximum waiting time applied to each job result.
     *
     * @param timeout required timeout (checked by {@code Assert.notNull})
     * @return this instance
     */
    public MultiJob<P> setTimeout(@Nonnull Duration timeout) {
        this.timeout = Assert.notNull(timeout, "The {} is required", "timeout");
        return this;
    }

    /**
     * Sets the executor used to run the jobs.
     *
     * @param executor the executor, or {@code null} to use the default one of {@link CompletableFuture}
     * @return this instance
     */
    public MultiJob<P> setExecutor(@Nullable Executor executor) {
        this.executor = executor;
        return this;
    }

    /**
     * Assigns a new fixed-size thread pool as the executor.
     * This class never shuts the created pool down.
     *
     * @param nThreads number of threads in the pool
     * @return this instance
     */
    public MultiJob<P> setNewFixedThreadPool(int nThreads) {
        return this.setExecutor(Executors.newFixedThreadPool(nThreads));
    }

    /**
     * Assigns a new fixed-size thread pool whose idle threads terminate after {@code keepAliveTime}.
     * This class never shuts the created pool down.
     *
     * @param nThreads number of threads in the pool
     * @param keepAliveTime idle time after which a pool thread terminates; it is applied with
     *        millisecond precision, and a value shorter than one millisecond keeps idle threads alive
     * @return this instance
     */
    public MultiJob<P> setNewFixedThreadPool(int nThreads, @Nonnull Duration keepAliveTime) {
        final long keepAliveMillis = keepAliveTime.toMillis();
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(nThreads, nThreads,
                keepAliveMillis, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        // Core size equals maximum size, so the keep-alive time is ignored unless core threads
        // are allowed to time out. The call rejects a zero keep-alive, hence the guard.
        if (keepAliveMillis > 0L) {
            pool.allowCoreThreadTimeOut(true);
        }
        return this.setExecutor(pool);
    }

    /**
     * Starts the job for every parameter and returns a stream of the non-null results
     * in the order of the parameters.
     *
     * <p>All jobs are submitted before this method returns. The results are fetched lazily,
     * so waiting (and any {@link MultiJobException}) happens when the returned stream is consumed.
     *
     * @param job function computing a result from one parameter
     * @param <R> result type
     * @return lazy stream of results with {@code null} values removed
     * @throws MultiJobException if a job fails, times out, or the waiting thread is interrupted
     */
    public <R> Stream<R> run(@Nonnull Function<P, R> job) throws MultiJobException {
        final List<CompletableFuture<R>> futures = this.startAll(job);
        final Function<CompletableFuture<R>, R> grabber = this.createGrabber(this.timeout);
        return futures.stream().map(grabber).filter(Objects::nonNull);
    }

    /**
     * Starts the job for every parameter and returns the concatenation of the streams
     * produced by the jobs, in the order of the parameters.
     *
     * <p>All jobs are submitted before this method returns. The results are fetched lazily,
     * so waiting (and any {@link MultiJobException}) happens when the returned stream is consumed.
     *
     * @param job function computing a stream of results from one parameter
     * @param <R> element type of the result streams
     * @return lazy stream concatenating all job results
     * @throws MultiJobException if a job fails, times out, or the waiting thread is interrupted
     */
    public <R> Stream<R> runToStream(@Nonnull Function<P, Stream<R>> job) throws MultiJobException {
        final List<CompletableFuture<Stream<R>>> futures = this.startAll(job);
        final Function<CompletableFuture<Stream<R>>, Stream<R>> grabber = this.createGrabber(this.timeout);
        return futures.stream().map(grabber).flatMap(Function.identity());
    }

    /** Submits the job for every parameter and collects the futures so all jobs are started up front. */
    private <R> List<CompletableFuture<R>> startAll(@Nonnull Function<P, R> job) {
        return this.params.map(this.getAsync(job)).collect(Collectors.toList());
    }

    /**
     * Creates a function that submits the job for one parameter and returns its future.
     *
     * @param job function computing a result from one parameter
     * @param <R> result type
     * @return function starting the asynchronous computation for a parameter
     */
    protected <R> Function<P, CompletableFuture<R>> getAsync(@Nonnull Function<P, R> job) {
        // Snapshot the field so the null check and the later use see the same executor.
        final Executor pool = this.executor;
        if (pool != null) {
            return p -> CompletableFuture.supplyAsync(() -> job.apply(p), pool);
        }
        return p -> CompletableFuture.supplyAsync(() -> job.apply(p));
    }

    /**
     * Creates a function that waits for a future and returns its value.
     * The timeout applies to each future separately.
     *
     * @param timeout maximum waiting time per future, applied with millisecond precision
     * @param <R> result type
     * @return function converting a future to its result; it throws {@link MultiJobException}
     *         when the job fails, the timeout expires, or the waiting thread is interrupted
     */
    protected <R> Function<CompletableFuture<R>, R> createGrabber(final @Nonnull Duration timeout) {
        return future -> {
            try {
                return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new MultiJobException(e);
            } catch (ExecutionException | TimeoutException e) {
                throw new MultiJobException(e);
            }
        };
    }

    /**
     * Creates a job runner for a stream of parameters.
     *
     * @param params required stream of job parameters
     * @param <P> parameter type
     * @return new job runner
     */
    public static <P> MultiJob<P> forParams(@Nonnull Stream<P> params) {
        return new MultiJob<P>(params);
    }

    /**
     * Creates a job runner for a list of parameters.
     *
     * @param params list of job parameters
     * @param <P> parameter type
     * @return new job runner
     */
    public static <P> MultiJob<P> forParams(@Nonnull List<P> params) {
        return MultiJob.forParams(params.stream());
    }

    /**
     * Creates a job runner for an array of parameters.
     *
     * @param params job parameters
     * @param <P> parameter type
     * @return new job runner
     */
    @SafeVarargs // the array is only read through Arrays.stream, nothing is stored into it
    public static <P> MultiJob<P> forParams(P ... params) {
        return new MultiJob<P>(Arrays.stream(params));
    }

    /**
     * Creates a job runner for an iterable of parameters, traversed sequentially.
     *
     * @param params iterable of job parameters
     * @param <P> parameter type
     * @return new job runner
     */
    public static <P> MultiJob<P> forParams(@Nonnull Iterable<P> params) {
        return MultiJob.forParams(StreamSupport.stream(params.spliterator(), false));
    }

    /** Unchecked exception wrapping the failure, timeout, or interruption met while waiting for a job result. */
    public static class MultiJobException
    extends IllegalStateException {
        /**
         * Wraps the original problem.
         *
         * @param cause the exception raised while waiting for the job result
         */
        public MultiJobException(@Nonnull Throwable cause) {
            super(cause);
        }
    }
}

