/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.block.stream.DataReader;
import alluxio.client.block.stream.GrpcBlockingStream;
import alluxio.client.block.stream.GrpcDataMessageBlockingStream;
import alluxio.client.file.FileSystemContext;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.DataMessage;
import alluxio.grpc.ReadRequest;
import alluxio.grpc.ReadResponse;
import alluxio.grpc.ReadResponseMarshaller;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.NioDataBuffer;
import alluxio.resource.CloseableResource;
import alluxio.shaded.client.com.google.common.base.MoreObjects;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.util.logging.SamplingLogger;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DataReader} that reads a range of a block from a worker through a gRPC stream opened
 * with {@code BlockWorkerClient#readBlock}.
 *
 * <p>The reader sends the initial {@link ReadRequest} when it is constructed, then hands out one
 * chunk per {@link #readChunk()} call and reports the received offset back to the worker on a
 * best-effort basis. Instances are created through {@link Factory}.
 */
@NotThreadSafe
public final class GrpcDataReader
implements DataReader {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcDataReader.class);
    /** Rate-limited logger used for the warning emitted when closing the stream is slow. */
    private static final Logger SLOW_CLOSE_LOG = new SamplingLogger(LOG, 60000L);

    private final int mReaderBufferSizeMessages;
    private final long mDataTimeoutMs;
    private final FileSystemContext mContext;
    private final CloseableResource<BlockWorkerClient> mClient;
    private final ReadRequest mReadRequest;
    private final WorkerNetAddress mAddress;
    private final GrpcBlockingStream<ReadRequest, ReadResponse> mStream;
    private final ReadResponseMarshaller mMarshaller;
    private final long mCloseWaitMs;
    /** The next position to read; starts at the request offset and grows with every chunk. */
    private long mPosToRead;

    /**
     * Opens the read stream to the given worker and sends the initial read request.
     *
     * @param context the file system context used to read configuration and acquire the client
     * @param address the address of the worker to read from
     * @param readRequest the read request describing the range to read
     * @throws IOException if the client cannot be acquired or the request cannot be sent
     */
    private GrpcDataReader(FileSystemContext context, WorkerNetAddress address, ReadRequest readRequest) throws IOException {
        mContext = context;
        mAddress = address;
        mPosToRead = readRequest.getOffset();
        mReadRequest = readRequest;

        // Read every configuration value BEFORE acquiring the client, so that a failure while
        // resolving configuration cannot leak an already acquired client resource.
        AlluxioConfiguration alluxioConf = context.getClusterConf();
        mReaderBufferSizeMessages = alluxioConf.getInt(PropertyKey.USER_STREAMING_READER_BUFFER_SIZE_MESSAGES);
        mDataTimeoutMs = alluxioConf.getMs(PropertyKey.USER_STREAMING_DATA_TIMEOUT);
        mCloseWaitMs = alluxioConf.getMs(PropertyKey.USER_STREAMING_READER_CLOSE_TIMEOUT);

        mMarshaller = new ReadResponseMarshaller();
        mClient = mContext.acquireBlockWorkerClient(address);
        try {
            if (alluxioConf.getBoolean(PropertyKey.USER_STREAMING_ZEROCOPY_ENABLED)) {
                String desc = "Zero Copy GrpcDataReader";
                if (LOG.isDebugEnabled()) {
                    // The detailed description is only built when it can actually be logged.
                    desc = MoreObjects.toStringHelper(this)
                        .add("request", mReadRequest)
                        .add("address", address)
                        .toString();
                }
                mStream = new GrpcDataMessageBlockingStream<>(
                    mClient.get()::readBlock, mReaderBufferSizeMessages, desc, null, mMarshaller);
            } else {
                String desc = "GrpcDataReader";
                if (LOG.isDebugEnabled()) {
                    desc = MoreObjects.toStringHelper(this)
                        .add("request", mReadRequest)
                        .add("address", address)
                        .toString();
                }
                mStream = new GrpcBlockingStream<>(
                    mClient.get()::readBlock, mReaderBufferSizeMessages, desc);
            }
            mStream.send(mReadRequest, mDataTimeoutMs);
        }
        catch (Exception e) {
            // The reader is never handed to a caller on failure, so give the client back here.
            mClient.close();
            throw e;
        }
    }

    /**
     * @return the position of the next byte to read
     */
    @Override
    public long pos() {
        return mPosToRead;
    }

    /**
     * Receives the next chunk from the stream and acknowledges it to the worker.
     *
     * @return the chunk that was read, or {@code null} if the stream returned no response
     * @throws IOException if receiving from the stream fails
     * @throws IllegalStateException if the client is already shut down, if a response carries no
     *         data, or if more data was received than the request asked for
     */
    @Override
    public DataBuffer readChunk() throws IOException {
        Preconditions.checkState(!mClient.get().isShutdown(), "Data reader is closed while reading data chunks.");
        DataBuffer buffer = null;
        ReadResponse response = null;
        if (mStream instanceof GrpcDataMessageBlockingStream) {
            DataMessage<ReadResponse, DataBuffer> message =
                ((GrpcDataMessageBlockingStream<ReadRequest, ReadResponse>) mStream)
                    .receiveDataMessage(mDataTimeoutMs);
            if (message != null) {
                response = message.getMessage();
                buffer = message.getBuffer();
                if (buffer == null && response.hasChunk() && response.getChunk().hasData()) {
                    // No separate buffer came with the message: use the chunk embedded in the response.
                    ByteBuffer byteBuffer = response.getChunk().getData().asReadOnlyByteBuffer();
                    buffer = new NioDataBuffer(byteBuffer, byteBuffer.remaining());
                }
                Preconditions.checkState(buffer != null, "response should always contain chunk");
            }
        } else {
            response = mStream.receive(mDataTimeoutMs);
            if (response != null) {
                Preconditions.checkState(response.hasChunk() && response.getChunk().hasData(), "response should always contain chunk");
                ByteBuffer byteBuffer = response.getChunk().getData().asReadOnlyByteBuffer();
                buffer = new NioDataBuffer(byteBuffer, byteBuffer.remaining());
            }
        }
        if (response == null) {
            return null;
        }
        mPosToRead += buffer.readableBytes();
        try {
            mStream.send(mReadRequest.toBuilder().setOffsetReceived(mPosToRead).build());
        }
        catch (Exception e) {
            // The receipt is sent on a best-effort basis; a failure must not fail the read.
            LOG.debug("Failed to send receipt of data to worker {} for request {}: {}.", mAddress, mReadRequest, e.getMessage());
        }
        boolean withinRequest = mPosToRead - mReadRequest.getOffset() <= mReadRequest.getLength();
        if (!withinRequest) {
            // The caller never receives this buffer, so release it here instead of leaking it.
            buffer.release();
        }
        Preconditions.checkState(withinRequest,
            "Received more data than requested: position %s, request offset %s, request length %s",
            mPosToRead, mReadRequest.getOffset(), mReadRequest.getLength());
        return buffer;
    }

    /**
     * Closes the stream, optionally waits for it to complete, and releases the marshaller and the
     * client. The stream itself is left untouched when the client is already shut down, but the
     * marshaller and the client are always closed.
     *
     * @throws IOException if closing the stream or the client fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (mClient.get().isShutdown()) {
                return;
            }
            mStream.close();
            try {
                if (mCloseWaitMs > 0L) {
                    mStream.waitForComplete(mCloseWaitMs);
                }
            }
            catch (Throwable e) {
                // Waiting for completion is best effort: any failure is logged and otherwise ignored.
                SLOW_CLOSE_LOG.warn("Closing gRPC read stream took longer than {}ms, moving on. blockId: {}, address: {}", mCloseWaitMs, mReadRequest.getBlockId(), mAddress);
            }
        }
        finally {
            // Nested so that a failure while closing the marshaller cannot skip returning the client.
            try {
                mMarshaller.close();
            }
            finally {
                mClient.close();
            }
        }
    }

    /**
     * Factory that creates {@link GrpcDataReader} instances for a fixed worker and request template.
     */
    public static class Factory
    implements DataReader.Factory {
        private final FileSystemContext mContext;
        private final WorkerNetAddress mAddress;
        private final ReadRequest.Builder mReadRequestBuilder;

        /**
         * @param context the file system context
         * @param address the address of the worker to read from
         * @param readRequestBuilder the builder holding the parts of the request shared by all reads
         */
        public Factory(FileSystemContext context, WorkerNetAddress address, ReadRequest.Builder readRequestBuilder) {
            mContext = context;
            mAddress = address;
            mReadRequestBuilder = readRequestBuilder;
        }

        /**
         * Creates a reader for the given range. The offset and length are set on the builder
         * passed to the constructor before the request is built.
         *
         * @param offset the offset to start reading from
         * @param len the number of bytes to read
         * @return a new reader for the range
         * @throws IOException if the reader cannot be opened
         */
        @Override
        public DataReader create(long offset, long len) throws IOException {
            ReadRequest request = mReadRequestBuilder.setOffset(offset).setLength(len).build();
            return new GrpcDataReader(mContext, mAddress, request);
        }

        /**
         * @return always {@code false}
         */
        @Override
        public boolean isShortCircuit() {
            return false;
        }

        /**
         * Does nothing; the factory itself holds no resources to release.
         */
        @Override
        public void close() throws IOException {
        }
    }
}

