/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.block.stream.StreamSerializationClientInterceptor;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.UnauthenticatedException;
import alluxio.grpc.BlockWorkerGrpc;
import alluxio.grpc.CacheRequest;
import alluxio.grpc.ClearMetricsRequest;
import alluxio.grpc.ClearMetricsResponse;
import alluxio.grpc.CopyRequest;
import alluxio.grpc.CopyResponse;
import alluxio.grpc.CreateLocalBlockRequest;
import alluxio.grpc.CreateLocalBlockResponse;
import alluxio.grpc.DataMessageMarshaller;
import alluxio.grpc.DataMessageMarshallerProvider;
import alluxio.grpc.FreeWorkerRequest;
import alluxio.grpc.GetStatusPRequest;
import alluxio.grpc.GetStatusPResponse;
import alluxio.grpc.GrpcChannel;
import alluxio.grpc.GrpcChannelBuilder;
import alluxio.grpc.GrpcNetworkGroup;
import alluxio.grpc.GrpcSerializationUtils;
import alluxio.grpc.GrpcServerAddress;
import alluxio.grpc.ListStatusPRequest;
import alluxio.grpc.ListStatusPResponse;
import alluxio.grpc.LoadRequest;
import alluxio.grpc.LoadResponse;
import alluxio.grpc.MoveBlockRequest;
import alluxio.grpc.MoveBlockResponse;
import alluxio.grpc.OpenLocalBlockRequest;
import alluxio.grpc.OpenLocalBlockResponse;
import alluxio.grpc.ReadRequest;
import alluxio.grpc.ReadResponse;
import alluxio.grpc.RemoveBlockRequest;
import alluxio.grpc.RemoveBlockResponse;
import alluxio.grpc.WriteRequest;
import alluxio.grpc.WriteResponse;
import alluxio.resource.AlluxioResourceLeakDetectorFactory;
import alluxio.resource.LockResource;
import alluxio.retry.RetryPolicy;
import alluxio.retry.RetryUtils;
import alluxio.security.user.UserState;
import alluxio.shaded.client.com.google.common.io.Closer;
import alluxio.shaded.client.com.google.common.util.concurrent.ListenableFuture;
import alluxio.shaded.client.com.google.common.util.concurrent.SettableFuture;
import alluxio.shaded.client.io.grpc.StatusRuntimeException;
import alluxio.shaded.client.io.grpc.stub.StreamObserver;
import alluxio.shaded.client.io.netty.util.ResourceLeakDetector;
import alluxio.shaded.client.io.netty.util.ResourceLeakTracker;
import alluxio.shaded.client.javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultBlockWorkerClient
implements BlockWorkerClient {
    private static final Logger LOG = LoggerFactory.getLogger((String)DefaultBlockWorkerClient.class.getName());
    private static final ResourceLeakDetector<DefaultBlockWorkerClient> DETECTOR = AlluxioResourceLeakDetectorFactory.instance().newResourceLeakDetector(DefaultBlockWorkerClient.class);
    private GrpcChannel mStreamingChannel;
    private GrpcChannel mRpcChannel;
    private final GrpcServerAddress mAddress;
    private final long mRpcTimeoutMs;
    private final BlockWorkerGrpc.BlockWorkerStub mStreamingAsyncStub;
    private final BlockWorkerGrpc.BlockWorkerBlockingStub mRpcBlockingStub;
    private final BlockWorkerGrpc.BlockWorkerFutureStub mRpcFutureStub;
    @Nullable
    private final ResourceLeakTracker<DefaultBlockWorkerClient> mTracker;

    public DefaultBlockWorkerClient(UserState userState, GrpcServerAddress address, AlluxioConfiguration alluxioConf) throws IOException {
        RetryPolicy retryPolicy = RetryUtils.defaultClientRetry();
        UnauthenticatedException lastException = null;
        while (retryPolicy.attempt()) {
            try {
                this.mStreamingChannel = GrpcChannelBuilder.newBuilder(address, alluxioConf).setSubject(userState.getSubject()).setNetworkGroup(GrpcNetworkGroup.STREAMING).build();
                this.mStreamingChannel.intercept(new StreamSerializationClientInterceptor());
                this.mRpcChannel = GrpcChannelBuilder.newBuilder(address, alluxioConf).setSubject(userState.getSubject()).setNetworkGroup(GrpcNetworkGroup.RPC).build();
                lastException = null;
                break;
            }
            catch (StatusRuntimeException e) {
                this.close();
                throw AlluxioStatusException.fromStatusRuntimeException(e);
            }
            catch (UnauthenticatedException e) {
                this.close();
                userState.relogin();
                lastException = e;
            }
        }
        if (lastException != null) {
            throw lastException;
        }
        this.mStreamingAsyncStub = BlockWorkerGrpc.newStub(this.mStreamingChannel);
        this.mRpcBlockingStub = BlockWorkerGrpc.newBlockingStub(this.mRpcChannel);
        this.mRpcFutureStub = BlockWorkerGrpc.newFutureStub(this.mRpcChannel);
        this.mAddress = address;
        this.mRpcTimeoutMs = alluxioConf.getMs(PropertyKey.USER_RPC_RETRY_MAX_DURATION);
        this.mTracker = DETECTOR.track(this);
    }

    protected DefaultBlockWorkerClient(UserState userState, GrpcServerAddress address, AlluxioConfiguration alluxioConf, GrpcChannel streamingChannel, GrpcChannel rpcChannel) throws IOException {
        this.mStreamingChannel = streamingChannel;
        this.mRpcChannel = rpcChannel;
        this.mStreamingAsyncStub = BlockWorkerGrpc.newStub(this.mStreamingChannel);
        this.mRpcBlockingStub = BlockWorkerGrpc.newBlockingStub(this.mRpcChannel);
        this.mRpcFutureStub = BlockWorkerGrpc.newFutureStub(this.mRpcChannel);
        this.mAddress = address;
        this.mRpcTimeoutMs = alluxioConf.getMs(PropertyKey.USER_RPC_RETRY_MAX_DURATION);
        this.mTracker = DETECTOR.track(this);
    }

    @Override
    public boolean isShutdown() {
        return this.mStreamingChannel.isShutdown() || this.mRpcChannel.isShutdown();
    }

    @Override
    public boolean isHealthy() {
        return !this.isShutdown() && this.mStreamingChannel.isHealthy() && this.mRpcChannel.isHealthy();
    }

    @Override
    public void close() throws IOException {
        try (Closer closer = Closer.create();){
            closer.register(() -> {
                if (this.mStreamingChannel != null) {
                    this.mStreamingChannel.shutdown();
                }
            });
            closer.register(() -> {
                if (this.mRpcChannel != null) {
                    this.mRpcChannel.shutdown();
                }
            });
            closer.register(() -> {
                if (this.mTracker != null) {
                    this.mTracker.close(this);
                }
            });
        }
    }

    @Override
    public StreamObserver<WriteRequest> writeBlock(StreamObserver<WriteResponse> responseObserver) {
        if (responseObserver instanceof DataMessageMarshallerProvider) {
            DataMessageMarshaller marshaller = ((DataMessageMarshallerProvider)((Object)responseObserver)).getRequestMarshaller().orElseThrow(NullPointerException::new);
            return ((BlockWorkerGrpc.BlockWorkerStub)this.mStreamingAsyncStub.withOption(GrpcSerializationUtils.OVERRIDDEN_METHOD_DESCRIPTOR, BlockWorkerGrpc.getWriteBlockMethod().toBuilder().setRequestMarshaller(marshaller).build())).writeBlock(responseObserver);
        }
        return this.mStreamingAsyncStub.writeBlock(responseObserver);
    }

    @Override
    public ListenableFuture<Object> readBlockNoDataBack(ReadRequest request) {
        NoDataReadStreamObserver responseStreamObserver = new NoDataReadStreamObserver();
        StreamObserver<ReadRequest> requestStreamObserver = this.mStreamingAsyncStub.readBlock(responseStreamObserver);
        requestStreamObserver.onNext(request);
        requestStreamObserver.onCompleted();
        return responseStreamObserver.getFuture();
    }

    @Override
    public StreamObserver<ReadRequest> readBlock(StreamObserver<ReadResponse> responseObserver) {
        if (responseObserver instanceof DataMessageMarshallerProvider) {
            DataMessageMarshaller marshaller = ((DataMessageMarshallerProvider)((Object)responseObserver)).getResponseMarshaller().orElseThrow(NullPointerException::new);
            return ((BlockWorkerGrpc.BlockWorkerStub)this.mStreamingAsyncStub.withOption(GrpcSerializationUtils.OVERRIDDEN_METHOD_DESCRIPTOR, BlockWorkerGrpc.getReadBlockMethod().toBuilder().setResponseMarshaller(marshaller).build())).readBlock(responseObserver);
        }
        return this.mStreamingAsyncStub.readBlock(responseObserver);
    }

    @Override
    public StreamObserver<CreateLocalBlockRequest> createLocalBlock(StreamObserver<CreateLocalBlockResponse> responseObserver) {
        return this.mStreamingAsyncStub.createLocalBlock(responseObserver);
    }

    @Override
    public StreamObserver<OpenLocalBlockRequest> openLocalBlock(StreamObserver<OpenLocalBlockResponse> responseObserver) {
        return this.mStreamingAsyncStub.openLocalBlock(responseObserver);
    }

    @Override
    public RemoveBlockResponse removeBlock(RemoveBlockRequest request) {
        return ((BlockWorkerGrpc.BlockWorkerBlockingStub)this.mRpcBlockingStub.withDeadlineAfter(this.mRpcTimeoutMs, TimeUnit.MILLISECONDS)).removeBlock(request);
    }

    @Override
    public MoveBlockResponse moveBlock(MoveBlockRequest request) {
        return ((BlockWorkerGrpc.BlockWorkerBlockingStub)this.mRpcBlockingStub.withDeadlineAfter(this.mRpcTimeoutMs, TimeUnit.MILLISECONDS)).moveBlock(request);
    }

    @Override
    public ClearMetricsResponse clearMetrics(ClearMetricsRequest request) {
        return ((BlockWorkerGrpc.BlockWorkerBlockingStub)this.mRpcBlockingStub.withDeadlineAfter(this.mRpcTimeoutMs, TimeUnit.MILLISECONDS)).clearMetrics(request);
    }

    @Override
    public void cache(CacheRequest request) {
        boolean async = request.getAsync();
        try {
            ((BlockWorkerGrpc.BlockWorkerBlockingStub)this.mRpcBlockingStub.withDeadlineAfter(this.mRpcTimeoutMs, TimeUnit.MILLISECONDS)).cache(request);
        }
        catch (Exception e) {
            if (!async) {
                throw e;
            }
            LOG.warn("Error sending async cache request {} to worker {}.", new Object[]{request, this.mAddress, e});
        }
    }

    @Override
    public void freeWorker() {
        ((BlockWorkerGrpc.BlockWorkerBlockingStub)this.mRpcBlockingStub.withDeadlineAfter(this.mRpcTimeoutMs, TimeUnit.MILLISECONDS)).freeWorker(FreeWorkerRequest.getDefaultInstance());
    }

    @Override
    public ListenableFuture<LoadResponse> load(LoadRequest request) {
        return this.mRpcFutureStub.load(request);
    }

    @Override
    public GetStatusPResponse getStatus(GetStatusPRequest request) {
        return ((BlockWorkerGrpc.BlockWorkerBlockingStub)this.mRpcBlockingStub.withDeadlineAfter(this.mRpcTimeoutMs, TimeUnit.MILLISECONDS)).getStatus(request);
    }

    @Override
    public Iterator<ListStatusPResponse> listStatus(ListStatusPRequest request) {
        return ((BlockWorkerGrpc.BlockWorkerBlockingStub)this.mRpcBlockingStub.withDeadlineAfter(this.mRpcTimeoutMs, TimeUnit.MILLISECONDS)).listStatus(request);
    }

    @Override
    public ListenableFuture<CopyResponse> copy(CopyRequest request) {
        return this.mRpcFutureStub.copy(request);
    }

    public static class NoDataReadStreamObserver
    implements StreamObserver<ReadResponse> {
        SettableFuture<Object> mFuture = SettableFuture.create();
        ReentrantLock mLock = new ReentrantLock();

        @Override
        public void onNext(ReadResponse response) {
        }

        @Override
        public void onError(Throwable t) {
            try (LockResource ignored = new LockResource(this.mLock);){
                LOG.warn("onError : {}", t);
                this.mFuture.setException(t);
            }
        }

        @Override
        public void onCompleted() {
            try (LockResource ignored = new LockResource(this.mLock);){
                LOG.info("onComplete.");
                this.mFuture.set(true);
            }
        }

        public ListenableFuture<Object> getFuture() {
            return this.mFuture;
        }
    }
}

