/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.Seekable;
import alluxio.client.BoundedStream;
import alluxio.client.PositionedReadable;
import alluxio.client.ReadType;
import alluxio.client.block.stream.BlockWorkerDataReader;
import alluxio.client.block.stream.DataReader;
import alluxio.client.block.stream.GrpcDataReader;
import alluxio.client.block.stream.LocalFileDataReader;
import alluxio.client.block.stream.SharedGrpcDataReader;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PreconditionMessage;
import alluxio.exception.status.NotFoundException;
import alluxio.grpc.ReadRequest;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.util.io.BufferUtils;
import alluxio.util.network.NettyUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class BlockInStream
extends InputStream
implements BoundedStream,
Seekable,
PositionedReadable {
    private static final Logger LOG = LoggerFactory.getLogger(BlockInStream.class);
    private final WorkerNetAddress mAddress;
    private final BlockInStreamSource mInStreamSource;
    private final long mId;
    private final long mLength;
    private final byte[] mSingleByte = new byte[1];
    private long mPos = 0L;
    private DataBuffer mCurrentChunk;
    private DataReader mDataReader;
    private final DataReader.Factory mDataReaderFactory;
    private boolean mClosed = false;
    private boolean mEOF = false;

    public static BlockInStream create(FileSystemContext context, BlockInfo info, WorkerNetAddress dataSource, BlockInStreamSource dataSourceType, InStreamOptions options) throws IOException {
        boolean sourceIsLocal;
        long blockId = info.getBlockId();
        long blockSize = info.getLength();
        if (dataSourceType == BlockInStreamSource.PROCESS_LOCAL && dataSource.equals(context.getNodeLocalWorker())) {
            return BlockInStream.createProcessLocalBlockInStream(context, dataSource, blockId, blockSize, options);
        }
        AlluxioConfiguration alluxioConf = context.getClusterConf();
        boolean shortCircuit = alluxioConf.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED);
        boolean shortCircuitPreferred = alluxioConf.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_PREFERRED);
        boolean sourceSupportsDomainSocket = NettyUtils.isDomainSocketSupported(dataSource);
        boolean bl = sourceIsLocal = dataSourceType == BlockInStreamSource.NODE_LOCAL;
        if (sourceIsLocal && shortCircuit && (shortCircuitPreferred || !sourceSupportsDomainSocket)) {
            LOG.debug("Creating short circuit input stream for block {} @ {}", (Object)blockId, (Object)dataSource);
            try {
                return BlockInStream.createLocalBlockInStream(context, dataSource, blockId, blockSize, options);
            }
            catch (NotFoundException e) {
                LOG.warn("Failed to create short circuit input stream for block {} @ {}. Falling back to network transfer", (Object)blockId, (Object)dataSource);
            }
        }
        LOG.debug("Creating gRPC input stream for block {} @ {} from client {} reading through {}", new Object[]{blockId, dataSource, NetworkAddressUtils.getClientHostName(alluxioConf), dataSource});
        return BlockInStream.createGrpcBlockInStream(context, dataSource, dataSourceType, blockId, blockSize, options);
    }

    private static BlockInStream createProcessLocalBlockInStream(FileSystemContext context, WorkerNetAddress address, long blockId, long length, InStreamOptions options) {
        AlluxioConfiguration conf = context.getClusterConf();
        long chunkSize = conf.getBytes(PropertyKey.USER_LOCAL_READER_CHUNK_SIZE_BYTES);
        return new BlockInStream(new BlockWorkerDataReader.Factory(context.getProcessLocalWorker(), blockId, chunkSize, options), conf, address, BlockInStreamSource.PROCESS_LOCAL, blockId, length);
    }

    private static BlockInStream createLocalBlockInStream(FileSystemContext context, WorkerNetAddress address, long blockId, long length, InStreamOptions options) throws IOException {
        AlluxioConfiguration conf = context.getClusterConf();
        long chunkSize = conf.getBytes(PropertyKey.USER_LOCAL_READER_CHUNK_SIZE_BYTES);
        return new BlockInStream(new LocalFileDataReader.Factory(context, address, blockId, chunkSize, options), conf, address, BlockInStreamSource.NODE_LOCAL, blockId, length);
    }

    private static BlockInStream createGrpcBlockInStream(FileSystemContext context, WorkerNetAddress address, BlockInStreamSource blockSource, long blockId, long blockSize, InStreamOptions options) {
        AlluxioConfiguration conf = context.getClusterConf();
        long chunkSize = conf.getBytes(PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES);
        ReadRequest.Builder builder = ReadRequest.newBuilder().setBlockId(blockId).setPromote(ReadType.fromProto(options.getOptions().getReadType()).isPromote()).setOpenUfsBlockOptions(options.getOpenUfsBlockOptions(blockId)).setPositionShort(options.getPositionShort()).setChunkSize(chunkSize);
        DataReader.Factory factory = context.getClusterConf().getBoolean(PropertyKey.FUSE_SHARED_CACHING_READER_ENABLED) && blockSize > chunkSize * 4L ? new SharedGrpcDataReader.Factory(context, address, builder, blockSize) : new GrpcDataReader.Factory(context, address, builder);
        return new BlockInStream(factory, conf, address, blockSource, blockId, blockSize);
    }

    public static BlockInStream createRemoteBlockInStream(FileSystemContext context, long blockId, WorkerNetAddress address, BlockInStreamSource blockSource, long blockSize, Protocol.OpenUfsBlockOptions ufsOptions) {
        AlluxioConfiguration conf = context.getClusterConf();
        long chunkSize = conf.getBytes(PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES);
        ReadRequest readRequest = ReadRequest.newBuilder().setBlockId(blockId).setOpenUfsBlockOptions(ufsOptions).setChunkSize(chunkSize).buildPartial();
        GrpcDataReader.Factory factory = new GrpcDataReader.Factory(context, address, readRequest.toBuilder());
        return new BlockInStream(factory, conf, address, blockSource, blockId, blockSize);
    }

    @VisibleForTesting
    protected BlockInStream(DataReader.Factory dataReaderFactory, AlluxioConfiguration conf, WorkerNetAddress address, BlockInStreamSource blockSource, long id, long length) {
        this.mDataReaderFactory = dataReaderFactory;
        this.mAddress = address;
        this.mInStreamSource = blockSource;
        this.mId = id;
        this.mLength = length;
    }

    @Override
    public long getPos() {
        return this.mPos;
    }

    @Override
    public int read() throws IOException {
        int bytesRead = this.read(this.mSingleByte);
        if (bytesRead == -1) {
            return -1;
        }
        Preconditions.checkState(bytesRead == 1);
        return BufferUtils.byteToInt(this.mSingleByte[0]);
    }

    @Override
    public int read(byte[] b) throws IOException {
        return this.read(b, 0, b.length);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        Preconditions.checkArgument(b != null, (Object)PreconditionMessage.ERR_READ_BUFFER_NULL);
        return this.read(ByteBuffer.wrap(b), off, len);
    }

    public int read(ByteBuffer byteBuffer, int off, int len) throws IOException {
        Preconditions.checkArgument(off >= 0 && len >= 0 && len + off <= byteBuffer.capacity(), PreconditionMessage.ERR_BUFFER_STATE.toString(), (Object)byteBuffer.capacity(), (Object)off, (Object)len);
        this.checkIfClosed();
        if (len == 0) {
            return 0;
        }
        this.readChunk();
        if (this.mCurrentChunk == null) {
            this.mEOF = true;
        }
        if (this.mEOF) {
            this.closeDataReader();
            Preconditions.checkState(this.mPos >= this.mLength, PreconditionMessage.BLOCK_LENGTH_INCONSISTENT.toString(), (Object)this.mId, (Object)this.mLength, (Object)this.mPos);
            return -1;
        }
        int toRead = Math.min(len, this.mCurrentChunk.readableBytes());
        byteBuffer.position(off).limit(off + toRead);
        this.mCurrentChunk.readBytes(byteBuffer);
        this.mPos += (long)toRead;
        return toRead;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public int positionedRead(long pos, byte[] b, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        if (pos < 0L || pos >= this.mLength) {
            return -1;
        }
        int lenCopy = len;
        try (DataReader reader = this.mDataReaderFactory.create(pos, len);){
            while (len > 0) {
                DataBuffer dataBuffer = null;
                try {
                    dataBuffer = reader.readChunk();
                    if (dataBuffer != null) {
                        Preconditions.checkState(dataBuffer.readableBytes() <= len);
                        int toRead = dataBuffer.readableBytes();
                        dataBuffer.readBytes(b, off, toRead);
                        len -= toRead;
                        off += toRead;
                        continue;
                    }
                    break;
                }
                finally {
                    if (dataBuffer == null) continue;
                    dataBuffer.release();
                }
            }
        }
        if (lenCopy == len) {
            return -1;
        }
        return lenCopy - len;
    }

    @Override
    public long remaining() {
        return this.mEOF ? 0L : this.mLength - this.mPos;
    }

    @Override
    public void seek(long pos) throws IOException {
        this.checkIfClosed();
        Preconditions.checkArgument(pos >= 0L, PreconditionMessage.ERR_SEEK_NEGATIVE.toString(), pos);
        Preconditions.checkArgument(pos <= this.mLength, PreconditionMessage.ERR_SEEK_PAST_END_OF_REGION.toString(), this.mId);
        if (pos == this.mPos) {
            return;
        }
        if (this.mDataReader instanceof SharedGrpcDataReader) {
            this.seekForSharedGrpcDataReader(pos);
            return;
        }
        if (pos < this.mPos) {
            this.mEOF = false;
        }
        this.closeDataReader();
        this.mPos = pos;
    }

    private void seekForSharedGrpcDataReader(long pos) throws IOException {
        if (pos < this.mPos) {
            this.mEOF = false;
            ((SharedGrpcDataReader)this.mDataReader).seek(pos);
            if (this.mCurrentChunk != null) {
                this.mCurrentChunk.release();
                this.mCurrentChunk = null;
            }
        } else {
            long curPos = this.mPos;
            while (this.mCurrentChunk != null && curPos < pos) {
                long nextPos = curPos + (long)this.mCurrentChunk.readableBytes();
                if (nextPos <= pos) {
                    curPos = nextPos;
                    this.mCurrentChunk.release();
                    this.mCurrentChunk = this.mDataReader.readChunk();
                    continue;
                }
                int toRead = (int)(pos - curPos);
                byte[] b = new byte[toRead];
                this.mCurrentChunk.readBytes(b, 0, toRead);
                curPos = pos;
            }
            if (curPos < pos) {
                this.closeDataReader();
            }
        }
        this.mPos = pos;
    }

    @Override
    public long skip(long n) throws IOException {
        this.checkIfClosed();
        if (n <= 0L) {
            return 0L;
        }
        long toSkip = Math.min(this.remaining(), n);
        this.mPos += toSkip;
        this.closeDataReader();
        return toSkip;
    }

    @Override
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        try {
            this.closeDataReader();
        }
        finally {
            this.mDataReaderFactory.close();
        }
        this.mClosed = true;
    }

    public boolean isShortCircuit() {
        return this.mDataReaderFactory.isShortCircuit();
    }

    private void readChunk() throws IOException {
        if (this.mDataReader == null) {
            this.mDataReader = this.mDataReaderFactory.create(this.mPos, this.mLength - this.mPos);
        }
        if (this.mCurrentChunk != null && this.mCurrentChunk.readableBytes() == 0) {
            this.mCurrentChunk.release();
            this.mCurrentChunk = null;
        }
        if (this.mCurrentChunk == null) {
            this.mCurrentChunk = this.mDataReader.readChunk();
        }
    }

    private void closeDataReader() throws IOException {
        if (this.mCurrentChunk != null) {
            this.mCurrentChunk.release();
            this.mCurrentChunk = null;
        }
        if (this.mDataReader != null) {
            this.mDataReader.close();
        }
        this.mDataReader = null;
    }

    private void checkIfClosed() {
        Preconditions.checkState(!this.mClosed, (Object)PreconditionMessage.ERR_CLOSED_BLOCK_IN_STREAM);
    }

    public WorkerNetAddress getAddress() {
        return this.mAddress;
    }

    public BlockInStreamSource getSource() {
        return this.mInStreamSource;
    }

    public long getId() {
        return this.mId;
    }

    public static enum BlockInStreamSource {
        PROCESS_LOCAL,
        NODE_LOCAL,
        REMOTE,
        UFS;

    }
}

