/*
 * Decompiled with CFR 0.152.
 */
package alluxio.stress.cli;

import alluxio.ClientContext;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.LocationBlockIdListEntry;
import alluxio.master.MasterClientContext;
import alluxio.stress.CachingBlockMasterClient;
import alluxio.stress.cli.RpcBench;
import alluxio.stress.cli.RpcBenchPreparationUtils;
import alluxio.stress.rpc.BlockMasterBenchParameters;
import alluxio.stress.rpc.RpcTaskResult;
import alluxio.stress.rpc.TierAlias;
import alluxio.worker.block.BlockMasterClient;
import alluxio.worker.block.BlockMasterSync;
import alluxio.worker.block.BlockStoreLocation;
import com.beust.jcommander.ParametersDelegate;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A benchmark for the streaming RegisterWorker RPC.
 *
 * <p>{@link #prepare()} generates block IDs per tier, optionally creates those blocks at the
 * master, and reserves one worker ID per configured concurrency slot. Each {@link #runRPC()}
 * invocation then takes one reserved worker ID and registers it with the master through the
 * streaming API, recording how long the registration took.
 */
public class StreamRegisterWorkerBench
extends RpcBench<BlockMasterBenchParameters> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamRegisterWorkerBench.class);

    /** Capacity reported for every simulated tier: 20 GiB (0x500000000 bytes). */
    private static final long CAPACITY = 20L * 1024 * 1024 * 1024;

    @ParametersDelegate
    private final BlockMasterBenchParameters mParameters = new BlockMasterBenchParameters();

    /** Tier aliases derived from the {@code mTiers} parameter. */
    private List<String> mTierAliases;
    /** Reported capacity, keyed by tier alias. */
    private Map<String, Long> mCapacityMap;
    /** Reported used bytes, keyed by tier alias (always zero). */
    private Map<String, Long> mUsedMap;
    /** Generated block IDs, keyed by the location they are reported on. */
    private Map<BlockStoreLocation, List<Long>> mBlockMap;
    /** Worker IDs reserved in {@link #prepare()}; each run consumes one. */
    private Deque<Long> mWorkerPool = new ArrayDeque<>();

    /**
     * @return a multi-line, human readable description of this benchmark including an example
     */
    @Override
    public String getBenchDescription() {
        return String.join("\n", ImmutableList.of(
            "A benchmarking tool for the RegisterWorker streaming RPC.",
            "The test will generate a specified number of blocks in the master (without associated files). Then it will trigger the specified number of simulated workers to register at once using the streaming API.",
            "Each simulated worker will have the specified number of blocks, in order to incur the controlled stress on the master side.",
            "",
            "Example:",
            "# 2 job workers will be chosen to run the benchmark",
            "# Each job worker runs 3 simulated workers",
            "# Each simulated worker has 3000 blocks on tier 0 and 10000 on tier 1",
            "# Each simulated worker sends the register RPC once",
            "$ bin/alluxio runClass alluxio.stress.cli.StreamRegisterWorkerBench --concurrency 3 \\",
            "--cluster --cluster-limit 2 --tiers \"1000,1000,1000;5000,5000\"",
            ""));
    }

    /**
     * Builds the tier/capacity/usage/block maps and reserves one worker ID per concurrency slot.
     *
     * <p>Blocks are only created at the master when the run is not flagged as distributed.
     *
     * @throws Exception if preparing blocks or worker IDs fails
     * @throws IllegalStateException if the number of reserved worker IDs does not match the
     *         configured concurrency
     */
    @Override
    public void prepare() throws Exception {
        LOG.info("Task ID is {}", mBaseParameters.mId);
        mTierAliases = getTierAliases(mParameters.mTiers);
        mCapacityMap = Maps.toMap(mTierAliases, tier -> CAPACITY);
        mUsedMap = Maps.toMap(mTierAliases, tier -> 0L);
        Map<BlockStoreLocation, List<Long>> blockMap =
            RpcBenchPreparationUtils.generateBlockIdOnTiers(mParameters.mTiers);
        mBlockMap = blockMap;
        // The client is only needed while reserving worker IDs, so release it afterwards
        // instead of leaking the connection.
        try (BlockMasterClient client = new BlockMasterClient(
                MasterClientContext.newBuilder(ClientContext.create()).build())) {
            if (!mBaseParameters.mDistributed) {
                LOG.info("Preparing blocks at the master");
                RpcBenchPreparationUtils.prepareBlocksInMaster(blockMap);
                LOG.info("Created all blocks at the master");
            }
            int numWorkers = mParameters.mConcurrency;
            mWorkerPool = RpcBenchPreparationUtils.prepareWorkerIds(client, numWorkers);
            Preconditions.checkState(mWorkerPool.size() == numWorkers,
                "Expecting %s workers but registered %s", numWorkers, mWorkerPool.size());
            LOG.info("Prepared worker IDs: {}", mWorkerPool);
        }
    }

    /**
     * Logs the key of every entry in the given list as a comma-terminated sequence.
     * Not called from within this class; kept as a debugging aid.
     *
     * @param entries the location/block-list entries whose keys should be logged
     */
    private static void debriefBlockListProto(List<LocationBlockIdListEntry> entries) {
        StringBuilder sb = new StringBuilder();
        for (LocationBlockIdListEntry e : entries) {
            sb.append(String.format("%s,", e.getKey()));
        }
        LOG.info("Generated locations: {}", sb);
    }

    /**
     * Command line entry point.
     *
     * @param args command line arguments
     */
    public static void main(String[] args) {
        mainInternal(args, new StreamRegisterWorkerBench());
    }

    /**
     * Takes one reserved worker ID from the pool and registers it through the given client.
     *
     * @param client the client used to issue the register RPC
     * @return the result holding either one timing point or the errors encountered
     */
    private RpcTaskResult simulateRegisterWorkerStream(BlockMasterClient client) {
        RpcTaskResult result = new RpcTaskResult();
        long i = 0L;
        Deque<Long> pool = mWorkerPool;
        if (pool == null) {
            result.addError("Worker ID pool is null");
            return result;
        }
        // Poll exactly once and null-check the boxed result: a separate isEmpty() check followed
        // by an unboxing poll() throws NullPointerException if the pool drains in between.
        // The pool holds one ID per concurrency slot, so runs presumably execute on several
        // threads; the lock keeps the non-thread-safe deque consistent in that case.
        Long workerId;
        synchronized (pool) {
            workerId = pool.poll();
        }
        if (workerId == null) {
            result.addError("No more worker IDs for use");
            return result;
        }
        runOnce(client, result, i, workerId);
        return result;
    }

    /**
     * Converts the configured tiers into their string aliases.
     *
     * @param tierConfig the tier configuration keyed by tier alias
     * @return the string form of every configured tier alias, in key-set iteration order
     */
    private static List<String> getTierAliases(Map<TierAlias, List<Integer>> tierConfig) {
        LOG.info("Simulate {} tiers with config {}", tierConfig.size(), tierConfig);
        return tierConfig.keySet().stream().map(TierAlias::toString).collect(Collectors.toList());
    }

    /**
     * Registers one simulated worker and records the elapsed time, or the failure, in
     * {@code result}. When register leases are enabled in the configuration, a lease sized to
     * the total block count is acquired first and is included in the measured time.
     *
     * @param client the client used to issue the RPCs
     * @param result the result to which a timing point or an error is added
     * @param i the iteration number, used for logging only
     * @param workerId the ID of the worker to register
     */
    private void runOnce(BlockMasterClient client, RpcTaskResult result, long i, long workerId) {
        try {
            Instant start = Instant.now();
            if (Configuration.getBoolean(PropertyKey.WORKER_REGISTER_LEASE_ENABLED)) {
                LOG.info("Acquiring lease for {}", workerId);
                int blockCount = 0;
                for (List<Long> blocks : mBlockMap.values()) {
                    blockCount += blocks.size();
                }
                client.acquireRegisterLeaseWithBackoff(workerId, blockCount,
                    BlockMasterSync.getDefaultAcquireLeaseRetryPolicy());
                LOG.info("Lease acquired for {}", workerId);
            }
            client.registerWithStream(workerId, mTierAliases, mCapacityMap, mUsedMap, mBlockMap,
                RpcBenchPreparationUtils.LOST_STORAGE, RpcBenchPreparationUtils.EMPTY_CONFIG);
            Instant end = Instant.now();
            RpcTaskResult.Point p = new RpcTaskResult.Point(Duration.between(start, end).toMillis());
            result.addPoint(p);
            // The recorded duration is in milliseconds, so label it as such.
            LOG.debug("Iter {} took {}ms", i, p.mDurationMs);
        }
        catch (Exception e) {
            LOG.error("Failed to run iter {}", i, e);
            // Some exceptions carry no message; fall back to toString() so the recorded error
            // still identifies the failure instead of being null.
            String message = e.getMessage();
            result.addError(message != null ? message : e.toString());
        }
    }

    /**
     * Runs one simulated worker registration with a freshly created client.
     *
     * @return the result of the registration attempt
     * @throws Exception if the client cannot be created or closed
     */
    @Override
    public RpcTaskResult runRPC() throws Exception {
        // Close the per-run client once the registration has finished so that repeated runs do
        // not accumulate open connections.
        try (CachingBlockMasterClient client = new CachingBlockMasterClient(
                MasterClientContext.newBuilder(ClientContext.create()).build(), mBlockMap)) {
            RpcTaskResult taskResult = simulateRegisterWorkerStream(client);
            LOG.info("Received task result {}", taskResult);
            LOG.info("Run finished");
            return taskResult;
        }
    }

    /**
     * @return the benchmark parameters populated from the command line
     */
    @Override
    public BlockMasterBenchParameters getParameters() {
        return mParameters;
    }
}

