package io.datakernel.stream.processor;

import io.datakernel.async.Promise;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamSupplier;
import io.datakernel.stream.StreamCapability;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataAcceptor;
import io.datakernel.stream.StreamSupplier;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;

/* loaded from: input_file:io/datakernel/stream/processor/StreamBuffer.class */
/**
 * Pass-through stream transformer that queues items in memory while its output is suspended.
 *
 * <p>While the output is not suspended, every item received from the input side is handed
 * straight to the output's current data acceptor. Once the output has been suspended, incoming
 * items are appended to an internal FIFO buffer instead. When that buffer holds at least
 * {@code maxBuffered} items the upstream supplier is suspended; while the buffer is being drained
 * and its size drops below {@code minBuffered}, the upstream supplier is resumed.
 *
 * @param <T> type of the items flowing through the buffer
 */
public class StreamBuffer<T> implements StreamTransformer<T, T> {
    /** Buffer size below which the upstream supplier is resumed while draining. */
    private final int minBuffered;
    /** Buffer size at (or above) which the upstream supplier is suspended. */
    private final int maxBuffered;
    /** FIFO queue of items received while the output was suspended. */
    private final Deque<T> buffer = new ArrayDeque<>();
    /** {@code true} while incoming items must be queued instead of forwarded directly. */
    private boolean suspended = false;
    private final Input input = new Input();
    private final Output output = new Output();

    /** Consumer side of the buffer; delegates end-of-stream and error handling to the output. */
    protected final class Input extends AbstractStreamConsumer<T> {
        protected Input() {
        }

        /**
         * Returns the capabilities computed by {@code addCapabilities} from the output's consumer
         * plus {@link StreamCapability#IMMEDIATE_SUSPEND}.
         */
        @Override
        public Set<StreamCapability> getCapabilities() {
            return addCapabilities(StreamBuffer.this.output.getConsumer(), StreamCapability.IMMEDIATE_SUSPEND, new StreamCapability[0]);
        }

        /**
         * Asks the output to try producing (so buffered items and the end-of-stream marker can be
         * emitted) and returns the acknowledgement of the output's consumer.
         */
        @Override
        protected Promise<Void> onEndOfStream() {
            StreamBuffer.this.output.tryProduce();
            // Only checked when assertions are enabled, exactly like the compiled original.
            assert StreamBuffer.this.output.getConsumer() != null;
            return StreamBuffer.this.output.getConsumer().getAcknowledgement();
        }

        /** Propagates an input-side failure by closing the output with the same cause. */
        @Override
        protected void onError(Throwable th) {
            StreamBuffer.this.output.close(th);
        }
    }

    /**
     * Supplier side of the buffer. It is also the {@link StreamDataAcceptor} that is handed to
     * the upstream supplier on resume, so upstream items arrive in {@link #accept(Object)}.
     */
    protected final class Output extends AbstractStreamSupplier<T> implements StreamDataAcceptor<T> {
        protected Output() {
        }

        /**
         * Forwards {@code t} directly when the output is not suspended; otherwise queues it and
         * suspends the upstream supplier once the buffer holds at least {@code maxBuffered} items.
         */
        @Override
        public void accept(T t) {
            if (!StreamBuffer.this.suspended) {
                StreamBuffer.this.output.getLastDataAcceptor().accept(t);
                return;
            }
            StreamBuffer.this.buffer.offer(t);
            if (StreamBuffer.this.buffer.size() >= StreamBuffer.this.maxBuffered) {
                StreamBuffer.this.input.getSupplier().suspend();
            }
        }

        /**
         * Drains the buffer in FIFO order while the receiver stays ready, resuming the upstream
         * supplier whenever the buffer size falls below {@code minBuffered}. If the buffer has
         * been emptied and the receiver is still ready, switches back to direct forwarding and
         * resumes upstream. Finally, sends end of stream if the input's end-of-stream promise
         * reports a result.
         *
         * @param asyncProduceController not used by this implementation
         */
        @Override
        protected void produce(AbstractStreamSupplier<T>.AsyncProduceController asyncProduceController) {
            while (!StreamBuffer.this.buffer.isEmpty()) {
                if (!StreamBuffer.this.output.isReceiverReady()) {
                    // Receiver stopped accepting data: keep the remaining items buffered.
                    return;
                }
                // Buffer is non-empty here, so poll() removes and returns the oldest item.
                send(StreamBuffer.this.buffer.poll());
                if (StreamBuffer.this.buffer.size() < StreamBuffer.this.minBuffered) {
                    StreamBuffer.this.input.getSupplier().resume(this);
                }
            }
            // Reaching this point means the buffer is empty.
            if (StreamBuffer.this.output.isReceiverReady()) {
                StreamBuffer.this.suspended = false;
                StreamBuffer.this.input.getSupplier().resume(this);
            }
            if (StreamBuffer.this.input.getEndOfStream().isResult()) {
                sendEndOfStream();
            }
        }

        /**
         * Switches to buffering mode. With {@code maxBuffered == 0} there is no room to buffer,
         * so the upstream supplier is suspended right away.
         */
        @Override
        protected void onSuspended() {
            StreamBuffer.this.suspended = true;
            if (StreamBuffer.this.maxBuffered == 0) {
                StreamBuffer.this.input.getSupplier().suspend();
            }
        }

        /**
         * Returns the capabilities computed by {@code addCapabilities} from the input's supplier
         * plus {@link StreamCapability#IMMEDIATE_SUSPEND}.
         */
        @Override
        public Set<StreamCapability> getCapabilities() {
            return addCapabilities(StreamBuffer.this.input.getSupplier(), StreamCapability.IMMEDIATE_SUSPEND, new StreamCapability[0]);
        }

        /** Propagates an output-side failure by closing the input with the same cause. */
        @Override
        protected void onError(Throwable th) {
            StreamBuffer.this.input.close(th);
        }
    }

    private StreamBuffer(int minBuffered, int maxBuffered) {
        this.minBuffered = minBuffered;
        this.maxBuffered = maxBuffered;
    }

    /**
     * Creates a buffer with both thresholds set to {@code 0}: the upstream supplier is suspended
     * as soon as the output is suspended.
     *
     * @param <T> type of the items flowing through the buffer
     * @return a new buffer
     */
    public static <T> StreamBuffer<T> create() {
        return new StreamBuffer<>(0, 0);
    }

    /**
     * Creates a buffer with explicit thresholds. The values are stored as given; no validation
     * is performed.
     *
     * @param minBuffered buffer size below which the upstream supplier is resumed while draining
     * @param maxBuffered buffer size at which the upstream supplier is suspended
     * @param <T> type of the items flowing through the buffer
     * @return a new buffer
     */
    public static <T> StreamBuffer<T> create(int minBuffered, int maxBuffered) {
        return new StreamBuffer<>(minBuffered, maxBuffered);
    }

    /** Returns the consumer side of this buffer. */
    @Override
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    /** Returns the supplier side of this buffer. */
    @Override
    public StreamSupplier<T> getOutput() {
        return this.output;
    }
}
