package io.datakernel.stream.file;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufPool;
import io.datakernel.file.AsyncFile;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.util.MemSize;
import io.datakernel.util.Preconditions;
import java.io.IOException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/stream/file/StreamFileReader.class */
/**
 * Stream producer that reads an {@link AsyncFile} chunk by chunk and sends every chunk
 * downstream as a {@link ByteBuf}.
 *
 * <p>Reading starts at a configurable position, uses a configurable buffer size and may be
 * limited to a fixed number of bytes. End of stream is signalled when the file reports
 * end-of-file ({@code -1} bytes read) or when the configured reading length is exhausted.
 *
 * <p>NOTE(review): the class keeps plain mutable state ({@code reading}, {@code position},
 * {@code length}) without synchronization, so it presumably expects all calls and read
 * completions to arrive on a single thread - confirm against {@code AsyncFile}.
 */
public final class StreamFileReader extends AbstractStreamProducer<ByteBuf> {
    private static final Logger logger = LoggerFactory.getLogger(StreamFileReader.class);

    /** Open options used by {@link #readFile(ExecutorService, Path)}: read-only access. */
    public static final OpenOption[] READ_OPTIONS = {StandardOpenOption.READ};

    /** Buffer size used unless {@link #withBufferSize(MemSize)} is called. */
    public static final MemSize DEFAULT_BUFFER_SIZE = MemSize.kilobytes(8);

    /** Sentinel stored in {@link #length} meaning "read until end of file". */
    private static final long UNLIMITED = Long.MAX_VALUE;

    private final AsyncFile asyncFile;

    /** {@code true} while a read loop is active; prevents starting a second, overlapping loop. */
    private boolean reading = false;

    /** Upper bound, in bytes, of the buffer allocated for each read. */
    private int bufferSize = DEFAULT_BUFFER_SIZE.toInt();

    /** File offset of the next read. */
    private long position = 0;

    /** Bytes still allowed to be read, or {@link #UNLIMITED}. */
    private long length = UNLIMITED;

    private StreamFileReader(AsyncFile asyncFile) {
        this.asyncFile = asyncFile;
    }

    /**
     * Opens {@code path} with {@link #READ_OPTIONS} and creates a reader for it.
     *
     * @param executorService executor handed to {@code AsyncFile.open}
     * @param path            file to read
     * @return a new reader positioned at offset 0 with the default buffer size
     * @throws IOException if the file cannot be opened
     */
    public static StreamFileReader readFile(ExecutorService executorService, Path path) throws IOException {
        return new StreamFileReader(AsyncFile.open(executorService, path, READ_OPTIONS));
    }

    /**
     * Creates a reader over an already opened file.
     *
     * @param asyncFile file to read from
     * @return a new reader positioned at offset 0 with the default buffer size
     */
    public static StreamFileReader readFile(AsyncFile asyncFile) {
        return new StreamFileReader(asyncFile);
    }

    /**
     * Sets the maximum size of the buffer allocated for each read.
     *
     * @param memSize buffer size; must be greater than zero
     * @return this reader, for chaining
     */
    public StreamFileReader withBufferSize(MemSize memSize) {
        int size = memSize.toInt();
        Preconditions.checkArgument(size > 0, "Buffer size cannot be less than or equal to zero");
        this.bufferSize = size;
        return this;
    }

    /**
     * Sets the file offset at which reading starts. Only allowed while no consumer is bound.
     *
     * @param startingPosition offset in bytes; must not be negative
     * @return this reader, for chaining
     */
    public StreamFileReader withStartingPosition(long startingPosition) {
        Preconditions.checkState(getConsumer() == null, "Cannot set position after binding the reader");
        Preconditions.checkArgument(startingPosition >= 0, "Position cannot be less than zero");
        this.position = startingPosition;
        return this;
    }

    /**
     * Limits the number of bytes to read. Only allowed while no consumer is bound.
     *
     * @param readingLength maximum number of bytes to read, or {@code -1} for no limit
     * @return this reader, for chaining
     */
    public StreamFileReader withReadingLength(long readingLength) {
        Preconditions.checkState(getConsumer() == null, "Cannot set reading length after binding the reader");
        Preconditions.checkArgument(readingLength >= -1, "Reading length cannot be less than -1");
        this.length = readingLength == -1 ? UNLIMITED : readingLength;
        return this;
    }

    /**
     * @return the file offset of the next read, i.e. the starting position plus all bytes
     *         sent downstream so far
     */
    public long getPosition() {
        return this.position;
    }

    /**
     * Runs one step of the read loop: issues a single asynchronous read and, once it
     * completes, forwards the data and schedules the next step.
     *
     * <p>The loop stops (and clears {@link #reading}) when the receiver is not ready, when
     * the read fails, on end-of-file, or when the configured reading length is exhausted.
     */
    private void process() {
        if (!isReceiverReady()) {
            this.reading = false;
            return;
        }
        if (this.length == 0) {
            // Nothing is allowed to be read (e.g. withReadingLength(0)): finish right away
            // instead of allocating a zero-size buffer, issuing a read and sending an empty chunk.
            sendEndOfStream();
            this.reading = false;
            return;
        }

        // The result never exceeds bufferSize, so narrowing to int cannot overflow.
        ByteBuf buf = ByteBufPool.allocate((int) Math.min(this.bufferSize, this.length));
        this.asyncFile.read(buf, this.position).whenComplete((bytesRead, error) -> {
            if (error != null) {
                // The buffer was never handed downstream, so it is still ours to recycle.
                buf.recycle();
                closeWithError(error);
                this.reading = false;
                return;
            }

            int count = bytesRead.intValue();
            if (count == -1) {
                // End of file: no data in the buffer.
                buf.recycle();
                sendEndOfStream();
                this.reading = false;
                return;
            }

            send(buf);
            this.position += count;
            if (this.length != UNLIMITED) {
                this.length -= count;
            }
            if (this.length == 0) {
                // Requested amount fully delivered; signal completion immediately,
                // regardless of whether the receiver is currently ready.
                sendEndOfStream();
                this.reading = false;
                return;
            }
            process();
        });
    }

    /**
     * Starts the read loop unless one is already running.
     *
     * @param streamDataReceiver receiver supplied by the base class; not used directly,
     *                           data is delivered through {@code send}
     */
    @Override // io.datakernel.stream.AbstractStreamProducer
    protected void onProduce(StreamDataReceiver<ByteBuf> streamDataReceiver) {
        if (this.reading) {
            return;
        }
        this.reading = true;
        process();
    }

    /** Closes the underlying file asynchronously and logs the outcome. */
    @Override // io.datakernel.stream.AbstractStreamProducer
    protected void cleanup() {
        this.asyncFile.close().whenComplete((ignored, error) -> {
            if (error == null) {
                logger.trace("{}: closed file", this);
            } else {
                // A trailing Throwable argument is logged by SLF4J as the exception.
                logger.error("{}: failed to close file", this, error);
            }
        });
    }

    /**
     * Intentionally empty: the reader has no error handling of its own. The file is
     * closed in {@link #cleanup()}.
     */
    @Override // io.datakernel.stream.AbstractStreamProducer
    protected void onError(Throwable th) {
    }

    /** Describes the file, the current position, the remaining length (if limited) and the reading flag. */
    @Override // io.datakernel.stream.AbstractStreamProducer
    public String toString() {
        String remaining = this.length == UNLIMITED ? "" : ", len=" + this.length;
        String state = this.reading ? ", reading" : "";
        return "StreamFileReader{" + this.asyncFile + ", pos=" + this.position + remaining + state + '}';
    }
}
