/*
 * Decompiled with CFR 0.152.
 */
package org.ujorm.tools.thread;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.ujorm.tools.Assert;
import org.ujorm.tools.thread.JobContext;
import org.ujorm.tools.thread.Jobs;

public class MultiJob<P>
extends Jobs<P> {
    @Nonnull
    protected final Executor threadPool;

    protected MultiJob(@Nonnull Stream<P> params, @Nonnull JobContext jobContext) {
        super(params, jobContext.getTimeout());
        this.threadPool = (Executor)Assert.notNull(jobContext.getThreadPool(), "The {} is required", "threadPool");
    }

    @Override
    public <R> Stream<R> run(@Nonnull Jobs.UserFunction<P, R> job) throws Jobs.JobException {
        return this.getParallel().map(p -> CompletableFuture.supplyAsync(() -> job.apply(p), this.threadPool)).collect(Collectors.toList()).stream().map(this.createGrabber()).filter(Objects::nonNull);
    }

    @Override
    public <R> Stream<R> runOfStream(@Nonnull Jobs.UserFunction<P, Stream<R>> job) throws Jobs.JobException {
        return this.getParallel().map(p -> CompletableFuture.supplyAsync(() -> (Stream)job.apply(p), this.threadPool)).collect(Collectors.toList()).stream().map(this.createGrabber()).flatMap(Function.identity());
    }

    protected <R> Function<CompletableFuture<R>, R> createGrabber() {
        return t -> {
            try {
                return t.get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new Jobs.JobException(e);
            }
        };
    }
}

