package io.datakernel.stream;

import io.datakernel.async.Promise;
import io.datakernel.csp.queue.ChannelBuffer;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/datakernel/stream/StreamSupplierEndpoint.class */
/**
 * A stream supplier fed through a {@link ChannelBuffer}: items handed to
 * {@link #add(Object)} or {@link #put(Object)} are stored in the buffer and
 * forwarded downstream by {@link #produce}.
 *
 * <p>A {@code null} element polled from the buffer is treated as the
 * end-of-stream marker.
 *
 * @param <T> type of the items carried by the stream
 */
public final class StreamSupplierEndpoint<T> extends AbstractStreamSupplier<T> {
    /** Buffer size used by the no-argument constructor. */
    public static final int DEFAULT_BUFFER_SIZE = 10;

    private final ChannelBuffer<T> buffer;

    /** Creates an endpoint whose buffer is sized with {@link #DEFAULT_BUFFER_SIZE}. */
    public StreamSupplierEndpoint() {
        this(0, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates an endpoint with a caller-chosen buffer size.
     *
     * @param bufferSize passed as the second argument of the {@link ChannelBuffer} constructor
     */
    public StreamSupplierEndpoint(int bufferSize) {
        this(0, bufferSize);
    }

    // NOTE(review): the two values are forwarded verbatim to ChannelBuffer(int, int);
    // they are presumably the buffer's minimum and maximum sizes - confirm against ChannelBuffer.
    private StreamSupplierEndpoint(int bufferMinSize, int bufferMaxSize) {
        this.buffer = new ChannelBuffer<>(bufferMinSize, bufferMaxSize);
    }

    /**
     * Calls {@code postProduce()} and then adds the item to the buffer.
     *
     * @param item the item to add
     * @throws Exception declared by this method; presumably propagated from
     *                   {@code ChannelBuffer.add} - confirm the exact conditions there
     */
    public void add(T item) throws Exception {
        postProduce();
        this.buffer.add(item);
    }

    /**
     * Calls {@code postProduce()} and then puts the item into the buffer.
     *
     * @param item the item to put; since {@link #produce} ends the stream when it polls
     *             {@code null}, passing {@code null} here presumably signals end of stream
     * @return the promise returned by {@code ChannelBuffer.put}
     */
    public Promise<Void> put(@Nullable T item) {
        postProduce();
        return this.buffer.put(item);
    }

    /**
     * Drains the buffer: every non-null item is sent downstream, a {@code null} item
     * triggers {@code sendEndOfStream()}. Runtime exceptions are rethrown to the caller;
     * any checked exception closes this supplier with that exception and stops draining.
     */
    @Override // io.datakernel.stream.AbstractStreamSupplier
    protected void produce(AbstractStreamSupplier<T>.AsyncProduceController asyncProduceController) {
        while (!this.buffer.isEmpty()) {
            try {
                // Typed as T (not Object) so the element can be passed to send() without a cast.
                T item = this.buffer.poll();
                if (item != null) {
                    send(item);
                } else {
                    sendEndOfStream();
                }
            } catch (RuntimeException e) {
                // Runtime failures are deliberately not converted into a stream error.
                throw e;
            } catch (Exception e) {
                close(e);
                return;
            }
        }
    }

    /**
     * Closes the underlying buffer with the given error.
     *
     * @param error the error this supplier was notified about
     */
    @Override // io.datakernel.stream.AbstractStreamSupplier
    protected void onError(Throwable error) {
        this.buffer.close(error);
    }
}
