package io.datakernel.stream.net;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufQueue;
import io.datakernel.eventloop.AsyncTcpSocket;
import io.datakernel.stream.AbstractStreamProducer;

/* loaded from: input_file:io/datakernel/stream/net/SocketStreamProducer.class */
/**
 * Stream producer that buffers {@link ByteBuf}s handed to it via {@link #onRead(ByteBuf)}
 * and forwards them downstream with {@code send(...)} whenever the receiver is ready.
 *
 * <p>Once {@link #onReadEndOfStream()} has been signalled and the receiver is ready,
 * any data still queued is sent and then {@code sendEndOfStream()} is called.
 *
 * <p>NOTE(review): presumably {@code onRead} / {@code onReadEndOfStream} are driven by the
 * owner of the {@link AsyncTcpSocket} from its read callbacks — confirm against the caller.
 */
final class SocketStreamProducer extends AbstractStreamProducer<ByteBuf> {
    /** Socket on which {@code read()} is called when the local queue is (almost) empty. */
    private final AsyncTcpSocket asyncTcpSocket;

    /** Buffers received via {@link #onRead(ByteBuf)} that have not been sent downstream yet. */
    protected final ByteBufQueue readQueue = ByteBufQueue.create();

    /** Set to {@code true} by {@link #onReadEndOfStream()}; never reset. */
    private boolean readEndOfStream;

    private SocketStreamProducer(AsyncTcpSocket asyncTcpSocket) {
        this.asyncTcpSocket = asyncTcpSocket;
    }

    /**
     * Creates a producer bound to the given socket.
     *
     * @param asyncTcpSocket socket on which {@code read()} will be requested
     * @return a new producer instance
     */
    public static SocketStreamProducer create(AsyncTcpSocket asyncTcpSocket) {
        return new SocketStreamProducer(asyncTcpSocket);
    }

    /**
     * Pushes queued buffers downstream while the receiver accepts them, then either
     * requests more input from the socket or finishes the stream.
     */
    @Override
    protected void produce() {
        // Drain the queue one buffer at a time for as long as downstream is ready.
        while (isReceiverReady() && readQueue.hasRemaining()) {
            ByteBuf next = readQueue.take();
            send(next);
        }

        if (readEndOfStream) {
            // Input is finished: complete the stream only once downstream can accept it.
            if (!isReceiverReady()) {
                return;
            }
            if (readQueue.hasRemaining()) {
                send(readQueue.takeRemaining());
            }
            sendEndOfStream();
            return;
        }

        // More input may still arrive: call read() only when at most one buffer is queued.
        if (readQueue.remainingBufs() <= 1) {
            asyncTcpSocket.read();
        }
    }

    /**
     * Intentionally does nothing.
     *
     * <p>NOTE(review): buffers still held in {@code readQueue} are not released here —
     * confirm that they are recycled elsewhere (base class or connection owner).
     *
     * @param error the error reported by the stream; ignored
     */
    @Override
    protected void onError(Throwable error) {
    }

    /**
     * Queues a received buffer and attempts to forward queued data downstream.
     *
     * @param byteBuf buffer to append to the read queue
     */
    public void onRead(ByteBuf byteBuf) {
        readQueue.add(byteBuf);
        produce();
    }

    /**
     * Records that no more input will arrive and attempts to flush and finish the stream.
     */
    public void onReadEndOfStream() {
        readEndOfStream = true;
        produce();
    }

    /**
     * Tells whether this producer should be regarded as closed.
     *
     * @return {@code true} if the producer is not wired, or if its status reports closed
     */
    public boolean isClosed() {
        if (!isWired()) {
            return true;
        }
        return getStatus().isClosed();
    }
}
