/*
 * Decompiled with CFR 0.152.
 */
package alluxio.stress.cli;

import alluxio.AlluxioURI;
import alluxio.ClientContext;
import alluxio.annotation.SuppressFBWarnings;
import alluxio.cli.fs.command.DistributedLoadCommand;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.client.job.JobMasterClient;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.InstancedConfiguration;
import alluxio.exception.AlluxioException;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.DeletePOptions;
import alluxio.grpc.WritePType;
import alluxio.job.JobConfig;
import alluxio.job.plan.NoopPlanConfig;
import alluxio.job.wire.JobInfo;
import alluxio.job.wire.Status;
import alluxio.stress.cli.Benchmark;
import alluxio.stress.jobservice.JobServiceBenchParameters;
import alluxio.stress.jobservice.JobServiceBenchTaskResult;
import alluxio.stress.jobservice.JobServiceBenchTaskResultStatistics;
import alluxio.util.CommonUtils;
import alluxio.util.ConfigurationUtils;
import alluxio.util.FormatUtils;
import alluxio.util.WaitForOptions;
import alluxio.util.executor.ExecutorServiceFactories;
import alluxio.worker.job.JobMasterClientContext;
import com.beust.jcommander.ParametersDelegate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.HdrHistogram.Histogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StressJobServiceBench
extends Benchmark<JobServiceBenchTaskResult> {
    private static final Logger LOG = LoggerFactory.getLogger(StressJobServiceBench.class);
    public static final int MAX_RESPONSE_TIME_BUCKET_INDEX = 0;
    @ParametersDelegate
    private JobServiceBenchParameters mParameters = new JobServiceBenchParameters();
    private FileSystemContext mFsContext;
    private JobMasterClient mJobMasterClient;

    public static void main(String[] args) {
        StressJobServiceBench.mainInternal(args, new StressJobServiceBench());
    }

    @Override
    public void prepare() throws Exception {
        this.mFsContext = FileSystemContext.create((AlluxioConfiguration)new InstancedConfiguration(ConfigurationUtils.defaults()));
        ClientContext clientContext = this.mFsContext.getClientContext();
        this.mJobMasterClient = JobMasterClient.Factory.create((JobMasterClientContext)JobMasterClientContext.newBuilder((ClientContext)clientContext).build());
    }

    @Override
    public String getBenchDescription() {
        return String.join((CharSequence)"\n", (Iterable<? extends CharSequence>)ImmutableList.of((Object)"A benchmarking tool for the job service.", (Object)"This test will measure the different aspects of job service performance with different operations.", (Object)"", (Object)"Example:", (Object)"# This invokes the DistributedLoad jobs to job master", (Object)"# 256 requests would be sent concurrently to job master", (Object)"# Each request contains 1000 files with file size 1k", (Object)"$ bin/alluxio runClass alluxio.stress.cli.StressJobServiceBench --file-size 1k \\--files-per-dir 1000 --threads 256 --operation DistributedLoad --cluster", (Object)""));
    }

    @Override
    public JobServiceBenchTaskResult runLocal() throws Exception {
        ExecutorService service = ExecutorServiceFactories.fixedThreadPool((String)"bench-thread", (int)this.mParameters.mThreads).create();
        long timeOutMs = FormatUtils.parseTimeSize((String)this.mBaseParameters.mBenchTimeout);
        long durationMs = FormatUtils.parseTimeSize((String)this.mParameters.mDuration);
        long warmupMs = FormatUtils.parseTimeSize((String)this.mParameters.mWarmup);
        long startMs = this.mBaseParameters.mStartMs;
        if (this.mBaseParameters.mStartMs == -1L) {
            startMs = CommonUtils.getCurrentMs() + 1000L;
        }
        long endMs = startMs + warmupMs + durationMs;
        RateLimiter rateLimiter = RateLimiter.create((double)this.mParameters.mTargetThroughput);
        BenchContext context = new BenchContext(rateLimiter, startMs, endMs);
        ArrayList<BenchThread> callables = new ArrayList<BenchThread>(this.mParameters.mThreads);
        for (int dirId = 0; dirId < this.mParameters.mThreads; ++dirId) {
            String filePath = String.format("%s/%s/%d", this.mParameters.mBasePath, this.mBaseParameters.mId, dirId);
            callables.add(new BenchThread(context, filePath));
        }
        service.invokeAll(callables, timeOutMs, TimeUnit.MILLISECONDS);
        service.shutdownNow();
        service.awaitTermination(30L, TimeUnit.SECONDS);
        if (!this.mBaseParameters.mProfileAgent.isEmpty()) {
            context.addAdditionalResult();
        }
        return context.getResult();
    }

    private void deletePath(FileSystem fs, String dirPath) throws IOException, AlluxioException {
        AlluxioURI path = new AlluxioURI(dirPath);
        if (fs.exists(path)) {
            DeletePOptions options = DeletePOptions.newBuilder().setRecursive(true).build();
            fs.delete(path, options);
        }
    }

    private void runNoop() throws IOException, InterruptedException, TimeoutException {
        long jobId = this.mJobMasterClient.run((JobConfig)new NoopPlanConfig());
        ImmutableSet statuses = ImmutableSet.of((Object)Status.COMPLETED, (Object)Status.CANCELED, (Object)Status.FAILED);
        AtomicReference singleton = new AtomicReference();
        CommonUtils.waitFor((String)String.format("job %d to be one of status %s", jobId, Arrays.toString(statuses.toArray())), () -> {
            try {
                JobInfo info = this.mJobMasterClient.getJobStatus(jobId);
                if (statuses.contains((Object)info.getStatus())) {
                    singleton.set(info);
                }
                return statuses.contains((Object)info.getStatus());
            }
            catch (IOException e) {
                throw Throwables.propagate((Throwable)e);
            }
        }, (WaitForOptions)WaitForOptions.defaults().setTimeoutMs(30000));
        JobInfo jobInfo = (JobInfo)singleton.get();
        if (jobInfo.getStatus().equals((Object)Status.FAILED)) {
            throw new IOException(jobInfo.getErrorMessage());
        }
    }

    private final class BenchThread
    implements Callable<Void> {
        private final BenchContext mContext;
        private final Histogram mResponseTimeNs;
        private final String mPath;
        private final JobServiceBenchTaskResult mResult = new JobServiceBenchTaskResult();

        private BenchThread(BenchContext context, String path) {
            this.mContext = context;
            this.mResponseTimeNs = new Histogram(1800000000000L, 3);
            this.mPath = path;
        }

        @Override
        public Void call() {
            try {
                this.runInternal();
            }
            catch (Exception e) {
                this.mResult.addErrorMessage(e.toString());
            }
            this.mResult.setEndMs(CommonUtils.getCurrentMs());
            this.mResult.getStatistics().encodeResponseTimeNsRaw(this.mResponseTimeNs);
            this.mResult.setParameters(StressJobServiceBench.this.mParameters);
            this.mResult.setBaseParameters(StressJobServiceBench.this.mBaseParameters);
            this.mContext.mergeThreadResult(this.mResult);
            return null;
        }

        private void runInternal() throws Exception {
            long waitMs = this.mContext.getStartMs() - CommonUtils.getCurrentMs();
            if (waitMs < 0L) {
                throw new IllegalStateException(String.format("Thread missed barrier. Set the start time to a later time. start: %d current: %d", this.mContext.getStartMs(), CommonUtils.getCurrentMs()));
            }
            CommonUtils.sleepMs((long)waitMs);
            this.applyOperation(this.mPath);
        }

        private void applyOperation(String dirPath) throws IOException, AlluxioException, InterruptedException, TimeoutException {
            switch (((StressJobServiceBench)StressJobServiceBench.this).mParameters.mOperation) {
                case DISTRIBUTED_LOAD: {
                    this.mResult.setRecordStartMs(this.mContext.getStartMs());
                    long startNs = System.nanoTime();
                    long endNs = this.runDistributedLoad(dirPath);
                    this.recordResponseTimeInfo(startNs, endNs);
                    break;
                }
                case CREATE_FILES: {
                    FileSystem fileSystem = FileSystem.Factory.create((FileSystemContext)StressJobServiceBench.this.mFsContext);
                    long start = CommonUtils.getCurrentMs();
                    StressJobServiceBench.this.deletePath(fileSystem, dirPath);
                    long deleteEnd = CommonUtils.getCurrentMs();
                    LOG.info("Cleanup delete took: {} s", (Object)((double)(deleteEnd - start) / 1000.0));
                    long fileSize = FormatUtils.parseSpaceSize((String)((StressJobServiceBench)StressJobServiceBench.this).mParameters.mFileSize);
                    this.mResult.setRecordStartMs(this.mContext.getStartMs());
                    this.createFiles(fileSystem, ((StressJobServiceBench)StressJobServiceBench.this).mParameters.mNumFilesPerDir, dirPath, fileSize);
                    long createEnd = CommonUtils.getCurrentMs();
                    LOG.info("Create files took: {} s", (Object)((double)(createEnd - deleteEnd) / 1000.0));
                    break;
                }
                case NO_OP: {
                    while (!Thread.currentThread().isInterrupted() && CommonUtils.getCurrentMs() < this.mContext.getEndMs()) {
                        long recordMs = this.mContext.getStartMs() + FormatUtils.parseTimeSize((String)((StressJobServiceBench)StressJobServiceBench.this).mParameters.mWarmup);
                        this.mResult.setRecordStartMs(recordMs);
                        this.mContext.getRateLimiter().acquire();
                        long startNs = System.nanoTime();
                        StressJobServiceBench.this.runNoop();
                        long endNs = System.nanoTime();
                        long currentMs = CommonUtils.getCurrentMs();
                        if (currentMs <= recordMs) continue;
                        this.mResult.incrementNumSuccess(1L);
                        this.recordResponseTimeInfo(startNs, endNs);
                    }
                    break;
                }
                default: {
                    throw new IllegalStateException("Unknown operation: " + ((StressJobServiceBench)StressJobServiceBench.this).mParameters.mOperation);
                }
            }
        }

        private void recordResponseTimeInfo(long startNs, long endNs) {
            long responseTimeNs = endNs - startNs;
            this.mResponseTimeNs.recordValue(responseTimeNs);
            long[] maxResponseTimeNs = this.mResult.getStatistics().mMaxResponseTimeNs;
            if (responseTimeNs > maxResponseTimeNs[0]) {
                maxResponseTimeNs[0] = responseTimeNs;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private long runDistributedLoad(String dirPath) throws AlluxioException, IOException {
            long stopTime;
            int numReplication = 1;
            DistributedLoadCommand cmd = new DistributedLoadCommand(StressJobServiceBench.this.mFsContext);
            try {
                long jobControlId = cmd.runDistLoad(new AlluxioURI(dirPath), numReplication, ((StressJobServiceBench)StressJobServiceBench.this).mParameters.mBatchSize, new HashSet(), new HashSet(), new HashSet(), new HashSet(), false);
                cmd.waitForCmd(jobControlId);
                stopTime = System.nanoTime();
                cmd.postProcessing(jobControlId);
            }
            finally {
                this.mResult.incrementNumSuccess((long)cmd.getCompletedCount());
            }
            return stopTime;
        }

        private void createFiles(FileSystem fs, int numFiles, String dirPath, long fileSize) throws IOException, AlluxioException {
            CreateFilePOptions options = CreateFilePOptions.newBuilder().setRecursive(true).setWriteType(WritePType.THROUGH).build();
            byte[] buf = new byte[0x100000];
            Arrays.fill(buf, (byte)65);
            for (int fileId = 0; fileId < numFiles; ++fileId) {
                String filePath = String.format("%s/%d", dirPath, fileId);
                try (FileOutStream os = fs.createFile(new AlluxioURI(filePath), options);){
                    int toWrite;
                    for (long bytesWritten = 0L; bytesWritten < fileSize; bytesWritten += (long)toWrite) {
                        toWrite = (int)Math.min((long)buf.length, fileSize - bytesWritten);
                        os.write(buf, 0, toWrite);
                    }
                    continue;
                }
            }
            this.mResult.incrementNumSuccess((long)numFiles);
        }
    }

    private final class BenchContext {
        private final RateLimiter mRateLimiter;
        private final long mStartMs;
        private final long mEndMs;
        private JobServiceBenchTaskResult mResult;

        public BenchContext(RateLimiter rateLimiter, long startMs, long endMs) {
            this.mRateLimiter = rateLimiter;
            this.mStartMs = startMs;
            this.mEndMs = endMs;
        }

        public RateLimiter getRateLimiter() {
            return this.mRateLimiter;
        }

        public long getStartMs() {
            return this.mStartMs;
        }

        public long getEndMs() {
            return this.mEndMs;
        }

        public synchronized void mergeThreadResult(JobServiceBenchTaskResult threadResult) {
            if (this.mResult == null) {
                this.mResult = threadResult;
                return;
            }
            try {
                this.mResult.merge(threadResult);
            }
            catch (Exception e) {
                this.mResult.addErrorMessage(e.toString());
            }
        }

        @SuppressFBWarnings(value={"DMI_HARDCODED_ABSOLUTE_FILENAME"})
        public synchronized void addAdditionalResult() throws IOException {
            if (this.mResult == null) {
                return;
            }
            Map<String, Benchmark.MethodStatistics> nameStatistics = StressJobServiceBench.this.processMethodProfiles(this.mResult.getRecordStartMs(), this.mResult.getEndMs(), profileInput -> {
                String method = profileInput.getMethod();
                if (profileInput.getType().contains("RPC")) {
                    int classNameDivider = profileInput.getMethod().lastIndexOf(".");
                    method = profileInput.getMethod().substring(classNameDivider + 1);
                }
                return profileInput.getType() + ":" + method;
            });
            for (Map.Entry<String, Benchmark.MethodStatistics> entry : nameStatistics.entrySet()) {
                JobServiceBenchTaskResultStatistics stats = new JobServiceBenchTaskResultStatistics();
                stats.encodeResponseTimeNsRaw(entry.getValue().getTimeNs());
                stats.mNumSuccess = entry.getValue().getNumSuccess();
                stats.mMaxResponseTimeNs = entry.getValue().getMaxTimeNs();
                this.mResult.putStatisticsForMethod(entry.getKey(), stats);
            }
        }

        public synchronized JobServiceBenchTaskResult getResult() {
            return this.mResult;
        }
    }
}

