/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.file;

import alluxio.AlluxioURI;
import alluxio.ClientContext;
import alluxio.client.block.BlockMasterClient;
import alluxio.client.block.BlockMasterClientPool;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.block.stream.BlockWorkerClientPool;
import alluxio.client.file.FileSystemContextReinitializer;
import alluxio.client.file.FileSystemMasterClient;
import alluxio.client.file.FileSystemMasterClientPool;
import alluxio.client.metrics.MetricsHeartbeatContext;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.conf.path.SpecificPathConfiguration;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.GrpcServerAddress;
import alluxio.master.MasterClientContext;
import alluxio.master.MasterInquireClient;
import alluxio.resource.CloseableResource;
import alluxio.security.authentication.AuthenticationUserUtils;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.base.MoreObjects;
import alluxio.shaded.client.com.google.common.base.Objects;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.io.netty.channel.EventLoopGroup;
import alluxio.shaded.client.javax.annotation.Nullable;
import alluxio.shaded.client.javax.annotation.concurrent.GuardedBy;
import alluxio.shaded.client.javax.annotation.concurrent.ThreadSafe;
import alluxio.util.IdUtils;
import alluxio.util.network.NettyUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.auth.Subject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class FileSystemContext
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemContext.class);
    private final String mId;
    private AtomicBoolean mClosed = new AtomicBoolean(false);
    @GuardedBy(value="this")
    private boolean mMetricsEnabled;
    private volatile MasterClientContext mMasterClientContext;
    private volatile FileSystemMasterClientPool mFileSystemMasterClientPool;
    private volatile BlockMasterClientPool mBlockMasterClientPool;
    private final ConcurrentHashMap<ClientPoolKey, BlockWorkerClientPool> mBlockWorkerClientPool = new ConcurrentHashMap();
    private volatile EventLoopGroup mWorkerGroup;
    @GuardedBy(value="this")
    private boolean mLocalWorkerInitialized;
    @GuardedBy(value="this")
    private WorkerNetAddress mLocalWorker;
    private volatile FileSystemContextReinitializer mReinitializer;

    public static FileSystemContext create(AlluxioConfiguration conf) {
        Preconditions.checkNotNull(conf);
        return FileSystemContext.create(null, conf);
    }

    public static FileSystemContext create(@Nullable Subject subject, @Nullable AlluxioConfiguration conf) {
        FileSystemContext context = new FileSystemContext();
        ClientContext ctx = ClientContext.create(subject, conf);
        MasterInquireClient inquireClient = MasterInquireClient.Factory.create(ctx.getClusterConf());
        context.init(ctx, inquireClient);
        return context;
    }

    public static FileSystemContext create(ClientContext clientContext) {
        FileSystemContext ctx = new FileSystemContext();
        ctx.init(clientContext, MasterInquireClient.Factory.create(clientContext.getClusterConf()));
        return ctx;
    }

    @VisibleForTesting
    public static FileSystemContext create(Subject subject, MasterInquireClient masterInquireClient, AlluxioConfiguration alluxioConf) {
        FileSystemContext context = new FileSystemContext();
        ClientContext ctx = ClientContext.create(subject, alluxioConf);
        context.init(ctx, masterInquireClient);
        return context;
    }

    private FileSystemContext() {
        this.mId = IdUtils.createFileSystemContextId();
    }

    private synchronized void init(ClientContext clientContext, MasterInquireClient masterInquireClient) {
        this.initContext(clientContext, masterInquireClient);
        this.mReinitializer = new FileSystemContextReinitializer(this);
    }

    private synchronized void initContext(ClientContext ctx, MasterInquireClient masterInquireClient) {
        this.mClosed.set(false);
        this.mMasterClientContext = MasterClientContext.newBuilder(ctx).setMasterInquireClient(masterInquireClient).build();
        this.mFileSystemMasterClientPool = new FileSystemMasterClientPool(this.mMasterClientContext);
        this.mBlockMasterClientPool = new BlockMasterClientPool(this.mMasterClientContext);
        this.mWorkerGroup = NettyUtils.createEventLoop(NettyUtils.getUserChannel(this.getClusterConf()), this.getClusterConf().getInt(PropertyKey.USER_NETWORK_NETTY_WORKER_THREADS), String.format("alluxio-client-nettyPool-%s-%%d", this.mId), true);
        this.mMetricsEnabled = this.getClusterConf().getBoolean(PropertyKey.USER_METRICS_COLLECTION_ENABLED);
        if (this.mMetricsEnabled) {
            MetricsHeartbeatContext.addHeartbeat(this.getClientContext(), masterInquireClient);
        }
    }

    @Override
    public synchronized void close() throws IOException {
        this.mReinitializer.close();
        this.closeContext();
    }

    private synchronized void closeContext() throws IOException {
        if (!this.mClosed.get()) {
            this.mClosed.set(true);
            this.mWorkerGroup.shutdownGracefully(1L, 10L, TimeUnit.SECONDS);
            this.mFileSystemMasterClientPool.close();
            this.mFileSystemMasterClientPool = null;
            this.mBlockMasterClientPool.close();
            this.mBlockMasterClientPool = null;
            for (BlockWorkerClientPool pool : this.mBlockWorkerClientPool.values()) {
                pool.close();
            }
            this.mBlockWorkerClientPool.clear();
            this.mLocalWorkerInitialized = false;
            this.mLocalWorker = null;
            if (this.mMetricsEnabled) {
                MetricsHeartbeatContext.removeHeartbeat(this.getClientContext());
            }
        } else {
            LOG.warn("Attempted to close FileSystemContext which has already been closed or not initialized.");
        }
    }

    public FileSystemContextReinitializer.ReinitBlockerResource blockReinit() {
        try {
            return this.mReinitializer.block();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void reinit(boolean updateClusterConf, boolean updatePathConf) throws UnavailableException, IOException {
        try (FileSystemContextReinitializer.ReinitAllowerResource r = this.mReinitializer.allow();){
            InetSocketAddress masterAddr;
            try {
                masterAddr = this.getMasterAddress();
            }
            catch (IOException e) {
                throw new UnavailableException("Failed to get master address during reinitialization", e);
            }
            try {
                this.getClientContext().loadConf(masterAddr, updateClusterConf, updatePathConf);
            }
            catch (AlluxioStatusException e) {
                throw new UnavailableException(String.format("Failed to load configuration from meta master (%s) during reinitialization", masterAddr), e);
            }
            this.closeContext();
            this.initContext(this.getClientContext(), MasterInquireClient.Factory.create(this.getClusterConf()));
            this.mReinitializer.onSuccess();
        }
    }

    public String getId() {
        return this.mId;
    }

    public MasterClientContext getMasterClientContext() {
        return this.mMasterClientContext;
    }

    public ClientContext getClientContext() {
        return this.mMasterClientContext;
    }

    public AlluxioConfiguration getClusterConf() {
        return this.getClientContext().getClusterConf();
    }

    public AlluxioConfiguration getPathConf(AlluxioURI path) {
        return new SpecificPathConfiguration(this.getClientContext().getClusterConf(), this.getClientContext().getPathConf(), path);
    }

    public synchronized InetSocketAddress getMasterAddress() throws UnavailableException {
        return this.mMasterClientContext.getMasterInquireClient().getPrimaryRpcAddress();
    }

    private FileSystemMasterClient acquireMasterClient() {
        try (FileSystemContextReinitializer.ReinitBlockerResource r = this.blockReinit();){
            FileSystemMasterClient fileSystemMasterClient = (FileSystemMasterClient)this.mFileSystemMasterClientPool.acquire();
            return fileSystemMasterClient;
        }
    }

    private void releaseMasterClient(FileSystemMasterClient client) {
        try (FileSystemContextReinitializer.ReinitBlockerResource r = this.blockReinit();){
            if (!client.isClosed()) {
                this.mFileSystemMasterClientPool.release(client);
            }
        }
    }

    public CloseableResource<FileSystemMasterClient> acquireMasterClientResource() {
        return new CloseableResource<FileSystemMasterClient>(this.acquireMasterClient()){

            @Override
            public void close() {
                FileSystemContext.this.releaseMasterClient((FileSystemMasterClient)this.get());
            }
        };
    }

    private BlockMasterClient acquireBlockMasterClient() {
        try (FileSystemContextReinitializer.ReinitBlockerResource r = this.blockReinit();){
            BlockMasterClient blockMasterClient = (BlockMasterClient)this.mBlockMasterClientPool.acquire();
            return blockMasterClient;
        }
    }

    private void releaseBlockMasterClient(BlockMasterClient client) {
        try (FileSystemContextReinitializer.ReinitBlockerResource r = this.blockReinit();){
            if (!client.isClosed()) {
                this.mBlockMasterClientPool.release(client);
            }
        }
    }

    public CloseableResource<BlockMasterClient> acquireBlockMasterClientResource() {
        return new CloseableResource<BlockMasterClient>(this.acquireBlockMasterClient()){

            @Override
            public void close() {
                FileSystemContext.this.releaseBlockMasterClient((BlockMasterClient)this.get());
            }
        };
    }

    public BlockWorkerClient acquireBlockWorkerClient(WorkerNetAddress workerNetAddress) throws IOException {
        try (FileSystemContextReinitializer.ReinitBlockerResource r = this.blockReinit();){
            BlockWorkerClient blockWorkerClient = this.acquireBlockWorkerClientInternal(workerNetAddress, this.getClientContext().getSubject());
            return blockWorkerClient;
        }
    }

    private BlockWorkerClient acquireBlockWorkerClientInternal(WorkerNetAddress workerNetAddress, Subject subject) throws IOException {
        SocketAddress address = NetworkAddressUtils.getDataPortSocketAddress(workerNetAddress, this.getClusterConf());
        GrpcServerAddress serverAddress = new GrpcServerAddress(workerNetAddress.getHost(), address);
        ClientPoolKey key = new ClientPoolKey(address, AuthenticationUserUtils.getImpersonationUser(subject, this.getClusterConf()));
        return (BlockWorkerClient)this.mBlockWorkerClientPool.computeIfAbsent(key, k -> new BlockWorkerClientPool(subject, serverAddress, this.getClusterConf().getInt(PropertyKey.USER_BLOCK_WORKER_CLIENT_POOL_SIZE), this.getClusterConf(), this.mWorkerGroup)).acquire();
    }

    public void releaseBlockWorkerClient(WorkerNetAddress workerNetAddress, BlockWorkerClient client) {
        if (client.isShutdown()) {
            return;
        }
        try (FileSystemContextReinitializer.ReinitBlockerResource r = this.blockReinit();){
            SocketAddress address = NetworkAddressUtils.getDataPortSocketAddress(workerNetAddress, this.getClusterConf());
            ClientPoolKey key = new ClientPoolKey(address, AuthenticationUserUtils.getImpersonationUser(this.getClientContext().getSubject(), this.getClusterConf()));
            if (this.mBlockWorkerClientPool.containsKey(key)) {
                this.mBlockWorkerClientPool.get(key).release(client);
            } else {
                LOG.warn("No client pool for key {}, closing client instead. Context is closed: {}", (Object)key, (Object)this.mClosed.get());
                try {
                    client.close();
                }
                catch (IOException e) {
                    LOG.warn("Error closing block worker client for key {}", (Object)key, (Object)e);
                }
            }
        }
    }

    public synchronized boolean hasLocalWorker() throws IOException {
        if (!this.mLocalWorkerInitialized) {
            this.initializeLocalWorker();
        }
        return this.mLocalWorker != null;
    }

    public synchronized WorkerNetAddress getLocalWorker() throws IOException {
        if (!this.mLocalWorkerInitialized) {
            this.initializeLocalWorker();
        }
        return this.mLocalWorker;
    }

    private void initializeLocalWorker() throws IOException {
        List<WorkerNetAddress> addresses = this.getWorkerAddresses();
        if (!addresses.isEmpty() && addresses.get(0).getHost().equals(NetworkAddressUtils.getClientHostName(this.getClusterConf()))) {
            this.mLocalWorker = addresses.get(0);
        }
        this.mLocalWorkerInitialized = true;
    }

    private List<WorkerNetAddress> getWorkerAddresses() throws IOException {
        List<WorkerInfo> infos;
        BlockMasterClient blockMasterClient = (BlockMasterClient)this.mBlockMasterClientPool.acquire();
        try {
            infos = blockMasterClient.getWorkerInfoList();
        }
        finally {
            this.mBlockMasterClientPool.release(blockMasterClient);
        }
        if (infos.isEmpty()) {
            throw new UnavailableException(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage(new Object[0]));
        }
        ArrayList<WorkerNetAddress> workerNetAddresses = new ArrayList<WorkerNetAddress>();
        ArrayList<WorkerNetAddress> localWorkerNetAddresses = new ArrayList<WorkerNetAddress>();
        String localHostname = NetworkAddressUtils.getClientHostName(this.getClusterConf());
        for (WorkerInfo info : infos) {
            WorkerNetAddress netAddress = info.getAddress();
            if (netAddress.getHost().equals(localHostname)) {
                localWorkerNetAddresses.add(netAddress);
            }
            workerNetAddresses.add(netAddress);
        }
        return localWorkerNetAddresses.isEmpty() ? workerNetAddresses : localWorkerNetAddresses;
    }

    private static final class ClientPoolKey {
        private final SocketAddress mSocketAddress;
        private final String mUsername;

        public ClientPoolKey(SocketAddress socketAddress, String username) {
            this.mSocketAddress = socketAddress;
            this.mUsername = username;
        }

        public int hashCode() {
            return Objects.hashCode(this.mSocketAddress, this.mUsername);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ClientPoolKey)) {
                return false;
            }
            ClientPoolKey that = (ClientPoolKey)o;
            return Objects.equal(this.mSocketAddress, that.mSocketAddress) && Objects.equal(this.mUsername, that.mUsername);
        }

        public String toString() {
            return MoreObjects.toStringHelper(this).add("socketAddress", this.mSocketAddress).add("username", this.mUsername).toString();
        }
    }
}

