/*
 * Decompiled with CFR 0.152.
 */
package alluxio.stress.cli.worker;

import alluxio.annotation.SuppressFBWarnings;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.file.FileSystemContext;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.WritePType;
import alluxio.stress.cli.AbstractStressBench;
import alluxio.stress.worker.WorkerBenchDataPoint;
import alluxio.stress.worker.WorkerBenchParameters;
import alluxio.stress.worker.WorkerBenchTaskResult;
import alluxio.util.CommonUtils;
import alluxio.util.FormatUtils;
import alluxio.util.executor.ExecutorServiceFactories;
import alluxio.util.logging.SamplingLogger;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressFBWarnings(value={"BC_UNCONFIRMED_CAST"}, justification="There is a downcast from FileSystemParameters to WorkerBenchParameters in the generic")
public class StressWorkerBench
extends AbstractStressBench<WorkerBenchTaskResult, WorkerBenchParameters> {
    /** Logger for regular (unsampled) messages of this benchmark. */
    private static final Logger LOG = LoggerFactory.getLogger(StressWorkerBench.class);
    // Logger wrapped in a SamplingLogger constructed with 10000L; presumably a sampling
    // interval in milliseconds -- TODO confirm against SamplingLogger.
    private static final Logger SAMPLING_LOG = new SamplingLogger(LoggerFactory.getLogger(StressWorkerBench.class), 10000L);
    // 0x4000000 bytes = 64 MiB. The same value is passed as the block size when the test
    // files are created.
    private static final long DUMMY_BLOCK_SIZE = 0x4000000L;
    // Hadoop FileSystem clients created in prepare(); runLocal() hands them to the bench
    // threads by (threadIndex % length).
    private FileSystem[] mCachedFs;
    // Path of every test file, indexed by (workerIdx * threads + threadIdx).
    private Path[] mFilePaths;
    // Per-file read offset, same indexing as mFilePaths (0 when random read is off).
    private Integer[] mOffsets;
    // Per-file read length, same indexing as mFilePaths (the file size when random read is off).
    private Integer[] mLengths;
    // Created in the constructor; used to look up the cached workers of the cluster.
    private FileSystemContext mFsContext;

    /**
     * Draws a pseudo-random integer from the closed interval {@code [min, max]}.
     *
     * @param rand the random source to draw from
     * @param min the smallest value that may be returned (inclusive)
     * @param max the largest value that may be returned (inclusive)
     * @return a value between {@code min} and {@code max}, both inclusive
     */
    private Integer randomNumInRange(Random rand, int min, int max) {
        int span = max - min + 1;
        int drawn = rand.nextInt(span);
        return drawn + min;
    }

    /**
     * Creates the benchmark with a new {@link WorkerBenchParameters} instance and a newly
     * created {@link FileSystemContext}.
     */
    public StressWorkerBench() {
        mParameters = new WorkerBenchParameters();
        mFsContext = FileSystemContext.create();
    }

    /**
     * Computes how many test files the benchmark uses: one file per thread for each worker
     * counted by the cluster limit. The computed numbers are also logged.
     *
     * @return cluster limit multiplied by the number of threads
     */
    private int getTotalFileNumber() {
        final int workerCount = mBaseParameters.mClusterLimit;
        final int threadsPerWorker = ((WorkerBenchParameters) mParameters).mThreads;
        final int total = workerCount * threadsPerWorker;
        LOG.info("Total {} * {} = {} files will be generated", workerCount, threadsPerWorker, total);
        return total;
    }

    /**
     * Builds the path of the test file that belongs to one (worker, thread) pair.
     *
     * @param base the directory holding all test files
     * @param workerIdx index of the worker
     * @param threadIdx index of the thread within that worker
     * @return {@code base/worker-<workerIdx>-thread-<threadIdx>}
     */
    private Path calculateFilePath(Path base, int workerIdx, int threadIdx) {
        String fileName = "worker-" + workerIdx + "-thread-" + threadIdx;
        return new Path(base, fileName);
    }

    /**
     * Command line entry point: creates a benchmark instance and hands it, together with the
     * arguments, to {@code mainInternal}.
     *
     * @param args command line arguments
     */
    public static void main(String[] args) {
        mainInternal(args, new StressWorkerBench());
    }

    /**
     * Returns the multi-line help text describing this benchmark, including an example
     * command. The lines are joined with {@code "\n"}.
     *
     * <p>The explanatory "# ..." lines of the example are kept as separate entries so that each
     * one is printed on its own line (they used to be concatenated without separators, which
     * produced "willbe" and a single run-on comment line).
     *
     * @return the description shown to the user
     */
    @Override
    public String getBenchDescription() {
        return String.join("\n", ImmutableList.of(
            "A benchmarking tool to measure the read performance of alluxio workers in the cluster",
            "The test will run with multiple threads and perform concurrent I/O. One file will ",
            "be prepared for each thread that thread will read that one file repeatedly until ",
            "the specified duration has elapsed.",
            "",
            "Example:",
            "# The command below spawn 32 test threads per worker in your cluster. One 100MB file will be prepared for each test thread.",
            "# The threads will keeping reading for 30s including a 10s warmup.",
            "# So the result captures I/O performance from the last 20s.",
            "$ bin/alluxio runClass alluxio.stress.cli.worker.StressWorkerBench \\\n--threads 32 --base alluxio:///stress-worker-base --file-size 100m \\\n--warmup 10s --duration 30s --cluster\n"));
    }

    /**
     * Prepares everything the benchmark needs before {@link #runLocal()}:
     * <ol>
     *   <li>validates the parameters;</li>
     *   <li>defaults the cluster limit to the number of cached workers when it is 0, and the
     *       start time to "now + 5s" when it is -1;</li>
     *   <li>computes the test file paths and the per-file read offset/length;</li>
     *   <li>creates the test files, unless running in distributed mode or creation is
     *       skipped;</li>
     *   <li>creates the {@link FileSystem} clients used by the bench threads.</li>
     * </ol>
     *
     * @throws Exception if validation fails, the base path is not a valid URI, or a file system
     *         operation fails
     */
    @Override
    public void prepare() throws Exception {
        validateParams();
        final WorkerBenchParameters params = (WorkerBenchParameters) mParameters;

        if (mBaseParameters.mClusterLimit == 0) {
            mBaseParameters.mClusterLimit = mFsContext.getCachedWorkers().size();
            LOG.info("No --cluster-limit was set, use all {} workers in the cluster", mBaseParameters.mClusterLimit);
        }
        if (mBaseParameters.mStartMs == -1L) {
            LOG.info("Start time is unspecified, leaving 5s for preparation");
            mBaseParameters.mStartMs = CommonUtils.getCurrentMs() + 5000L;
        }

        final Path basePath = new Path(params.mBasePath);
        final int fileSize = (int) FormatUtils.parseSpaceSize(params.mFileSize);
        final int numFiles = getTotalFileNumber();
        mFilePaths = new Path[numFiles];
        mLengths = new Integer[numFiles];
        mOffsets = new Integer[numFiles];
        generateTestFilePaths(basePath);

        if (mBaseParameters.mDistributed) {
            LOG.info("Running in distributed mode on a job worker. The test file should have been prepared in the commandline process before distributing the tasks.");
        } else if (params.mSkipCreation) {
            LOG.info("Test file preparation is skipped");
        } else {
            LOG.info("Preparing the test files in the command line");
            // Configuration used only for writing the test files.
            org.apache.hadoop.conf.Configuration prepareConf = new org.apache.hadoop.conf.Configuration();
            prepareConf.set("alluxio.user.file.delete.unchecked", "true");
            prepareConf.set("alluxio.user.file.writetype.default", params.mWriteType);
            FileSystem prepareFs = FileSystem.get(new URI(params.mBasePath), prepareConf);
            prepareTestFiles(basePath, fileSize, prepareFs);
        }

        // Configuration used by the reading clients. The FileSystem cache is disabled for the
        // base path's scheme, so each FileSystem.get() below is not served from that cache.
        final URI baseUri = new URI(params.mBasePath);
        org.apache.hadoop.conf.Configuration readConf = new org.apache.hadoop.conf.Configuration();
        readConf.set(String.format("fs.%s.impl.disable.cache", baseUri.getScheme()), "true");
        readConf.set("alluxio.user.worker.selection.policy", "alluxio.client.file.dora.LocalWorkerPolicy");
        // User supplied entries are applied last, so they override the settings above.
        for (Map.Entry<?, ?> override : params.mConf.entrySet()) {
            readConf.set((String) override.getKey(), (String) override.getValue());
        }
        LOG.info("HDFS config used in the test: {}", readConf);

        mCachedFs = new FileSystem[params.mClients];
        for (int client = 0; client < mCachedFs.length; client++) {
            mCachedFs[client] = FileSystem.get(baseUri, readConf);
        }
    }

    /**
     * Fills {@code mFilePaths}, {@code mOffsets} and {@code mLengths} for every
     * (worker, thread) pair. The entry of worker {@code i} and thread {@code j} is stored at
     * index {@code i * threads + j}.
     *
     * <p>When random read is enabled, each file gets a random offset and a random length between
     * the configured minimum and maximum read length, capped so that the range does not run past
     * the end of the file. Otherwise the whole file is read from offset 0.
     *
     * @param basePath the directory under which the test files live
     * @throws IOException declared for callers; kept for interface compatibility
     */
    public void generateTestFilePaths(Path basePath) throws IOException {
        final WorkerBenchParameters params = (WorkerBenchParameters) mParameters;
        final int fileSize = (int) FormatUtils.parseSpaceSize(params.mFileSize);
        final int clusterSize = mBaseParameters.mClusterLimit;
        final int threads = params.mThreads;
        final List<?> workers = mFsContext.getCachedWorkers();
        final boolean isRandom = params.mIsRandom;

        // A fixed seed is only used in random-read mode.
        final Random rand = isRandom ? new Random(params.mRandomSeed.intValue()) : new Random();
        // The read length bounds do not change per file, so parse them once.
        int randomMin = 0;
        int randomMax = 0;
        if (isRandom) {
            randomMin = (int) FormatUtils.parseSpaceSize(params.mRandomMinReadLength);
            randomMax = (int) FormatUtils.parseSpaceSize(params.mRandomMaxReadLength);
        }

        for (int i = 0; i < clusterSize; ++i) {
            BlockWorkerInfo localWorker = (BlockWorkerInfo) workers.get(i);
            LOG.info("Building file paths for worker {}", localWorker);
            for (int j = 0; j < threads; ++j) {
                int index = i * threads + j;
                mFilePaths[index] = calculateFilePath(basePath, i, j);
                if (!isRandom) {
                    mOffsets[index] = 0;
                    mLengths[index] = fileSize;
                    continue;
                }
                // Leave room for at least randomMin bytes after the offset.
                mOffsets[index] = randomNumInRange(rand, 0, fileSize - 1 - randomMin);
                // The cap must use THIS file's offset (index), not the worker index, otherwise
                // the chosen length may not fit between the offset and the end of the file.
                int maxLength = Integer.min(fileSize - mOffsets[index], randomMax);
                mLengths[index] = randomNumInRange(rand, randomMin, maxLength);
            }
        }
        LOG.info("{} file paths generated", mFilePaths.length);
    }

    /**
     * Recreates the base directory and writes every file listed in {@code mFilePaths}, each
     * filled with the byte {@code 'A'} up to {@code fileSize} bytes.
     *
     * @param basePath the directory holding the test files; deleted recursively first if it
     *        already exists
     * @param fileSize the number of bytes to write into each file
     * @param prepareFs the file system used for deleting and creating
     * @throws IOException if any file system operation fails
     */
    private void prepareTestFiles(Path basePath, int fileSize, FileSystem prepareFs) throws IOException {
        final int total = mFilePaths.length;
        LOG.info("Preparing {} test files under {}", total, basePath);
        if (prepareFs.exists(basePath)) {
            LOG.info("The base path exists, delete it first.");
            prepareFs.delete(basePath, true);
        }
        LOG.info("Creating the new base path directory");
        prepareFs.mkdirs(basePath);
        LOG.info("Empty base path directory created");

        int bufferSize = (int) FormatUtils.parseSpaceSize(((WorkerBenchParameters) mParameters).mBufferSize);
        byte[] chunk = new byte[bufferSize];
        Arrays.fill(chunk, (byte) 'A');

        LOG.info("Creating {} files...", total);
        int created = 0;
        for (Path target : mFilePaths) {
            // Progress message every 1000 files.
            if (created > 0 && created % 1000 == 0) {
                LOG.info("{} files created", created);
            }
            LOG.info("Creating file {}", target);
            try (FSDataOutputStream out =
                     prepareFs.create(target, false, chunk.length, (short) 1, DUMMY_BLOCK_SIZE)) {
                // Write buffer-sized chunks; the last chunk is cut to the remaining byte count.
                int pending = (int) Math.min((long) fileSize - out.getPos(), (long) chunk.length);
                while (pending != 0) {
                    out.write(chunk, 0, pending);
                    pending = (int) Math.min((long) fileSize - out.getPos(), (long) chunk.length);
                }
            }
            created++;
        }
        LOG.info("All test files created");
    }

    /**
     * Runs the benchmark in this process: starts one {@code BenchThread} per configured thread,
     * waits for all of them (bounded by the bench timeout) and returns the merged result.
     *
     * <p>In the command line process (index {@code "local-task-0"}) the threads read the files
     * starting at index 0. On a job worker the index is parsed as a number and the threads read
     * the files {@code [index * threads, index * threads + threads)}.
     *
     * <p>The thread pool is shut down even when setting up or invoking the tasks throws, so a
     * failed run does not leave pool threads behind.
     *
     * @return the result merged from all bench threads
     * @throws Exception if the parameters are invalid or the run is interrupted
     */
    @Override
    public WorkerBenchTaskResult runLocal() throws Exception {
        Preconditions.checkArgument(mBaseParameters.mStartMs >= 0L, "startMs was not specified correctly!");
        Preconditions.checkArgument(mBaseParameters.mClusterLimit > 0, "clusterLimit was not specified correctly!");
        LOG.info("Worker ID is {}, index is {}", mBaseParameters.mId, mBaseParameters.mIndex);
        LOG.info("This test will use {} workers in the cluster", mBaseParameters.mClusterLimit);

        final WorkerBenchParameters params = (WorkerBenchParameters) mParameters;
        final int threadNum = params.mThreads;
        int startFileIndex = 0;
        int endFileIndex = getTotalFileNumber();
        if (mBaseParameters.mIndex.equals("local-task-0")) {
            LOG.info("This is running in the command line process. Read all {} files with {} threads.", endFileIndex, threadNum);
        } else {
            LOG.info("This job worker has index {} among {} workers", mBaseParameters.mIndex, mBaseParameters.mClusterLimit);
            int workerIndex = Integer.parseInt(mBaseParameters.mIndex);
            startFileIndex = workerIndex * threadNum;
            endFileIndex = startFileIndex + threadNum;
            LOG.info("This job worker threads read files [{}, {})", startFileIndex, endFileIndex);
        }

        ExecutorService service =
            ExecutorServiceFactories.fixedThreadPool("bench-thread", threadNum).create();
        BenchContext context;
        try {
            long durationMs = FormatUtils.parseTimeSize(params.mDuration);
            long warmupMs = FormatUtils.parseTimeSize(params.mWarmup);
            long startMs = mBaseParameters.mStartMs;
            // The warmup period is part of the run; recording starts after it.
            long endMs = startMs + warmupMs + durationMs;
            String datePattern = Configuration.global().getString(PropertyKey.USER_DATE_FORMAT_PATTERN);
            SAMPLING_LOG.info("StressWorkerBench has start={}, warmup={}ms, end={}",
                CommonUtils.convertMsToDate(startMs, datePattern), warmupMs,
                CommonUtils.convertMsToDate(endMs, datePattern));

            context = new BenchContext(startMs, endMs);
            List<BenchThread> callables = new ArrayList<>(threadNum);
            for (int threadIndex = 0; threadIndex < threadNum; ++threadIndex) {
                int fileIndex = startFileIndex + threadIndex;
                LOG.info("Thread {} reads file {} path {}", threadIndex, fileIndex, mFilePaths[fileIndex]);
                // The FileSystem clients are shared round-robin among the threads.
                callables.add(new BenchThread(context, fileIndex, mCachedFs[threadIndex % mCachedFs.length]));
            }
            service.invokeAll(callables, FormatUtils.parseTimeSize(mBaseParameters.mBenchTimeout), TimeUnit.MILLISECONDS);
        } finally {
            // Always stop the pool, also when the setup or invokeAll failed.
            service.shutdownNow();
        }
        service.awaitTermination(30L, TimeUnit.SECONDS);
        return context.getResult();
    }

    /**
     * Checks the benchmark parameters against the workers currently known to the client.
     *
     * @throws IllegalStateException if the cluster limit is negative or larger than the number
     *         of workers, if the thread count is not positive, or if {@code --free} is combined
     *         with the {@code MUST_CACHE} write type
     * @throws Exception declared by the overridden method
     */
    @Override
    public void validateParams() throws Exception {
        final List<?> workers = mFsContext.getCachedWorkers();
        LOG.info("Available workers in the cluster are {}", workers);

        final int clusterLimit = mBaseParameters.mClusterLimit;
        if (clusterLimit < 0) {
            throw new IllegalStateException("--cluster-limit cannot be " + clusterLimit + " in StressWorkerBench. It should be a positive number. 0 means running on all workers in the cluster.");
        }
        if (clusterLimit > workers.size()) {
            throw new IllegalStateException(String.format("Specified --cluster-limit %d but only have %d workers in the cluster!", clusterLimit, workers.size()));
        }

        final WorkerBenchParameters params = (WorkerBenchParameters) mParameters;
        if (params.mThreads <= 0) {
            throw new IllegalStateException("Thread number cannot be " + params.mThreads + " in StressWorkerBench. It should be a positive number.");
        }
        if (params.mFree && WritePType.MUST_CACHE.name().equals(params.mWriteType)) {
            throw new IllegalStateException(String.format("%s cannot be %s when %s option provided", "--write-type", WritePType.MUST_CACHE, "--free"));
        }
    }

    private final class BenchThread
    implements Callable<Void> {
        private final BenchContext mContext;
        private final int mTargetFileIndex;
        private final FileSystem mFs;
        private final byte[] mBuffer;
        private final WorkerBenchTaskResult mResult;
        private final boolean mIsRandomRead;
        private FSDataInputStream mInStream;

        private BenchThread(BenchContext context, int targetFileIndex, FileSystem fs) {
            this.mContext = context;
            this.mTargetFileIndex = targetFileIndex;
            this.mFs = fs;
            this.mBuffer = new byte[(int)FormatUtils.parseSpaceSize((String)((WorkerBenchParameters)((StressWorkerBench)StressWorkerBench.this).mParameters).mBufferSize)];
            this.mResult = new WorkerBenchTaskResult();
            this.mResult.setParameters((WorkerBenchParameters)StressWorkerBench.this.mParameters);
            this.mResult.setBaseParameters(StressWorkerBench.this.mBaseParameters);
            this.mIsRandomRead = ((WorkerBenchParameters)((StressWorkerBench)StressWorkerBench.this).mParameters).mIsRandom;
        }

        @Override
        public Void call() {
            try {
                this.runInternal();
            }
            catch (Exception e) {
                LOG.error(Thread.currentThread().getName() + ": failed", (Throwable)e);
                this.mResult.addErrorMessage(e.getMessage());
            }
            finally {
                this.closeInStream();
            }
            this.mResult.setEndMs(CommonUtils.getCurrentMs());
            this.mContext.mergeThreadResult(this.mResult);
            return null;
        }

        private void runInternal() throws Exception {
            long recordMs = this.mContext.getStartMs() + FormatUtils.parseTimeSize((String)((WorkerBenchParameters)((StressWorkerBench)StressWorkerBench.this).mParameters).mWarmup);
            this.mResult.setRecordStartMs(recordMs);
            long waitMs = this.mContext.getStartMs() - CommonUtils.getCurrentMs();
            if (waitMs < 0L) {
                throw new IllegalStateException(String.format("Thread missed barrier. Increase the start delay. start: %d current: %d", this.mContext.getStartMs(), CommonUtils.getCurrentMs()));
            }
            String dateFormat = Configuration.global().getString(PropertyKey.USER_DATE_FORMAT_PATTERN);
            SAMPLING_LOG.info("Scheduled to start at {}, wait {}ms for the scheduled start", (Object)CommonUtils.convertMsToDate((long)this.mContext.getStartMs(), (String)dateFormat), (Object)waitMs);
            CommonUtils.sleepMs((long)waitMs);
            SAMPLING_LOG.info("Test started and recording will be started after the warm up at {}", (Object)CommonUtils.convertMsToDate((long)recordMs, (String)dateFormat));
            while (!Thread.currentThread().isInterrupted() && CommonUtils.getCurrentMs() < this.mContext.getEndMs()) {
                WorkerBenchDataPoint dataPoint = this.applyOperation();
                long currentMs = CommonUtils.getCurrentMs();
                if (currentMs > recordMs) {
                    this.mResult.addDataPoint(dataPoint);
                    if (dataPoint.getIOBytes() > 0L) {
                        this.mResult.incrementIOBytes(dataPoint.getIOBytes());
                        continue;
                    }
                    LOG.warn("Thread for file {} read 0 bytes from I/O", (Object)StressWorkerBench.this.mFilePaths[this.mTargetFileIndex]);
                    continue;
                }
                SAMPLING_LOG.info("Ignored data point during warmup: {}", (Object)dataPoint);
            }
        }

        private WorkerBenchDataPoint applyOperation() throws IOException {
            int actualReadLength;
            Path filePath = StressWorkerBench.this.mFilePaths[this.mTargetFileIndex];
            int offset = StressWorkerBench.this.mOffsets[this.mTargetFileIndex];
            int length = StressWorkerBench.this.mLengths[this.mTargetFileIndex];
            long startOperation = CommonUtils.getCurrentMs();
            if (this.mInStream == null) {
                this.mInStream = this.mFs.open(filePath);
            }
            int bytesRead = 0;
            if (this.mIsRandomRead) {
                while (length > 0) {
                    actualReadLength = this.mInStream.read((long)offset, this.mBuffer, 0, this.mBuffer.length);
                    if (actualReadLength < 0) {
                        this.closeInStream();
                        break;
                    }
                    bytesRead += actualReadLength;
                    length -= actualReadLength;
                    offset += actualReadLength;
                }
                this.closeInStream();
            } else {
                while (true) {
                    if ((actualReadLength = this.mInStream.read(this.mBuffer)) < 0) {
                        this.closeInStream();
                        this.mInStream = this.mFs.open(filePath);
                        break;
                    }
                    bytesRead += actualReadLength;
                }
            }
            long endOperation = CommonUtils.getCurrentMs();
            return new WorkerBenchDataPoint(((StressWorkerBench)StressWorkerBench.this).mBaseParameters.mIndex, Thread.currentThread().getId(), startOperation, endOperation - startOperation, (long)bytesRead);
        }

        private void closeInStream() {
            try {
                if (this.mInStream != null) {
                    this.mInStream.close();
                }
            }
            catch (IOException e) {
                this.mResult.addErrorMessage(e.getMessage());
            }
            finally {
                this.mInStream = null;
            }
        }
    }

    /**
     * State shared by all bench threads of one run: the scheduled start and end time, and the
     * result merged from the threads. Access to the result is synchronized on this object.
     */
    private static final class BenchContext {
        private final long mStartMs;
        private final long mEndMs;
        // Null until the first thread result arrives.
        private WorkerBenchTaskResult mResult;

        /**
         * @param startMs the scheduled start time in milliseconds
         * @param endMs the scheduled end time in milliseconds
         */
        public BenchContext(long startMs, long endMs) {
            mStartMs = startMs;
            mEndMs = endMs;
        }

        /** @return the scheduled start time in milliseconds */
        public long getStartMs() {
            return mStartMs;
        }

        /** @return the scheduled end time in milliseconds */
        public long getEndMs() {
            return mEndMs;
        }

        /**
         * Adds the result of one thread. The first result received becomes the accumulated
         * result; later ones are merged into it. A failing merge is recorded as an error message
         * on the accumulated result.
         *
         * @param threadResult the result of a finished bench thread
         */
        public synchronized void mergeThreadResult(WorkerBenchTaskResult threadResult) {
            if (mResult != null) {
                try {
                    mResult.merge(threadResult);
                } catch (Exception e) {
                    mResult.addErrorMessage(e.getMessage());
                }
            } else {
                mResult = threadResult;
            }
        }

        /** @return the accumulated result, or {@code null} if no thread has reported yet */
        synchronized WorkerBenchTaskResult getResult() {
            return mResult;
        }
    }
}

