/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.file.dora;

import alluxio.AlluxioURI;
import alluxio.CloseableSupplier;
import alluxio.PositionReader;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.block.stream.GrpcDataReader;
import alluxio.client.file.DoraFileOutStream;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.PositionReadFileInStream;
import alluxio.client.file.URIStatus;
import alluxio.client.file.dora.DoraCachePositionReader;
import alluxio.client.file.dora.WorkerLocationPolicy;
import alluxio.client.file.dora.netty.NettyDataReader;
import alluxio.client.file.dora.netty.NettyDataWriter;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.collections.Pair;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.exception.FileDoesNotExistException;
import alluxio.exception.InvalidPathException;
import alluxio.exception.status.PermissionDeniedException;
import alluxio.grpc.CacheDataRequest;
import alluxio.grpc.CacheDataResponse;
import alluxio.grpc.CompleteFilePOptions;
import alluxio.grpc.CompleteFilePRequest;
import alluxio.grpc.CreateDirectoryPOptions;
import alluxio.grpc.CreateDirectoryPRequest;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.CreateFilePRequest;
import alluxio.grpc.CreateFilePResponse;
import alluxio.grpc.DeletePOptions;
import alluxio.grpc.DeletePRequest;
import alluxio.grpc.ExistsPOptions;
import alluxio.grpc.ExistsPRequest;
import alluxio.grpc.ExistsPResponse;
import alluxio.grpc.FileInfo;
import alluxio.grpc.GetStatusPOptions;
import alluxio.grpc.GetStatusPRequest;
import alluxio.grpc.GrpcUtils;
import alluxio.grpc.ListStatusPOptions;
import alluxio.grpc.ListStatusPRequest;
import alluxio.grpc.ReadRequest;
import alluxio.grpc.RenamePOptions;
import alluxio.grpc.RenamePRequest;
import alluxio.grpc.RequestType;
import alluxio.grpc.SetAttributePOptions;
import alluxio.grpc.SetAttributePRequest;
import alluxio.proto.dataserver.Protocol;
import alluxio.resource.CloseableResource;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.common.util.concurrent.FutureCallback;
import alluxio.shaded.client.com.google.common.util.concurrent.Futures;
import alluxio.shaded.client.com.google.common.util.concurrent.ListenableFuture;
import alluxio.shaded.client.com.google.common.util.concurrent.MoreExecutors;
import alluxio.shaded.client.javax.annotation.Nullable;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DoraCacheClient {
    public static final int DUMMY_BLOCK_ID = -1;
    private final FileSystemContext mContext;
    private final long mChunkSize;
    private final WorkerLocationPolicy mWorkerLocationPolicy;
    private final boolean mNettyTransEnabled;
    private final int mPreferredWorkerCount;
    private final boolean mEnableDynamicHashRing;
    private static final Logger LOG = LoggerFactory.getLogger(DoraCacheClient.class);

    public DoraCacheClient(FileSystemContext context) {
        this.mContext = context;
        this.mWorkerLocationPolicy = WorkerLocationPolicy.Factory.create(context.getClusterConf());
        this.mChunkSize = this.mContext.getClusterConf().getBytes(PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES);
        this.mNettyTransEnabled = context.getClusterConf().getBoolean(PropertyKey.USER_NETTY_DATA_TRANSMISSION_ENABLED);
        this.mEnableDynamicHashRing = context.getClusterConf().getBoolean(PropertyKey.USER_DYNAMIC_CONSISTENT_HASH_RING_ENABLED);
        int minReplicaCount = context.getClusterConf().getInt(PropertyKey.USER_FILE_REPLICATION_MIN);
        this.mPreferredWorkerCount = Math.max(1, minReplicaCount);
    }

    public PositionReadFileInStream getInStream(URIStatus status, Protocol.OpenUfsBlockOptions ufsOptions) {
        WorkerNetAddress workerNetAddress = this.getWorkerNetAddress(status.getUfsPath());
        if (!this.mNettyTransEnabled) {
            throw new UnsupportedOperationException("Grpc dora reader not implemented");
        }
        NettyDataReader reader = this.createNettyDataReader(workerNetAddress, ufsOptions);
        return new PositionReadFileInStream(reader, status, this);
    }

    public DoraFileOutStream getOutStream(AlluxioURI alluxioPath, FileSystemContext fsContext, OutStreamOptions outStreamOptions, @Nullable FileOutStream ufsOutStream, String uuid) throws IOException {
        WorkerNetAddress workerNetAddress = this.getWorkerNetAddress(alluxioPath.toString());
        NettyDataWriter writer = NettyDataWriter.create(fsContext, workerNetAddress, Long.MAX_VALUE, RequestType.ALLUXIO_BLOCK, outStreamOptions);
        return new DoraFileOutStream(this, writer, alluxioPath, outStreamOptions, fsContext, ufsOutStream, uuid);
    }

    protected long getChunkSize() {
        return this.mChunkSize;
    }

    public DoraCachePositionReader createNettyPositionReader(URIStatus status, Protocol.OpenUfsBlockOptions ufsOptions, Optional<CloseableSupplier<PositionReader>> externalPositionReader) {
        WorkerNetAddress workerNetAddress = this.getWorkerNetAddress(status.toString());
        NettyDataReader reader = this.createNettyDataReader(workerNetAddress, ufsOptions);
        return new DoraCachePositionReader(reader, status.getLength(), externalPositionReader);
    }

    protected GrpcDataReader.Factory createGrpcDataReader(WorkerNetAddress workerNetAddress, Protocol.OpenUfsBlockOptions ufsOptions) {
        ReadRequest.Builder builder = ReadRequest.newBuilder().setBlockId(-1L).setOpenUfsBlockOptions(ufsOptions).setChunkSize(this.mChunkSize);
        return new GrpcDataReader.Factory(this.mContext, workerNetAddress, builder);
    }

    protected NettyDataReader createNettyDataReader(WorkerNetAddress workerNetAddress, Protocol.OpenUfsBlockOptions ufsOptions) {
        Protocol.ReadRequest.Builder builder = Protocol.ReadRequest.newBuilder().setBlockId(-1L).setOpenUfsBlockOptions(ufsOptions).setChunkSize(this.mChunkSize);
        return new NettyDataReader(this.mContext, workerNetAddress, builder);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public List<URIStatus> listStatus(String path, ListStatusPOptions options) throws PermissionDeniedException {
        try (CloseableResource<BlockWorkerClient> client = this.mContext.acquireBlockWorkerClient(this.getWorkerNetAddress(path));){
            ArrayList<URIStatus> result = new ArrayList<URIStatus>();
            client.get().listStatus(ListStatusPRequest.newBuilder().setPath(path).setOptions(options).build()).forEachRemaining(pListStatusResponse -> result.addAll(pListStatusResponse.getFileInfosList().stream().map(pFileInfo -> new URIStatus(GrpcUtils.fromProto(pFileInfo))).collect(Collectors.toList())));
            ArrayList<URIStatus> arrayList = result;
            return arrayList;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public URIStatus getStatus(String path, GetStatusPOptions options) throws PermissionDeniedException {
        return this.getStatusByGrpc(path, options);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected URIStatus getStatusByGrpc(String path, GetStatusPOptions options) throws PermissionDeniedException {
        try (CloseableResource<BlockWorkerClient> client = this.mContext.acquireBlockWorkerClient(this.getWorkerNetAddress(path));){
            GetStatusPRequest request = GetStatusPRequest.newBuilder().setPath(path).setOptions(options).build();
            FileInfo fileInfo = client.get().getStatus(request).getFileInfo();
            URIStatus uRIStatus = new URIStatus(GrpcUtils.fromProto(fileInfo));
            return uRIStatus;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public Pair<URIStatus, String> createFile(String path, CreateFilePOptions options) throws PermissionDeniedException {
        try (CloseableResource<BlockWorkerClient> client = this.mContext.acquireBlockWorkerClient(this.getWorkerNetAddress(path));){
            CreateFilePRequest request = CreateFilePRequest.newBuilder().setPath(path).setOptions(options).build();
            CreateFilePResponse response = client.get().createFile(request);
            FileInfo fileInfo = response.getFileInfo();
            String uuid = response.getUuid();
            Pair<URIStatus, String> pair = new Pair<URIStatus, String>(new URIStatus(GrpcUtils.fromProto(fileInfo)), uuid);
            return pair;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public Map<String, List<WorkerNetAddress>> checkFileLocation(String path, GetStatusPOptions options) throws IOException {
        HashMap<String, List<WorkerNetAddress>> pathDistributionMap = new HashMap<String, List<WorkerNetAddress>>();
        List<BlockWorkerInfo> workers = this.mContext.getLiveWorkers();
        for (BlockWorkerInfo worker : workers) {
            try {
                CloseableResource<BlockWorkerClient> client = this.mContext.acquireBlockWorkerClient(worker.getNetAddress());
                Throwable throwable = null;
                try {
                    GetStatusPRequest request = GetStatusPRequest.newBuilder().setPath(path).setOptions(options).build();
                    try {
                        FileInfo fileInfo = client.get().getStatus(request).getFileInfo();
                        URIStatus uriStatus = new URIStatus(GrpcUtils.fromProto(fileInfo));
                        if (uriStatus.getInAlluxioPercentage() <= 0) continue;
                        ArrayList<WorkerNetAddress> assignedWorkers = (ArrayList<WorkerNetAddress>)pathDistributionMap.get(path);
                        if (assignedWorkers == null) {
                            assignedWorkers = new ArrayList<WorkerNetAddress>();
                        }
                        assignedWorkers.add(worker.getNetAddress());
                        pathDistributionMap.put(path, assignedWorkers);
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (client == null) continue;
                    if (throwable != null) {
                        try {
                            client.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    client.close();
                }
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return pathDistributionMap;
    }

    public void completeFile(String path, CompleteFilePOptions options, String uuid) throws PermissionDeniedException {
        try (CloseableResource<BlockWorkerClient> client = this.mContext.acquireBlockWorkerClient(this.getWorkerNetAddress(path));){
            CompleteFilePRequest request = CompleteFilePRequest.newBuilder().setPath(path).setOptions(options).setUuid(uuid).build();
            client.get().completeFile(request);
            return;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void delete(String path, DeletePOptions options) throws PermissionDeniedException {
        try (CloseableResource<BlockWorkerClient> client = this.mContext.acquireBlockWorkerClient(this.getWorkerNetAddress(path));){
            DeletePRequest request = DeletePRequest.newBuilder().setPath(path).setOptions(options).build();
            client.get().delete(request);
            return;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void rename(String src, String dst, RenamePOptions options) throws PermissionDeniedException {
        try (CloseableResource<BlockWorkerClient> client = this.mContext.acquireBlockWorkerClient(this.getWorkerNetAddress(src));){
            RenamePRequest request = RenamePRequest.newBuilder().setPath(src).setDstPath(dst).setOptions(options).build();
            client.get().rename(request);
            return;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void createDirectory(String path, CreateDirectoryPOptions options) throws PermissionDeniedException {
        try (CloseableResource<BlockWorkerClient> client = this.mContext.acquireBlockWorkerClient(this.getWorkerNetAddress(path));){
            CreateDirectoryPRequest request = CreateDirectoryPRequest.newBuilder().setPath(path).setOptions(options).build();
            client.get().createDirectory(request);
            return;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public boolean exists(String path, ExistsPOptions options) throws InvalidPathException, IOException, AlluxioException {
        try (CloseableResource<BlockWorkerClient> client = this.mContext.acquireBlockWorkerClient(this.getWorkerNetAddress(path));){
            ExistsPRequest request = ExistsPRequest.newBuilder().setPath(path).setOptions(options).build();
            ExistsPResponse response = client.get().exists(request);
            boolean bl = response.getExists();
            return bl;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void setAttribute(String path, SetAttributePOptions options) throws FileDoesNotExistException, IOException, AlluxioException {
        try (CloseableResource<BlockWorkerClient> client = this.mContext.acquireBlockWorkerClient(this.getWorkerNetAddress(path));){
            SetAttributePRequest request = SetAttributePRequest.newBuilder().setPath(path).setOptions(options).build();
            client.get().setAttribute(request);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public WorkerNetAddress getWorkerNetAddress(String path) {
        try {
            List<BlockWorkerInfo> workers = this.mEnableDynamicHashRing ? this.mContext.getCachedWorkers(FileSystemContext.GetWorkerListType.LIVE) : this.mContext.getCachedWorkers(FileSystemContext.GetWorkerListType.ALL);
            List<BlockWorkerInfo> preferredWorkers = this.mWorkerLocationPolicy.getPreferredWorkers(workers, path, this.mPreferredWorkerCount);
            Preconditions.checkState(preferredWorkers.size() > 0);
            BlockWorkerInfo worker = this.choosePreferredWorker(preferredWorkers);
            if (!worker.isActive()) {
                throw new RuntimeException("The preferred worker is not active.");
            }
            WorkerNetAddress workerNetAddress = worker.getNetAddress();
            return workerNetAddress;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void cacheData(final String ufsPath, long pos, long length) {
        try (CloseableResource<BlockWorkerClient> client = this.mContext.acquireBlockWorkerClient(this.getWorkerNetAddress(ufsPath));){
            CacheDataRequest request = CacheDataRequest.newBuilder().setUfsPath(ufsPath).setPos(pos).setLength(length).setAsync(true).build();
            ListenableFuture<CacheDataResponse> future = client.get().cacheData(request);
            Futures.addCallback(future, new FutureCallback<CacheDataResponse>(){

                @Override
                public void onSuccess(CacheDataResponse result) {
                }

                @Override
                public void onFailure(Throwable t) {
                    LOG.warn("Preloading {} failed", (Object)ufsPath, (Object)t);
                }
            }, MoreExecutors.directExecutor());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected BlockWorkerInfo choosePreferredWorker(List<BlockWorkerInfo> workers) {
        return workers.get(0);
    }

    public FileSystemContext getContext() {
        return this.mContext;
    }
}

