/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block;

import alluxio.client.WriteType;
import alluxio.client.block.BlockMasterClient;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.block.policy.BlockLocationPolicy;
import alluxio.client.block.policy.options.GetWorkerOptions;
import alluxio.client.block.stream.BlockInStream;
import alluxio.client.block.stream.BlockOutStream;
import alluxio.client.block.util.BlockLocationUtils;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.collections.Pair;
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.PreconditionMessage;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.exception.status.UnavailableException;
import alluxio.network.TieredIdentityFactory;
import alluxio.refresh.RefreshPolicy;
import alluxio.refresh.TimeoutRefresh;
import alluxio.resource.CloseableResource;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.common.collect.ImmutableMap;
import alluxio.shaded.client.com.google.common.collect.Lists;
import alluxio.shaded.client.com.google.common.collect.Sets;
import alluxio.shaded.client.javax.annotation.concurrent.ThreadSafe;
import alluxio.util.FormatUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.BlockLocation;
import alluxio.wire.TieredIdentity;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class AlluxioBlockStore {
    private static final Logger LOG = LoggerFactory.getLogger(AlluxioBlockStore.class);
    private final FileSystemContext mContext;
    private final TieredIdentity mTieredIdentity;
    private List<BlockWorkerInfo> mWorkerInfoList = null;
    private final RefreshPolicy mWorkerRefreshPolicy;

    public static AlluxioBlockStore create(FileSystemContext context) {
        return new AlluxioBlockStore(context, TieredIdentityFactory.localIdentity(context.getClusterConf()));
    }

    @VisibleForTesting
    AlluxioBlockStore(FileSystemContext context, TieredIdentity tieredIdentity) {
        this.mContext = context;
        this.mTieredIdentity = tieredIdentity;
        this.mWorkerRefreshPolicy = new TimeoutRefresh(this.mContext.getClusterConf().getMs(PropertyKey.USER_WORKER_LIST_REFRESH_INTERVAL));
    }

    public BlockInfo getInfo(long blockId) throws IOException {
        try (CloseableResource<BlockMasterClient> masterClientResource = this.mContext.acquireBlockMasterClientResource();){
            BlockInfo blockInfo = masterClientResource.get().getBlockInfo(blockId);
            return blockInfo;
        }
    }

    public synchronized List<BlockWorkerInfo> getEligibleWorkers() throws IOException {
        if (this.mWorkerInfoList == null || this.mWorkerRefreshPolicy.attempt()) {
            this.mWorkerInfoList = this.getAllWorkers();
        }
        return this.mWorkerInfoList;
    }

    public List<BlockWorkerInfo> getAllWorkers() throws IOException {
        try (CloseableResource<BlockMasterClient> masterClientResource = this.mContext.acquireBlockMasterClientResource();){
            List<BlockWorkerInfo> list = masterClientResource.get().getWorkerInfoList().stream().map(w -> new BlockWorkerInfo(w.getAddress(), w.getCapacityBytes(), w.getUsedBytes())).collect(Collectors.toList());
            return list;
        }
    }

    public BlockInStream getInStream(long blockId, InStreamOptions options) throws IOException {
        return this.getInStream(blockId, options, ImmutableMap.of());
    }

    public BlockInStream getInStream(long blockId, InStreamOptions options, Map<WorkerNetAddress, Long> failedWorkers) throws IOException {
        Set<WorkerNetAddress> workerPool;
        BlockInfo info;
        try (CloseableResource<BlockMasterClient> masterClientResource = this.mContext.acquireBlockMasterClientResource();){
            info = masterClientResource.get().getBlockInfo(blockId);
        }
        List<BlockLocation> locations = info.getLocations();
        List<BlockWorkerInfo> blockWorkerInfo = Collections.EMPTY_LIST;
        if (options.getStatus().isPersisted() || options.getStatus().getPersistenceState().equals("TO_BE_PERSISTED")) {
            blockWorkerInfo = this.getEligibleWorkers();
            if (blockWorkerInfo.isEmpty()) {
                throw new UnavailableException(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage(new Object[0]));
            }
            workerPool = blockWorkerInfo.stream().map(BlockWorkerInfo::getNetAddress).collect(Collectors.toSet());
        } else {
            if (locations.isEmpty()) {
                blockWorkerInfo = this.getEligibleWorkers();
                if (blockWorkerInfo.isEmpty()) {
                    throw new UnavailableException(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage(new Object[0]));
                }
                throw new UnavailableException(ExceptionMessage.BLOCK_UNAVAILABLE.getMessage(info.getBlockId()));
            }
            workerPool = locations.stream().map(BlockLocation::getWorkerAddress).collect(Collectors.toSet());
        }
        Set<WorkerNetAddress> workers = this.handleFailedWorkers(workerPool, failedWorkers);
        BlockInStream.BlockInStreamSource dataSourceType = null;
        WorkerNetAddress dataSource = null;
        locations = locations.stream().filter(location -> workers.contains(location.getWorkerAddress())).collect(Collectors.toList());
        if (!locations.isEmpty()) {
            List<WorkerNetAddress> tieredLocations = locations.stream().map(location -> location.getWorkerAddress()).collect(Collectors.toList());
            Collections.shuffle(tieredLocations);
            Optional<Pair<WorkerNetAddress, Boolean>> nearest = BlockLocationUtils.nearest(this.mTieredIdentity, tieredLocations, this.mContext.getClusterConf());
            if (nearest.isPresent()) {
                dataSource = nearest.get().getFirst();
                BlockInStream.BlockInStreamSource blockInStreamSource = dataSourceType = nearest.get().getSecond() != false ? BlockInStream.BlockInStreamSource.LOCAL : BlockInStream.BlockInStreamSource.REMOTE;
            }
        }
        if (dataSource == null) {
            dataSourceType = BlockInStream.BlockInStreamSource.UFS;
            BlockLocationPolicy policy = Preconditions.checkNotNull(options.getUfsReadLocationPolicy(), (Object)PreconditionMessage.UFS_READ_LOCATION_POLICY_UNSPECIFIED);
            blockWorkerInfo = blockWorkerInfo.stream().filter(workerInfo -> workers.contains(workerInfo.getNetAddress())).collect(Collectors.toList());
            GetWorkerOptions getWorkerOptions = GetWorkerOptions.defaults().setBlockInfo(new BlockInfo().setBlockId(info.getBlockId()).setLength(info.getLength()).setLocations(locations)).setBlockWorkerInfos(blockWorkerInfo);
            dataSource = policy.getWorker(getWorkerOptions);
        }
        if (dataSource == null) {
            throw new UnavailableException(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage(new Object[0]));
        }
        try {
            return BlockInStream.create(this.mContext, info, dataSource, dataSourceType, options);
        }
        catch (UnavailableException e) {
            failedWorkers.put(dataSource, System.currentTimeMillis());
            throw e;
        }
    }

    private Set<WorkerNetAddress> handleFailedWorkers(Set<WorkerNetAddress> workers, Map<WorkerNetAddress, Long> failedWorkers) {
        if (workers.isEmpty()) {
            return Collections.EMPTY_SET;
        }
        Set<WorkerNetAddress> nonFailed = workers.stream().filter(worker -> !failedWorkers.containsKey(worker)).collect(Collectors.toSet());
        if (nonFailed.isEmpty()) {
            return Collections.singleton(workers.stream().min((x, y) -> Long.compare((Long)failedWorkers.get(x), (Long)failedWorkers.get(y))).get());
        }
        return nonFailed;
    }

    public BlockOutStream getOutStream(long blockId, long blockSize, WorkerNetAddress address, OutStreamOptions options) throws IOException {
        if (blockSize == -1L) {
            try (CloseableResource<BlockMasterClient> blockMasterClientResource = this.mContext.acquireBlockMasterClientResource();){
                blockSize = blockMasterClientResource.get().getBlockInfo(blockId).getLength();
            }
        }
        if (address == null) {
            throw new ResourceExhaustedException(ExceptionMessage.NO_SPACE_FOR_BLOCK_ON_WORKER.getMessage(FormatUtils.getSizeFromBytes(blockSize)));
        }
        LOG.debug("Create block outstream for {} of block size {} at address {}, using options: {}", new Object[]{blockId, blockSize, address, options});
        return BlockOutStream.create(this.mContext, blockId, blockSize, address, options);
    }

    public BlockOutStream getOutStream(long blockId, long blockSize, OutStreamOptions options) throws IOException {
        WorkerNetAddress address;
        int initialReplicas;
        BlockLocationPolicy locationPolicy = Preconditions.checkNotNull(options.getLocationPolicy(), (Object)PreconditionMessage.BLOCK_WRITE_LOCATION_POLICY_UNSPECIFIED);
        GetWorkerOptions workerOptions = GetWorkerOptions.defaults().setBlockInfo(new BlockInfo().setBlockId(blockId).setLength(blockSize)).setBlockWorkerInfos(new ArrayList<BlockWorkerInfo>(this.getEligibleWorkers()));
        int n = initialReplicas = options.getWriteType() == WriteType.ASYNC_THROUGH && options.getReplicationDurable() > options.getReplicationMin() ? options.getReplicationDurable() : options.getReplicationMin();
        if (initialReplicas <= 1) {
            WorkerNetAddress address2 = locationPolicy.getWorker(workerOptions);
            if (address2 == null) {
                if (this.getEligibleWorkers().isEmpty()) {
                    throw new UnavailableException(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage(new Object[0]));
                }
                throw new UnavailableException(ExceptionMessage.NO_SPACE_FOR_BLOCK_ON_WORKER.getMessage(blockSize));
            }
            return this.getOutStream(blockId, blockSize, address2, options);
        }
        HashMap<String, HashSet<BlockWorkerInfo>> blockWorkersByHost = new HashMap<String, HashSet<BlockWorkerInfo>>();
        for (BlockWorkerInfo blockWorker : workerOptions.getBlockWorkerInfos()) {
            String hostName = blockWorker.getNetAddress().getHost();
            if (blockWorkersByHost.containsKey(hostName)) {
                ((Set)blockWorkersByHost.get(hostName)).add(blockWorker);
                continue;
            }
            blockWorkersByHost.put(hostName, Sets.newHashSet(blockWorker));
        }
        ArrayList<WorkerNetAddress> workerAddressList = new ArrayList<WorkerNetAddress>();
        ArrayList<BlockWorkerInfo> updatedInfos = Lists.newArrayList(workerOptions.getBlockWorkerInfos());
        for (int i = 0; i < initialReplicas && (address = locationPolicy.getWorker(workerOptions)) != null; ++i) {
            workerAddressList.add(address);
            updatedInfos.removeAll((Collection)blockWorkersByHost.get(address.getHost()));
            workerOptions.setBlockWorkerInfos(updatedInfos);
        }
        if (workerAddressList.size() < initialReplicas) {
            throw new ResourceExhaustedException(String.format("Not enough workers for replications, %d workers selected but %d required", workerAddressList.size(), initialReplicas));
        }
        return BlockOutStream.createReplicatedBlockOutStream(this.mContext, blockId, blockSize, workerAddressList, options);
    }

    public long getCapacityBytes() throws IOException {
        try (CloseableResource<BlockMasterClient> blockMasterClientResource = this.mContext.acquireBlockMasterClientResource();){
            long l = blockMasterClientResource.get().getCapacityBytes();
            return l;
        }
    }

    public long getUsedBytes() throws IOException {
        try (CloseableResource<BlockMasterClient> blockMasterClientResource = this.mContext.acquireBlockMasterClientResource();){
            long l = blockMasterClientResource.get().getUsedBytes();
            return l;
        }
    }
}

