/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.streams.stages;

import io.reactivex.Flowable;
import io.reactivex.plugins.RxJavaPlugins;
import io.smallrye.reactive.streams.Engine;
import io.smallrye.reactive.streams.operators.TerminalStage;
import io.smallrye.reactive.streams.operators.TerminalStageFactory;
import io.smallrye.reactive.streams.utils.FlowableCollector;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collector;
import org.eclipse.microprofile.reactive.streams.operators.spi.Stage;

/**
 * Factory producing the terminal stage for a {@link Stage.Collect}: the stage
 * wraps the incoming {@link Flowable} in a {@link FlowableCollector} driven by
 * the stage's {@link Collector} and exposes the outcome as a
 * {@link CompletionStage}.
 */
public class CollectStageFactory
implements TerminalStageFactory<Stage.Collect> {
    /**
     * Creates the terminal stage for the given collect stage.
     *
     * @param engine the engine requesting the stage; not used by this factory
     * @param stage the collect stage supplying the collector, must not be {@code null}
     * @param <I> type of the items received by the stage
     * @param <O> type of the result produced by the stage
     * @return a terminal stage applying the stage's collector
     * @throws NullPointerException if {@code stage} or its collector is {@code null}
     */
    @Override
    public <I, O> TerminalStage<I, O> create(Engine engine, Stage.Collect stage) {
        // The SPI hands the collector out with wildcard type arguments; the element
        // and result types are fixed by the caller, so the cast cannot be checked here.
        @SuppressWarnings("unchecked")
        Collector<I, Object, O> collector =
                (Collector<I, Object, O>) Objects.requireNonNull(stage, "stage").getCollector();
        Objects.requireNonNull(collector, "collector");
        return new CollectStage<>(collector);
    }

    /**
     * Terminal stage that feeds a stream into a {@link Collector} and reports the
     * outcome through a {@link CompletionStage}.
     *
     * @param <I> type of the items received
     * @param <O> type of the collected result
     */
    private static class CollectStage<I, O>
    implements TerminalStage<I, O> {
        private final Collector<I, Object, O> collector;

        CollectStage(Collector<I, Object, O> collector) {
            this.collector = collector;
        }

        /**
         * Subscribes to {@code source} through a {@link FlowableCollector} and
         * completes the returned stage with the first element it emits, or
         * exceptionally with the error it signals.
         *
         * @param source the stream to collect
         * @return a stage completed from the first element or the failure of the collecting stream
         */
        @Override
        public CompletionStage<O> apply(Flowable<I> source) {
            CompletableFuture<O> future = new CompletableFuture<>();
            Flowable<O> collected = source.compose(upstream ->
                    RxJavaPlugins.onAssembly(new FlowableCollector<I, Object, O>(upstream, this.collector)));
            // The third callback covers a Maybe that completes without a value, so the
            // returned stage can never be left pending; whether FlowableCollector can
            // actually finish empty is not visible from this class.
            collected.firstElement().subscribe(
                    future::complete,
                    future::completeExceptionally,
                    () -> future.complete(null));
            return future;
        }
    }
}

