package io.datakernel.stream.processor;

import io.datakernel.async.Promise;
import io.datakernel.async.Promises;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamSupplier;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataAcceptor;
import io.datakernel.stream.StreamInput;
import io.datakernel.stream.StreamOutputs;
import io.datakernel.stream.StreamSupplier;
import io.datakernel.util.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

/* loaded from: input_file:io/datakernel/stream/processor/StreamMapSplitter.class */
public final class StreamMapSplitter<I> implements StreamInput<I>, StreamOutputs, StreamDataAcceptor<I> {
    private final BiConsumer<I, StreamDataAcceptor<Object>[]> action;
    private final List<StreamMapSplitter<I>.Output> outputs = new ArrayList();
    private StreamDataAcceptor<Object>[] dataAcceptors = new StreamDataAcceptor[0];
    private int suspended = 0;
    private final StreamMapSplitter<I>.Input input = new Input();

    /* loaded from: input_file:io/datakernel/stream/processor/StreamMapSplitter$Input.class */
    final class Input extends AbstractStreamConsumer<I> {
        Input() {
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onStarted() {
            Preconditions.checkState(!StreamMapSplitter.this.outputs.isEmpty(), "Empty outputs");
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected Promise<Void> onEndOfStream() {
            return Promises.all(StreamMapSplitter.this.outputs.stream().map((v0) -> {
                return v0.sendEndOfStream();
            }));
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onError(Throwable th) {
            StreamMapSplitter.this.outputs.forEach(output -> {
                output.close(th);
            });
        }
    }

    /* loaded from: input_file:io/datakernel/stream/processor/StreamMapSplitter$Output.class */
    final class Output extends AbstractStreamSupplier<Object> {
        final int index;

        Output(int i) {
            this.index = i;
        }

        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onStarted() {
            Preconditions.checkState(StreamMapSplitter.this.input.getSupplier() != null, "Splitter has no input");
        }

        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onSuspended() {
            StreamMapSplitter.access$208(StreamMapSplitter.this);
            StreamMapSplitter.this.input.getSupplier().suspend();
        }

        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onProduce(StreamDataAcceptor<Object> streamDataAcceptor) {
            StreamMapSplitter.this.dataAcceptors[this.index] = streamDataAcceptor;
            if (StreamMapSplitter.access$206(StreamMapSplitter.this) == 0) {
                StreamMapSplitter.this.input.getSupplier().resume(StreamMapSplitter.this);
            }
        }

        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onError(Throwable th) {
            StreamMapSplitter.this.input.close(th);
        }
    }

    private StreamMapSplitter(BiConsumer<I, StreamDataAcceptor<Object>[]> biConsumer) {
        this.action = biConsumer;
    }

    public static <I> StreamMapSplitter<I> create(BiConsumer<I, StreamDataAcceptor<Object>[]> biConsumer) {
        return new StreamMapSplitter<>(biConsumer);
    }

    public <O> StreamSupplier<O> newOutput() {
        StreamMapSplitter<I>.Output output = new Output(this.outputs.size());
        this.dataAcceptors = (StreamDataAcceptor[]) Arrays.copyOf(this.dataAcceptors, this.dataAcceptors.length + 1);
        this.suspended++;
        this.outputs.add(output);
        return output;
    }

    @Override // io.datakernel.stream.StreamInput
    public StreamConsumer<I> getInput() {
        return this.input;
    }

    @Override // io.datakernel.stream.StreamOutputs
    public List<? extends StreamSupplier<?>> getOutputs() {
        return this.outputs;
    }

    @Override // io.datakernel.stream.StreamDataAcceptor
    public void accept(I i) {
        this.action.accept(i, this.dataAcceptors);
    }

    static /* synthetic */ int access$208(StreamMapSplitter streamMapSplitter) {
        int i = streamMapSplitter.suspended;
        streamMapSplitter.suspended = i + 1;
        return i;
    }

    static /* synthetic */ int access$206(StreamMapSplitter streamMapSplitter) {
        int i = streamMapSplitter.suspended - 1;
        streamMapSplitter.suspended = i;
        return i;
    }
}
