/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.client.ReadType;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.block.stream.DataReader;
import alluxio.client.block.stream.GrpcBlockingStream;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.OpenLocalBlockRequest;
import alluxio.grpc.OpenLocalBlockResponse;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.NioDataBuffer;
import alluxio.resource.CloseableResource;
import alluxio.shaded.client.com.google.common.base.MoreObjects;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.block.io.LocalFileBlockReader;
import java.io.IOException;
import java.nio.ByteBuffer;

@NotThreadSafe
public final class LocalFileDataReader
implements DataReader {
    private final LocalFileBlockReader mReader;
    private final long mEnd;
    private final long mChunkSize;
    private long mPos;
    private boolean mClosed;

    private LocalFileDataReader(LocalFileBlockReader reader, long offset, long len, long chunkSize) {
        this.mReader = reader;
        Preconditions.checkArgument(chunkSize > 0L);
        this.mPos = offset;
        this.mEnd = Math.min(this.mReader.getLength(), offset + len);
        this.mChunkSize = chunkSize;
    }

    @Override
    public DataBuffer readChunk() throws IOException {
        if (this.mPos >= this.mEnd) {
            return null;
        }
        ByteBuffer buffer = this.mReader.read(this.mPos, Math.min(this.mChunkSize, this.mEnd - this.mPos));
        NioDataBuffer dataBuffer = new NioDataBuffer(buffer, buffer.remaining());
        this.mPos += dataBuffer.getLength();
        MetricsSystem.counter(MetricKey.CLIENT_BYTES_READ_LOCAL.getName()).inc(dataBuffer.getLength());
        MetricsSystem.meter(MetricKey.CLIENT_BYTES_READ_LOCAL_THROUGHPUT.getName()).mark(dataBuffer.getLength());
        return dataBuffer;
    }

    @Override
    public long pos() {
        return this.mPos;
    }

    @Override
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        this.mClosed = true;
        this.mReader.decreaseUsageCount();
    }

    @NotThreadSafe
    public static class Factory
    implements DataReader.Factory {
        private final CloseableResource<BlockWorkerClient> mBlockWorker;
        private final long mBlockId;
        private final String mPath;
        private final long mLocalReaderChunkSize;
        private final int mReadBufferSize;
        private final GrpcBlockingStream<OpenLocalBlockRequest, OpenLocalBlockResponse> mStream;
        private LocalFileBlockReader mReader;
        private final long mDataTimeoutMs;
        private boolean mClosed;

        public Factory(FileSystemContext context, WorkerNetAddress address, long blockId, long localReaderChunkSize, InStreamOptions options) throws IOException {
            AlluxioConfiguration conf = context.getClusterConf();
            this.mBlockId = blockId;
            this.mLocalReaderChunkSize = localReaderChunkSize;
            this.mReadBufferSize = conf.getInt(PropertyKey.USER_STREAMING_READER_BUFFER_SIZE_MESSAGES);
            this.mDataTimeoutMs = conf.getMs(PropertyKey.USER_STREAMING_DATA_TIMEOUT);
            boolean isPromote = ReadType.fromProto(options.getOptions().getReadType()).isPromote();
            OpenLocalBlockRequest request = OpenLocalBlockRequest.newBuilder().setBlockId(this.mBlockId).setPromote(isPromote).build();
            this.mBlockWorker = context.acquireBlockWorkerClient(address);
            try {
                this.mStream = new GrpcBlockingStream(this.mBlockWorker.get()::openLocalBlock, this.mReadBufferSize, MoreObjects.toStringHelper(LocalFileDataReader.class).add("request", request).add("address", address).toString());
                this.mStream.send(request, this.mDataTimeoutMs);
                OpenLocalBlockResponse response = this.mStream.receive(this.mDataTimeoutMs);
                Preconditions.checkState(response.hasPath());
                this.mPath = response.getPath();
            }
            catch (Exception e) {
                this.mBlockWorker.close();
                throw e;
            }
        }

        @Override
        public DataReader create(long offset, long len) throws IOException {
            if (this.mReader == null) {
                this.mReader = new LocalFileBlockReader(this.mPath);
            }
            Preconditions.checkState(this.mReader.getUsageCount() == 0);
            this.mReader.increaseUsageCount();
            return new LocalFileDataReader(this.mReader, offset, len, this.mLocalReaderChunkSize);
        }

        @Override
        public boolean isShortCircuit() {
            return true;
        }

        @Override
        public void close() throws IOException {
            if (this.mClosed) {
                return;
            }
            try {
                if (this.mReader != null) {
                    this.mReader.close();
                }
                this.mStream.close();
                this.mStream.waitForComplete(this.mDataTimeoutMs);
            }
            finally {
                this.mClosed = true;
                this.mBlockWorker.close();
            }
        }
    }
}

