package io.datakernel.stream.processor;

import io.datakernel.async.Promise;
import io.datakernel.async.Promises;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamSupplier;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataAcceptor;
import io.datakernel.stream.StreamInput;
import io.datakernel.stream.StreamOutputs;
import io.datakernel.stream.StreamSupplier;
import io.datakernel.util.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/* loaded from: input_file:io/datakernel/stream/processor/StreamSplitter.class */
/**
 * Fans a single input stream out to several outputs: every item accepted from the
 * input is forwarded to the data acceptor of each registered output.
 *
 * <p>Back-pressure is tracked with a counter of suspended outputs. The input supplier
 * is suspended as soon as any output is suspended and is resumed (with this splitter as
 * the data acceptor) once the counter drops back to zero.
 *
 * <p>By default an error on any output closes the input with that error. In
 * {@link #lenient() lenient} mode a failed output is simply detached; the input is only
 * closed when the last remaining output fails, and the earlier failures are attached to
 * that final error as suppressed exceptions.
 *
 * @param <T> type of the streamed items
 */
public final class StreamSplitter<T> implements StreamInput<T>, StreamOutputs, StreamDataAcceptor<T> {
    /** Outputs that are still attached; lenient mode removes failed outputs from this list. */
    private final List<Output> outputs = new ArrayList<>();

    /**
     * One slot per output ever created, addressed by {@code Output.index}. A slot is
     * {@code null} until the output starts producing, and is reset to {@code null} when the
     * output fails in lenient mode. The array never shrinks.
     */
    @SuppressWarnings("unchecked") // generic array creation; only ever holds StreamDataAcceptor<T>
    private StreamDataAcceptor<T>[] dataAcceptors = new StreamDataAcceptor[0];

    /** Number of outputs currently counted as suspended; a new output starts out counted. */
    private int suspended = 0;

    private boolean lenient = false;

    /** Errors of outputs that failed in lenient mode while other outputs were still attached. */
    private final List<Throwable> lenientExceptions = new ArrayList<>();

    private final Input input = new Input();

    private StreamSplitter() {
    }

    /**
     * Creates an empty splitter with no outputs.
     *
     * @param <T> type of the streamed items
     * @return a new splitter
     */
    public static <T> StreamSplitter<T> create() {
        return new StreamSplitter<>();
    }

    /**
     * Switches this splitter to lenient mode, in which a failing output is detached
     * instead of closing the input right away.
     *
     * @return this splitter, for chaining
     */
    public StreamSplitter<T> lenient() {
        this.lenient = true;
        return this;
    }

    /**
     * Registers a new output and returns its supplier side.
     *
     * @return the supplier that receives a copy of every input item
     */
    public StreamSupplier<T> newOutput() {
        // Use the acceptor array length rather than outputs.size() as the slot index:
        // lenient mode removes failed outputs from the list without shrinking the array,
        // so outputs.size() could point at a slot still owned by a live output.
        int index = this.dataAcceptors.length;
        Output output = new Output(index);
        this.dataAcceptors = Arrays.copyOf(this.dataAcceptors, index + 1);
        this.suspended++; // counted as suspended until its first onProduce
        this.outputs.add(output);
        return output;
    }

    @Override // io.datakernel.stream.StreamInput
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override // io.datakernel.stream.StreamOutputs
    public List<? extends StreamSupplier<T>> getOutputs() {
        return this.outputs;
    }

    /**
     * Forwards {@code t} to every output that currently has a data acceptor installed.
     *
     * @param t the item received from the input
     */
    @Override // io.datakernel.stream.StreamDataAcceptor
    public void accept(T t) {
        for (StreamDataAcceptor<T> acceptor : this.dataAcceptors) {
            if (acceptor != null) {
                acceptor.accept(t);
            }
        }
    }

    /** Consumer side of the splitter; relays end-of-stream and errors to the outputs. */
    final class Input extends AbstractStreamConsumer<T> {
        Input() {
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onStarted() {
            Preconditions.checkState(!StreamSplitter.this.outputs.isEmpty(), "Splitter has no outputs");
        }

        /** Sends end-of-stream to every attached output and combines the resulting promises. */
        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected Promise<Void> onEndOfStream() {
            return Promises.all(StreamSplitter.this.outputs.stream().map(output -> output.sendEndOfStream()));
        }

        /** Closes every attached output with the input's error. */
        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onError(Throwable th) {
            // Iterate over a snapshot: in lenient mode Output.onError removes the output
            // from the live list, which must not happen while that list is being iterated.
            for (Output output : new ArrayList<>(StreamSplitter.this.outputs)) {
                output.close(th);
            }
        }
    }

    /** Supplier side of the splitter; one instance per registered output. */
    final class Output extends AbstractStreamSupplier<T> {
        /** Slot of this output in the enclosing splitter's {@code dataAcceptors} array. */
        private final int index;

        /** Set by {@link #onSuspended()}, cleared by {@code onProduce}. */
        private boolean isSuspended = false;

        Output(int i) {
            this.index = i;
        }

        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onStarted() {
            Preconditions.checkState(StreamSplitter.this.input.getSupplier() != null, "Splitter has no input");
        }

        /** Counts this output as suspended and suspends the input supplier. */
        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onSuspended() {
            StreamSplitter.this.suspended++;
            this.isSuspended = true;
            assert StreamSplitter.this.input.getSupplier() != null;
            StreamSplitter.this.input.getSupplier().suspend();
        }

        /**
         * Installs this output's data acceptor and, once no output is counted as suspended
         * any more, resumes the input supplier with the splitter as its acceptor.
         */
        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onProduce(StreamDataAcceptor<T> streamDataAcceptor) {
            StreamSplitter.this.dataAcceptors[this.index] = streamDataAcceptor;
            this.isSuspended = false;
            if (--StreamSplitter.this.suspended == 0) {
                assert StreamSplitter.this.input.getSupplier() != null;
                StreamSplitter.this.input.getSupplier().resume(StreamSplitter.this);
            }
        }

        /**
         * Handles a failure of this output. In strict mode the input is closed with the
         * error. In lenient mode the output is detached and the error is remembered; only
         * when the last output fails is the input closed, with the remembered errors
         * attached as suppressed exceptions.
         */
        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onError(Throwable th) {
            if (!StreamSplitter.this.lenient) {
                StreamSplitter.this.input.close(th);
                return;
            }
            StreamSplitter.this.dataAcceptors[this.index] = null; // stop forwarding to this output
            if (this.isSuspended) {
                // NOTE(review): if this was the last suspended output the counter reaches zero
                // here, but the input supplier is not resumed - confirm the remaining outputs
                // cannot stall in that case.
                StreamSplitter.this.suspended--;
            }
            StreamSplitter.this.outputs.remove(this);
            if (!StreamSplitter.this.outputs.isEmpty()) {
                StreamSplitter.this.lenientExceptions.add(th);
                return;
            }
            // Last output gone: report this error with the earlier ones attached. The same
            // Throwable may have been recorded already (e.g. one error delivered to several
            // outputs); Throwable.addSuppressed rejects self-suppression, so skip it.
            for (Throwable earlier : StreamSplitter.this.lenientExceptions) {
                if (earlier != th) {
                    th.addSuppressed(earlier);
                }
            }
            StreamSplitter.this.input.close(th);
        }
    }
}
