/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.policy;

import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.block.policy.BlockLocationPolicy;
import alluxio.client.block.policy.options.GetWorkerOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.common.collect.Streams;
import alluxio.shaded.client.org.apache.commons.codec.digest.MurmurHash3;
import alluxio.wire.WorkerNetAddress;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A {@link BlockLocationPolicy} that maps a block id onto a worker through a
 * cumulative capacity distribution (CDF): every eligible worker owns a range
 * of positions whose width equals its capacity in bytes, and the hashed block
 * id selects a position inside the total range.
 *
 * <p>When more than one shard is configured, the block id is hashed repeatedly
 * to produce several candidate workers, and one of them is picked at random.
 */
public class CapacityBasedDeterministicHashPolicy
implements BlockLocationPolicy {
    /** Upper bound on the number of candidate workers derived for a block. */
    private final int mShards;

    /**
     * Creates the policy, reading the shard count from the configuration.
     *
     * @param conf configuration supplying the number of shards
     * @throws IllegalArgumentException if the configured number of shards is less than 1
     */
    public CapacityBasedDeterministicHashPolicy(AlluxioConfiguration conf) {
        int numShards = conf.getInt(PropertyKey.USER_UFS_BLOCK_READ_LOCATION_POLICY_DETERMINISTIC_HASH_SHARDS);
        Preconditions.checkArgument(numShards >= 1, "number of shards must be no less than 1");
        this.mShards = numShards;
    }

    /**
     * Chooses a worker for the block described by {@code options}.
     *
     * @param options supplies the candidate workers and the block info
     * @return the address of the chosen worker, or empty when no worker has a
     *         capacity of at least the block length or the summed capacity of
     *         the eligible workers is zero
     */
    @Override
    public Optional<WorkerNetAddress> getWorker(GetWorkerOptions options) {
        // Key = sum of the capacities of all workers placed before this one,
        // i.e. the start of the range of positions owned by the worker.
        TreeMap<Long, BlockWorkerInfo> capacityCdf = new TreeMap<>();
        AtomicLong totalCapacity = new AtomicLong(0L);
        Streams.stream(options.getBlockWorkerInfos())
            .filter(workerInfo -> workerInfo.getCapacityBytes() >= options.getBlockInfo().getLength())
            // Workers are laid out in the CDF ordered by host name.
            .sorted(Comparator.comparing(w -> w.getNetAddress().getHost()))
            .forEach(workerInfo -> {
                capacityCdf.put(totalCapacity.get(), workerInfo);
                totalCapacity.getAndAdd(workerInfo.getCapacityBytes());
            });
        if (totalCapacity.get() == 0L || capacityCdf.isEmpty()) {
            return Optional.empty();
        }
        long blockId = options.getBlockInfo().getBlockId();
        BlockWorkerInfo chosenWorker = this.pickWorker(capacityCdf, blockId, totalCapacity.get());
        return Optional.of(chosenWorker.getNetAddress());
    }

    /**
     * Selects a worker from the CDF for the given block id.
     *
     * @param capacityCdf   non-empty map from range start to the worker owning that range
     * @param blockId       id of the block being placed
     * @param totalCapacity summed capacity of all workers in the CDF; must be positive
     * @return the selected worker
     */
    private BlockWorkerInfo pickWorker(TreeMap<Long, BlockWorkerInfo> capacityCdf, long blockId, long totalCapacity) {
        if (this.mShards == 1) {
            long startPoint = cdfPosition(this.hashBlockId(blockId), totalCapacity);
            return capacityCdf.floorEntry(startPoint).getValue();
        }
        int numCandidates = Math.min(this.mShards, capacityCdf.size());
        List<BlockWorkerInfo> candidates = new ArrayList<>(numCandidates);
        long hashedBlockId = blockId;
        for (int i = 0; i < numCandidates; i++) {
            // Each round re-hashes the previous hash to obtain the next candidate.
            hashedBlockId = this.hashBlockId(hashedBlockId);
            candidates.add(capacityCdf.floorEntry(cdfPosition(hashedBlockId, totalCapacity)).getValue());
        }
        return this.getRandomCandidate(candidates);
    }

    /**
     * Maps a hash onto a position in {@code [0, totalCapacity)}.
     *
     * <p>The remainder is taken before the absolute value: {@code Math.abs(Long.MIN_VALUE)}
     * is still negative, so {@code Math.abs(hash) % totalCapacity} could yield a negative
     * position for which the CDF has no floor entry. For every other hash value both
     * expressions produce the same result.
     *
     * @param hash          hashed block id, may be any {@code long}
     * @param totalCapacity positive upper bound (exclusive) of the position range
     * @return a non-negative position strictly below {@code totalCapacity}
     */
    private static long cdfPosition(long hash, long totalCapacity) {
        return Math.abs(hash % totalCapacity);
    }

    /**
     * Hashes a block id; overridable so tests can control the hash value.
     *
     * @param blockId the value to hash
     * @return the 64-bit MurmurHash3 of {@code blockId}
     */
    @VisibleForTesting
    protected long hashBlockId(long blockId) {
        return MurmurHash3.hash64(blockId);
    }

    /**
     * Picks one element of {@code candidates} uniformly at random; overridable
     * so tests can control the choice.
     *
     * @param candidates non-empty list of candidate workers
     * @return the randomly selected candidate
     */
    @VisibleForTesting
    protected BlockWorkerInfo getRandomCandidate(List<BlockWorkerInfo> candidates) {
        int randomIndex = ThreadLocalRandom.current().nextInt(candidates.size());
        return candidates.get(randomIndex);
    }
}

