/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.block.stream.StreamSerializationClientInterceptor;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.grpc.AsyncCacheRequest;
import alluxio.grpc.AsyncCacheResponse;
import alluxio.grpc.BlockWorkerGrpc;
import alluxio.grpc.CreateLocalBlockRequest;
import alluxio.grpc.CreateLocalBlockResponse;
import alluxio.grpc.DataMessageMarshaller;
import alluxio.grpc.DataMessageMarshallerProvider;
import alluxio.grpc.GrpcChannel;
import alluxio.grpc.GrpcChannelBuilder;
import alluxio.grpc.GrpcChannelKey;
import alluxio.grpc.GrpcSerializationUtils;
import alluxio.grpc.GrpcServerAddress;
import alluxio.grpc.MoveBlockRequest;
import alluxio.grpc.MoveBlockResponse;
import alluxio.grpc.OpenLocalBlockRequest;
import alluxio.grpc.OpenLocalBlockResponse;
import alluxio.grpc.ReadRequest;
import alluxio.grpc.ReadResponse;
import alluxio.grpc.RemoveBlockRequest;
import alluxio.grpc.RemoveBlockResponse;
import alluxio.grpc.WriteRequest;
import alluxio.grpc.WriteResponse;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.common.io.Closer;
import alluxio.shaded.client.io.grpc.StatusRuntimeException;
import alluxio.shaded.client.io.grpc.stub.StreamObserver;
import alluxio.shaded.client.io.netty.channel.EventLoopGroup;
import alluxio.util.network.NettyUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import javax.security.auth.Subject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link BlockWorkerClient} implementation backed by two gRPC channels to a single worker
 * address: one channel used for the streaming calls (read/write/create-local/open-local block) and
 * one used for the unary calls (remove/move block, async cache).
 */
