/*
 * Decompiled with CFR 0.152.
 */
package org.ujorm.tools.thread;

import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.ujorm.tools.Assert;
import org.ujorm.tools.thread.JobContext;
import org.ujorm.tools.thread.Jobs;

/**
 * A {@link Jobs} implementation that submits the user function to a
 * caller-supplied {@link ForkJoinPool} and waits for the outcome for at most
 * the timeout held by this job.
 *
 * @param <P> type of the input parameters the user function is applied to
 */
public class ParallelJob<P> extends Jobs<P> {
    /** Message template passed to the assertion of required constructor inputs. */
    protected static final String REQUIRED_INPUT_TEMPLATE_MSG = "The {} is required";

    /** Pool the work is submitted to; taken from the {@link JobContext}. */
    @Nonnull
    protected final ForkJoinPool threadPool;

    /**
     * Creates a job over the given parameters.
     *
     * @param params input parameters, handed to the superclass together with the context timeout
     * @param jobContext source of the timeout and of the thread pool; its thread pool
     *        is checked with {@code Assert.notNull}
     */
    protected ParallelJob(@Nonnull Stream<P> params, @Nonnull JobContext jobContext) {
        super(params, jobContext.getTimeout());
        this.threadPool = (ForkJoinPool) Assert.notNull(jobContext.getThreadPool(), REQUIRED_INPUT_TEMPLATE_MSG, "threadPool");
    }

    /**
     * Applies the job to every element of {@code getParallel()} inside a task
     * submitted to {@link #threadPool} and returns the non-null results.
     *
     * @param job function applied to each parameter
     * @param <R> result type of the function
     * @return stream of the collected results with {@code null} values filtered out
     * @throws Jobs.JobException if the wait is interrupted, the task fails,
     *         or the timeout elapses before the task completes
     */
    @Override
    public <R> Stream<R> run(@Nonnull Jobs.UserFunction<P, R> job) throws Jobs.JobException {
        // collect() is a terminal operation, so all mapping is finished inside the
        // submitted task; the caller only receives a stream over the finished list.
        // NOTE(review): presumably this is what keeps the stream work on this pool - confirm.
        ForkJoinTask<Stream<R>> task = this.threadPool.submit(
                () -> this.getParallel().map(job).collect(Collectors.toList()).stream());
        return this.awaitResult(task).filter(Objects::nonNull);
    }

    /**
     * Applies the job to every element of {@code getParallel()} inside a task
     * submitted to {@link #threadPool} and concatenates the produced streams.
     *
     * @param job function producing a stream of results for each parameter
     * @param <R> element type of the produced streams
     * @return the flattened stream of all results (no {@code null} filtering is applied)
     * @throws Jobs.JobException if the wait is interrupted, the task fails,
     *         or the timeout elapses before the task completes
     */
    @Override
    public <R> Stream<R> runOfStream(@Nonnull Jobs.UserFunction<P, Stream<R>> job) throws Jobs.JobException {
        // The per-parameter streams are collected first (terminal operation inside the
        // task); flatMap itself stays lazy and is evaluated by the consumer of the result.
        ForkJoinTask<Stream<R>> task = this.threadPool.submit(
                () -> this.getParallel().map(job).collect(Collectors.toList()).stream().flatMap(Function.identity()));
        return this.awaitResult(task);
    }

    /**
     * Waits for the task for at most {@code timeout} and translates every
     * failure of the wait into a {@link Jobs.JobException}.
     *
     * @param task the submitted task to wait for
     * @param <T> result type of the task
     * @return the task result
     * @throws Jobs.JobException wrapping the interruption, execution failure or timeout
     */
    private <T> T awaitResult(@Nonnull ForkJoinTask<T> task) throws Jobs.JobException {
        try {
            return task.get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Nobody will read the result any more: request cancellation (best effort)
            // and restore the interrupt status so callers up the stack can observe it.
            task.cancel(true);
            Thread.currentThread().interrupt();
            throw new Jobs.JobException(e);
        } catch (TimeoutException e) {
            // The caller gives up here; ask the pool to drop the abandoned task (best effort).
            task.cancel(true);
            throw new Jobs.JobException(e);
        } catch (ExecutionException e) {
            throw new Jobs.JobException(e);
        }
    }
}

