package io.datakernel.stream.processor;

import io.datakernel.async.Promise;
import io.datakernel.async.Promises;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamSupplier;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataAcceptor;
import io.datakernel.stream.StreamInput;
import io.datakernel.stream.StreamOutputs;
import io.datakernel.stream.StreamSupplier;
import io.datakernel.util.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/* loaded from: input_file:io/datakernel/stream/processor/ShardingStreamSplitter.class */
public final class ShardingStreamSplitter<I, K> implements StreamInput<I>, StreamOutputs, StreamDataAcceptor<I> {
    private final MultiSharder<K> sharder;
    private final Function<I, K> keyFunction;
    private final List<ShardingStreamSplitter<I, K>.Output> outputs = new ArrayList();
    private StreamDataAcceptor<I>[] dataAcceptors = new StreamDataAcceptor[0];
    private int suspended = 0;
    private final ShardingStreamSplitter<I, K>.Input input = new Input();

    /* loaded from: input_file:io/datakernel/stream/processor/ShardingStreamSplitter$Input.class */
    protected final class Input extends AbstractStreamConsumer<I> {
        protected Input() {
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onStarted() {
            Preconditions.checkState(!ShardingStreamSplitter.this.outputs.isEmpty(), "Empty outputs");
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected Promise<Void> onEndOfStream() {
            return Promises.all(ShardingStreamSplitter.this.outputs.stream().map((v0) -> {
                return v0.sendEndOfStream();
            }));
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onError(Throwable th) {
            ShardingStreamSplitter.this.outputs.forEach(output -> {
                output.close(th);
            });
        }
    }

    /* loaded from: input_file:io/datakernel/stream/processor/ShardingStreamSplitter$Output.class */
    protected final class Output extends AbstractStreamSupplier<I> {
        private final int index;

        Output(int i) {
            this.index = i;
        }

        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onStarted() {
            Preconditions.checkState(ShardingStreamSplitter.this.input.getSupplier() != null, "Splitter has no input");
        }

        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onSuspended() {
            ShardingStreamSplitter.access$208(ShardingStreamSplitter.this);
            ShardingStreamSplitter.this.input.getSupplier().suspend();
        }

        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onProduce(StreamDataAcceptor<I> streamDataAcceptor) {
            ShardingStreamSplitter.this.dataAcceptors[this.index] = streamDataAcceptor;
            if (ShardingStreamSplitter.access$206(ShardingStreamSplitter.this) == 0) {
                ShardingStreamSplitter.this.input.getSupplier().resume(ShardingStreamSplitter.this);
            }
        }

        @Override // io.datakernel.stream.AbstractStreamSupplier
        protected void onError(Throwable th) {
            ShardingStreamSplitter.this.input.close(th);
        }
    }

    private ShardingStreamSplitter(MultiSharder<K> multiSharder, Function<I, K> function) {
        this.sharder = multiSharder;
        this.keyFunction = function;
    }

    public static <T> ShardingStreamSplitter<T, T> create(MultiSharder<T> multiSharder) {
        return new ShardingStreamSplitter<>(multiSharder, Function.identity());
    }

    public static <I, K> ShardingStreamSplitter<I, K> create(MultiSharder<K> multiSharder, Function<I, K> function) {
        return new ShardingStreamSplitter<>(multiSharder, function);
    }

    public StreamSupplier<I> newOutput() {
        ShardingStreamSplitter<I, K>.Output output = new Output(this.outputs.size());
        this.dataAcceptors = (StreamDataAcceptor[]) Arrays.copyOf(this.dataAcceptors, this.dataAcceptors.length + 1);
        this.suspended++;
        this.outputs.add(output);
        return output;
    }

    @Override // io.datakernel.stream.StreamInput
    public StreamConsumer<I> getInput() {
        return this.input;
    }

    @Override // io.datakernel.stream.StreamOutputs
    public List<? extends StreamSupplier<I>> getOutputs() {
        return this.outputs;
    }

    @Override // io.datakernel.stream.StreamDataAcceptor
    public void accept(I i) {
        for (int i2 : this.sharder.shard(this.keyFunction.apply(i))) {
            this.dataAcceptors[i2].accept(i);
        }
    }

    static /* synthetic */ int access$208(ShardingStreamSplitter shardingStreamSplitter) {
        int i = shardingStreamSplitter.suspended;
        shardingStreamSplitter.suspended = i + 1;
        return i;
    }

    static /* synthetic */ int access$206(ShardingStreamSplitter shardingStreamSplitter) {
        int i = shardingStreamSplitter.suspended - 1;
        shardingStreamSplitter.suspended = i;
        return i;
    }
}