public class DefaultBlockWorkerClient
implements BlockWorkerClient {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlockWorkerClient.class.getName());

    /** Channel backing {@link #mStreamingAsyncStub}; built with pooling disabled. */
    private final GrpcChannel mStreamingChannel;
    /** Channel backing the RPC stubs; built with the default pooling strategy. */
    private final GrpcChannel mRpcChannel;
    /** Address of the worker this client talks to; only used for log messages. */
    private final GrpcServerAddress mAddress;
    /** Deadline, in milliseconds, applied to every call issued through the RPC stubs. */
    private final long mDataTimeoutMs;
    private final BlockWorkerGrpc.BlockWorkerStub mStreamingAsyncStub;
    private final BlockWorkerGrpc.BlockWorkerBlockingStub mRpcBlockingStub;
    private final BlockWorkerGrpc.BlockWorkerStub mRpcAsyncStub;

    /**
     * Creates a client instance for communicating with a block worker.
     *
     * <p>If building or configuring either channel fails, any channel that was already built is
     * shut down before the failure is propagated, so a failed construction does not leak a channel.
     *
     * @param subject the subject passed to the channel builder
     * @param address the address of the worker
     * @param alluxioConf configuration used for channel settings and the data timeout
     * @param workerGroup the netty event loop group the channels run on
     * @throws IOException if a channel cannot be built; a {@link StatusRuntimeException} raised
     *         during setup is converted via
     *         {@link AlluxioStatusException#fromStatusRuntimeException}
     */
    public DefaultBlockWorkerClient(Subject subject, GrpcServerAddress address, AlluxioConfiguration alluxioConf, EventLoopGroup workerGroup) throws IOException {
        GrpcChannel streamingChannel = null;
        GrpcChannel rpcChannel;
        boolean initialized = false;
        try {
            streamingChannel = this.buildChannel(subject, address, GrpcChannelKey.PoolingStrategy.DISABLED, alluxioConf, workerGroup);
            streamingChannel.intercept(new StreamSerializationClientInterceptor());
            rpcChannel = this.buildChannel(subject, address, GrpcChannelKey.PoolingStrategy.DEFAULT, alluxioConf, workerGroup);
            initialized = true;
        }
        catch (StatusRuntimeException e) {
            throw AlluxioStatusException.fromStatusRuntimeException(e);
        }
        finally {
            if (!initialized) {
                // The RPC channel is assigned by the last fallible statement, so on failure only
                // the streaming channel can have been created and needs releasing.
                DefaultBlockWorkerClient.shutdownAfterFailedInit(streamingChannel);
            }
        }
        this.mStreamingChannel = streamingChannel;
        this.mRpcChannel = rpcChannel;
        this.mStreamingAsyncStub = BlockWorkerGrpc.newStub(this.mStreamingChannel);
        this.mRpcBlockingStub = BlockWorkerGrpc.newBlockingStub(this.mRpcChannel);
        this.mRpcAsyncStub = BlockWorkerGrpc.newStub(this.mRpcChannel);
        this.mAddress = address;
        this.mDataTimeoutMs = alluxioConf.getMs(PropertyKey.USER_NETWORK_DATA_TIMEOUT_MS);
    }

    /**
     * @return true if either of the two underlying channels reports that it is shut down
     */
    @Override
    public boolean isShutdown() {
        return this.mStreamingChannel.isShutdown() || this.mRpcChannel.isShutdown();
    }

    /**
     * @return true if neither channel is shut down and both channels report healthy
     */
    @Override
    public boolean isHealthy() {
        return !this.isShutdown() && this.mStreamingChannel.isHealthy() && this.mRpcChannel.isHealthy();
    }

    /**
     * Shuts down both underlying channels. The shutdowns are registered with a {@link Closer} so
     * that both are attempted even when one of them throws.
     *
     * @throws IOException if shutting down a channel fails
     */
    @Override
    public void close() throws IOException {
        try (Closer closer = Closer.create()) {
            closer.register(() -> this.mStreamingChannel.shutdown());
            closer.register(() -> this.mRpcChannel.shutdown());
        }
    }

    /**
     * Opens a write-block stream on the streaming channel.
     *
     * <p>When the response observer is a {@link DataMessageMarshallerProvider}, its request
     * marshaller replaces the one in the write-block method descriptor for this call.
     *
     * @param responseObserver observer receiving the write responses
     * @return the observer through which write requests are sent
     */
    @Override
    public StreamObserver<WriteRequest> writeBlock(StreamObserver<WriteResponse> responseObserver) {
        if (responseObserver instanceof DataMessageMarshallerProvider) {
            DataMessageMarshaller marshaller = ((DataMessageMarshallerProvider)responseObserver).getRequestMarshaller();
            Preconditions.checkNotNull(marshaller, "marshaller");
            return ((BlockWorkerGrpc.BlockWorkerStub)this.mStreamingAsyncStub.withOption(GrpcSerializationUtils.OVERRIDDEN_METHOD_DESCRIPTOR, BlockWorkerGrpc.getWriteBlockMethod().toBuilder().setRequestMarshaller(marshaller).build())).writeBlock(responseObserver);
        }
        return this.mStreamingAsyncStub.writeBlock(responseObserver);
    }

    /**
     * Opens a read-block stream on the streaming channel.
     *
     * <p>When the response observer is a {@link DataMessageMarshallerProvider}, its response
     * marshaller replaces the one in the read-block method descriptor for this call.
     *
     * @param responseObserver observer receiving the read responses
     * @return the observer through which read requests are sent
     */
    @Override
    public StreamObserver<ReadRequest> readBlock(StreamObserver<ReadResponse> responseObserver) {
        if (responseObserver instanceof DataMessageMarshallerProvider) {
            DataMessageMarshaller marshaller = ((DataMessageMarshallerProvider)responseObserver).getResponseMarshaller();
            Preconditions.checkNotNull(marshaller, "marshaller");
            return ((BlockWorkerGrpc.BlockWorkerStub)this.mStreamingAsyncStub.withOption(GrpcSerializationUtils.OVERRIDDEN_METHOD_DESCRIPTOR, BlockWorkerGrpc.getReadBlockMethod().toBuilder().setResponseMarshaller(marshaller).build())).readBlock(responseObserver);
        }
        return this.mStreamingAsyncStub.readBlock(responseObserver);
    }

    /**
     * Opens a create-local-block stream on the streaming channel.
     *
     * @param responseObserver observer receiving the responses
     * @return the observer through which requests are sent
     */
    @Override
    public StreamObserver<CreateLocalBlockRequest> createLocalBlock(StreamObserver<CreateLocalBlockResponse> responseObserver) {
        return this.mStreamingAsyncStub.createLocalBlock(responseObserver);
    }

    /**
     * Opens an open-local-block stream on the streaming channel.
     *
     * @param responseObserver observer receiving the responses
     * @return the observer through which requests are sent
     */
    @Override
    public StreamObserver<OpenLocalBlockRequest> openLocalBlock(StreamObserver<OpenLocalBlockResponse> responseObserver) {
        return this.mStreamingAsyncStub.openLocalBlock(responseObserver);
    }

    /**
     * Issues a blocking remove-block call with the configured data timeout as its deadline.
     *
     * @param request the remove block request
     * @return the worker's response
     */
    @Override
    public RemoveBlockResponse removeBlock(RemoveBlockRequest request) {
        return this.mRpcBlockingStub.withDeadlineAfter(this.mDataTimeoutMs, TimeUnit.MILLISECONDS).removeBlock(request);
    }

    /**
     * Issues a blocking move-block call with the configured data timeout as its deadline.
     *
     * @param request the move block request
     * @return the worker's response
     */
    @Override
    public MoveBlockResponse moveBlock(MoveBlockRequest request) {
        return this.mRpcBlockingStub.withDeadlineAfter(this.mDataTimeoutMs, TimeUnit.MILLISECONDS).moveBlock(request);
    }

    /**
     * Sends an async-cache request without waiting for the result. The response is ignored and an
     * error is only logged at warn level.
     *
     * @param request the async cache request
     */
    @Override
    public void asyncCache(final AsyncCacheRequest request) {
        this.mRpcAsyncStub.withDeadlineAfter(this.mDataTimeoutMs, TimeUnit.MILLISECONDS).asyncCache(request, new StreamObserver<AsyncCacheResponse>(){

            @Override
            public void onNext(AsyncCacheResponse value) {
                // Nothing to do with the response.
            }

            @Override
            public void onError(Throwable t) {
                LOG.warn("Error sending async cache request {} to worker {}.", request, DefaultBlockWorkerClient.this.mAddress, t);
            }

            @Override
            public void onCompleted() {
                // Nothing to do on completion.
            }
        });
    }

    /**
     * Shuts down a channel that was created by a constructor call which subsequently failed.
     * A failure of the shutdown itself is logged rather than thrown so that it cannot replace the
     * exception that caused the initialization to fail.
     *
     * @param channel the channel to release, or null if none was created
     */
    private static void shutdownAfterFailedInit(GrpcChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.shutdown();
        }
        catch (Exception e) {
            LOG.warn("Failed to shut down channel after block worker client initialization failure.", e);
        }
    }

    /**
     * Builds a channel to the given worker address, configured from {@code alluxioConf}
     * (keep-alive time and timeout, max inbound message size, flow control window).
     *
     * @param subject the subject passed to the channel builder
     * @param address the worker address
     * @param poolingStrategy the pooling strategy for the channel
     * @param alluxioConf configuration supplying the network settings
     * @param workerGroup the netty event loop group for the channel
     * @return the built channel
     * @throws AlluxioStatusException if the builder fails to build the channel
     */
    private GrpcChannel buildChannel(Subject subject, GrpcServerAddress address, GrpcChannelKey.PoolingStrategy poolingStrategy, AlluxioConfiguration alluxioConf, EventLoopGroup workerGroup) throws AlluxioStatusException {
        // The flag passed to getClientChannelClass is true when the socket address is not an
        // InetSocketAddress.
        boolean nonInetAddress = !(address.getSocketAddress() instanceof InetSocketAddress);
        return GrpcChannelBuilder.newBuilder(address, alluxioConf)
            .setSubject(subject)
            .setChannelType(NettyUtils.getClientChannelClass(nonInetAddress, alluxioConf))
            .setPoolingStrategy(poolingStrategy)
            .setEventLoopGroup(workerGroup)
            .setKeepAliveTime(alluxioConf.getMs(PropertyKey.USER_NETWORK_KEEPALIVE_TIME_MS), TimeUnit.MILLISECONDS)
            .setKeepAliveTimeout(alluxioConf.getMs(PropertyKey.USER_NETWORK_KEEPALIVE_TIMEOUT_MS), TimeUnit.MILLISECONDS)
            .setMaxInboundMessageSize((int)alluxioConf.getBytes(PropertyKey.USER_NETWORK_MAX_INBOUND_MESSAGE_SIZE))
            .setFlowControlWindow((int)alluxioConf.getBytes(PropertyKey.USER_NETWORK_FLOWCONTROL_WINDOW))
            .setClientType("DefaultBlockWorkerClient")
            .build();
    }
}

