/*
 * Decompiled with CFR 0.152.
 */
package alluxio.stress.cli;

import alluxio.ClientContext;
import alluxio.grpc.Command;
import alluxio.grpc.LocationBlockIdListEntry;
import alluxio.grpc.Metric;
import alluxio.master.MasterClientContext;
import alluxio.stress.CachingBlockMasterClient;
import alluxio.stress.cli.RpcBench;
import alluxio.stress.cli.RpcBenchPreparationUtils;
import alluxio.stress.rpc.BlockMasterBenchParameters;
import alluxio.stress.rpc.RpcTaskResult;
import alluxio.util.FormatUtils;
import alluxio.worker.block.BlockMasterClient;
import alluxio.worker.block.BlockStoreLocation;
import com.beust.jcommander.ParametersDelegate;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerHeartbeatBench
extends RpcBench<BlockMasterBenchParameters> {
    private static final Logger LOG = LoggerFactory.getLogger(WorkerHeartbeatBench.class);
    private static final List<Metric> EMPTY_METRICS = ImmutableList.of();
    private static final List<Long> EMPTY_REMOVED_BLOCKS = ImmutableList.of();
    @ParametersDelegate
    private final BlockMasterBenchParameters mParameters = new BlockMasterBenchParameters();
    private Deque<Long> mWorkerPool = new ArrayDeque<Long>();
    private List<LocationBlockIdListEntry> mLocationBlockIdList;

    @Override
    public RpcTaskResult runRPC() {
        RpcTaskResult result = new RpcTaskResult();
        if (this.mWorkerPool == null) {
            result.addError("Worker ID pool is null");
            return result;
        }
        if (this.mWorkerPool.isEmpty()) {
            result.addError("No more worker IDs for use");
            return result;
        }
        long workerId = this.mWorkerPool.poll();
        LOG.info("Acquired worker ID {}", (Object)workerId);
        CachingBlockMasterClient client = new CachingBlockMasterClient(MasterClientContext.newBuilder((ClientContext)ClientContext.create()).build(), this.mLocationBlockIdList);
        long durationMs = FormatUtils.parseTimeSize((String)this.mParameters.mDuration);
        Instant startTime = Instant.now();
        Instant endTime = startTime.plus(durationMs, ChronoUnit.MILLIS);
        LOG.info("Test start time {}, end time {}", (Object)startTime, (Object)endTime);
        RpcTaskResult taskResult = this.simulateBlockHeartbeat((BlockMasterClient)client, workerId, endTime);
        LOG.info("Test finished with results: {}", (Object)taskResult);
        return taskResult;
    }

    @Override
    public BlockMasterBenchParameters getParameters() {
        return this.mParameters;
    }

    private RpcTaskResult simulateBlockHeartbeat(BlockMasterClient client, long workerId, Instant endTime) {
        RpcTaskResult result = new RpcTaskResult();
        long i = 0L;
        while (Instant.now().isBefore(endTime)) {
            Instant s = Instant.now();
            try {
                Command cmd = client.heartbeat(workerId, RpcBenchPreparationUtils.CAPACITY_MEM, RpcBenchPreparationUtils.USED_MEM_EMPTY, EMPTY_REMOVED_BLOCKS, (Map)ImmutableMap.of(), RpcBenchPreparationUtils.LOST_STORAGE, EMPTY_METRICS);
                LOG.debug("Received command from heartbeat {}", (Object)cmd);
                Instant e = Instant.now();
                Duration d = Duration.between(s, e);
                RpcTaskResult.Point p = new RpcTaskResult.Point(d.toMillis());
                LOG.debug("Iter {} took {}ms", (Object)i, (Object)p.mDurationMs);
                result.addPoint(p);
            }
            catch (Exception e) {
                LOG.error("Failed to run blockHeartbeat {}", (Object)i, (Object)e);
                result.addError(e.getMessage());
            }
        }
        return result;
    }

    @Override
    public String getBenchDescription() {
        return String.join((CharSequence)"\n", (Iterable<? extends CharSequence>)ImmutableList.of((Object)"A benchmarking tool for the WorkerHeartbeat RPC.", (Object)"The test will generate a specified number of blocks in the master (without associated files). The test will also register the simulated workers with the master. Then it will keep generating heartbeats with the specified load and sending heartbeats to the master nonstop, until the specified time has elapsed.", (Object)"", (Object)"Example:", (Object)"# 2 job workers will be chosen to run the benchmark", (Object)"# Each job worker runs 3 threads each simulating one worker", (Object)"# Each worker will have 3000 blocks on tier 0 and 10000 blocks on tier 1", (Object)"# Keep sending heartbeats for 30s", (Object)"$ bin/alluxio runClass alluxio.stress.cli.WorkerHeartbeatBench --concurrency 3 \\", (Object)"--cluster --cluster-limit 2 --tiers \"1000,1000,1000;5000,5000\" --duration 30s", (Object)""));
    }

    @Override
    public void prepare() throws Exception {
        LOG.info("Task ID is {}", (Object)this.mBaseParameters.mId);
        Map<BlockStoreLocation, List<Long>> blockMap = RpcBenchPreparationUtils.generateBlockIdOnTiers(this.mParameters.mTiers);
        BlockMasterClient client = new BlockMasterClient(MasterClientContext.newBuilder((ClientContext)ClientContext.create()).build());
        this.mLocationBlockIdList = client.convertBlockListMapToProto(blockMap);
        if (!this.mBaseParameters.mDistributed) {
            LOG.info("Preparing block IDs at the master");
            RpcBenchPreparationUtils.prepareBlocksInMaster(blockMap);
            LOG.info("Created all blocks at the master");
        }
        int numWorkers = this.mParameters.mConcurrency;
        LOG.info("Register {} simulated workers for the test", (Object)numWorkers);
        this.mWorkerPool = RpcBenchPreparationUtils.prepareWorkerIds(client, numWorkers);
        Preconditions.checkState((this.mWorkerPool.size() == numWorkers ? 1 : 0) != 0, (String)"Expecting %s workers but registered %s", (int)numWorkers, (int)this.mWorkerPool.size());
        RpcBenchPreparationUtils.registerWorkers(client, this.mWorkerPool);
        LOG.info("All workers registered with the master {}", this.mWorkerPool);
    }

    public static void main(String[] args) {
        WorkerHeartbeatBench.mainInternal(args, new WorkerHeartbeatBench());
    }
}

