/*
 * Decompiled with CFR 0.152.
 */
package alluxio.stress.cli.worker;

import alluxio.AlluxioURI;
import alluxio.annotation.SuppressFBWarnings;
import alluxio.client.file.FileSystem;
import alluxio.stress.cli.AbstractStressBench;
import alluxio.stress.cli.client.ClientIOWritePolicy;
import alluxio.stress.worker.WorkerBenchParameters;
import alluxio.stress.worker.WorkerBenchTaskResult;
import alluxio.util.CommonUtils;
import alluxio.util.FormatUtils;
import alluxio.util.executor.ExecutorServiceFactories;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StressWorkerBench
extends AbstractStressBench<WorkerBenchTaskResult, WorkerBenchParameters> {
    private static final Logger LOG = LoggerFactory.getLogger(StressWorkerBench.class);
    private FileSystem[] mCachedFs;
    private Path mFilePath;

    /**
     * Creates the benchmark with a fresh, default-initialized set of worker bench parameters.
     */
    public StressWorkerBench() {
        mParameters = new WorkerBenchParameters();
    }

    /**
     * Command-line entry point: hands the arguments and a new bench instance to
     * {@code mainInternal}.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        StressWorkerBench bench = new StressWorkerBench();
        mainInternal(args, bench);
    }

    /**
     * Returns the multi-line, human-readable description of this benchmark, including an
     * example invocation.
     *
     * @return the description lines joined with newline characters
     */
    @Override
    public String getBenchDescription() {
        String[] descriptionLines = {
            "A benchmarking tool to measure the read performance of alluxio workers in the cluster",
            "The test will create one file and repeatedly read the created file to test the performance",
            "",
            "Example:",
            "# This would create a 100MB file with block size of 16KB and then read the file for 30s after 10s warmup",
            "$ bin/alluxio runClass alluxio.stress.cli.worker.StressWorkerBench --clients 1 --base alluxio:///stress-worker-base --block-size 16k --file-size 100m --warmup 10s --duration 30s --cluster\n",
        };
        return String.join("\n", descriptionLines);
    }

    /**
     * Prepares the benchmark.
     *
     * <p>When the run is not marked as distributed, the base path is deleted and recreated and
     * a single data file ({@code <base>/data}) of the configured size is written, filled with
     * the byte {@code 'A'}. If requested and the file lives on an {@code alluxio} scheme, the
     * file is then freed through the Alluxio client. Afterwards, in all cases, one Hadoop
     * {@code FileSystem} instance per configured client is created (with the Hadoop FileSystem
     * cache disabled for the base path's scheme, so that each entry is requested separately) for
     * the reader threads to share.
     *
     * @throws Exception if a parameter cannot be parsed or any file system operation fails
     */
    @Override
    @SuppressFBWarnings(value={"BC_UNCONFIRMED_CAST"})
    public void prepare() throws Exception {
        WorkerBenchParameters params = (WorkerBenchParameters)this.mParameters;
        this.mFilePath = new Path(params.mBasePath, "data");
        // NOTE(review): presumably pins the write policy to a single worker - confirm
        // against ClientIOWritePolicy.
        ClientIOWritePolicy.setMaxWorkers(1);

        if (!this.mBaseParameters.mDistributed) {
            Configuration prepareConf = new Configuration();
            prepareConf.set("alluxio.user.file.delete.unchecked", "true");
            prepareConf.set("alluxio.user.file.writetype.default", "CACHE_THROUGH");
            prepareConf.set("alluxio.user.block.write.location.policy.class", ClientIOWritePolicy.class.getName());
            prepareConf.set("alluxio.user.ufs.block.read.location.policy", ClientIOWritePolicy.class.getName());
            FileSystem prepareFs = FileSystem.get(new URI(params.mBasePath), prepareConf);

            // Start from an empty base directory.
            Path basePath = new Path(params.mBasePath);
            prepareFs.delete(basePath, true);
            prepareFs.mkdirs(basePath);

            // Keep the target size as a long: narrowing it to int wraps for files of 2 GiB
            // or more, which yields either a negative write length or an empty file.
            long fileSize = FormatUtils.parseSpaceSize(params.mFileSize);
            byte[] buffer = new byte[(int)FormatUtils.parseSpaceSize(params.mBufferSize)];
            Arrays.fill(buffer, (byte)65);
            long blockSize = FormatUtils.parseSpaceSize(params.mBlockSize);
            // overwrite = false, replication = 1
            try (FSDataOutputStream out = prepareFs.create(this.mFilePath, false, buffer.length, (short)1, blockSize)) {
                while (true) {
                    // Bounded by buffer.length, so the narrowing cast is safe.
                    int bytesToWrite = (int)Math.min(fileSize - out.getPos(), (long)buffer.length);
                    if (bytesToWrite <= 0) {
                        // File complete (or zero-length buffer / zero-size file).
                        break;
                    }
                    out.write(buffer, 0, bytesToWrite);
                }
            }

            if (params.mFree && "alluxio".equals(this.mFilePath.toUri().getScheme())) {
                // Fully qualified: this is the Alluxio client FileSystem, not the Hadoop one.
                alluxio.client.file.FileSystem.Factory.get().free(new AlluxioURI(this.mFilePath.toString()));
                LOG.info("Freed file before reading: " + this.mFilePath);
            }
        }

        Configuration hdfsConf = new Configuration();
        // Disable the Hadoop FileSystem cache for this scheme so every get() below is not
        // served from the shared cache.
        hdfsConf.set(String.format("fs.%s.impl.disable.cache", new URI(params.mBasePath).getScheme()), "true");
        hdfsConf.set("alluxio.user.block.write.location.policy.class", ClientIOWritePolicy.class.getName());
        hdfsConf.set("alluxio.user.ufs.block.read.location.policy", ClientIOWritePolicy.class.getName());
        // User-supplied configuration is applied last so it can override the defaults above.
        for (Map.Entry<?, ?> entry : params.mConf.entrySet()) {
            hdfsConf.set((String)entry.getKey(), (String)entry.getValue());
        }

        this.mCachedFs = new FileSystem[params.mClients];
        for (int i = 0; i < this.mCachedFs.length; ++i) {
            this.mCachedFs[i] = FileSystem.get(new URI(params.mBasePath), hdfsConf);
        }
    }

    /**
     * Runs the benchmark on this node and returns the merged result of all reader threads.
     *
     * <p>One {@code BenchThread} is created per configured thread; the prepared file system
     * clients are assigned to the threads round-robin. All threads share one start time, which
     * is {@code mBaseParameters.mStartMs}, or five seconds from now when that value is
     * {@code -1}. The run ends after the warmup plus the measured duration, or when the bench
     * timeout expires, whichever comes first.
     *
     * @return the result held by the shared bench context; this is {@code null} if no thread
     *         merged a result
     * @throws Exception if a parameter cannot be parsed or the calling thread is interrupted
     */
    @Override
    @SuppressFBWarnings(value={"BC_UNCONFIRMED_CAST"})
    public WorkerBenchTaskResult runLocal() throws Exception {
        WorkerBenchParameters params = (WorkerBenchParameters)this.mParameters;
        ExecutorService service = ExecutorServiceFactories.fixedThreadPool("bench-thread", params.mThreads).create();
        BenchContext context;
        try {
            long durationMs = FormatUtils.parseTimeSize(params.mDuration);
            long warmupMs = FormatUtils.parseTimeSize(params.mWarmup);
            long startMs = this.mBaseParameters.mStartMs;
            if (startMs == -1L) {
                // No start time supplied: begin five seconds from now.
                startMs = CommonUtils.getCurrentMs() + 5000L;
            }
            long endMs = startMs + warmupMs + durationMs;
            context = new BenchContext(startMs, endMs);

            ArrayList<BenchThread> callables = new ArrayList<BenchThread>(params.mThreads);
            for (int i = 0; i < params.mThreads; ++i) {
                callables.add(new BenchThread(context, this.mCachedFs[i % this.mCachedFs.length]));
            }
            long timeoutMs = FormatUtils.parseTimeSize(this.mBaseParameters.mBenchTimeout);
            service.invokeAll(callables, timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            // Always stop the pool, including when setup or invokeAll throws, so the
            // executor is not left running.
            service.shutdownNow();
        }
        service.awaitTermination(30L, TimeUnit.SECONDS);
        return context.getResult();
    }

    private final class BenchThread
    implements Callable<Void> {
        private final BenchContext mContext;
        private final FileSystem mFs;
        private final byte[] mBuffer;
        private final WorkerBenchTaskResult mResult;
        private FSDataInputStream mInStream = null;

        @SuppressFBWarnings(value={"BC_UNCONFIRMED_CAST"})
        private BenchThread(BenchContext context, FileSystem fs) {
            this.mContext = context;
            this.mFs = fs;
            this.mBuffer = new byte[(int)FormatUtils.parseSpaceSize((String)((WorkerBenchParameters)((StressWorkerBench)StressWorkerBench.this).mParameters).mBufferSize)];
            this.mResult = new WorkerBenchTaskResult();
            this.mResult.setParameters((WorkerBenchParameters)StressWorkerBench.this.mParameters);
            this.mResult.setBaseParameters(StressWorkerBench.this.mBaseParameters);
        }

        @Override
        public Void call() {
            try {
                this.runInternal();
            }
            catch (Exception e) {
                LOG.error(Thread.currentThread().getName() + ": failed", (Throwable)e);
                this.mResult.addErrorMessage(e.getMessage());
            }
            finally {
                this.closeInStream();
            }
            this.mResult.setEndMs(CommonUtils.getCurrentMs());
            this.mContext.mergeThreadResult(this.mResult);
            return null;
        }

        @SuppressFBWarnings(value={"BC_UNCONFIRMED_CAST"})
        private void runInternal() throws Exception {
            long recordMs = this.mContext.getStartMs() + FormatUtils.parseTimeSize((String)((WorkerBenchParameters)((StressWorkerBench)StressWorkerBench.this).mParameters).mWarmup);
            this.mResult.setRecordStartMs(recordMs);
            long waitMs = this.mContext.getStartMs() - CommonUtils.getCurrentMs();
            if (waitMs < 0L) {
                throw new IllegalStateException(String.format("Thread missed barrier. Increase the start delay. start: %d current: %d", this.mContext.getStartMs(), CommonUtils.getCurrentMs()));
            }
            CommonUtils.sleepMs((long)waitMs);
            while (!Thread.currentThread().isInterrupted() && CommonUtils.getCurrentMs() < this.mContext.getEndMs()) {
                int ioBytes = this.applyOperation();
                long currentMs = CommonUtils.getCurrentMs();
                if (currentMs <= recordMs || ioBytes <= 0) continue;
                this.mResult.incrementIOBytes((long)ioBytes);
            }
        }

        private int applyOperation() throws IOException {
            int bytesRead;
            if (this.mInStream == null) {
                this.mInStream = this.mFs.open(StressWorkerBench.this.mFilePath);
            }
            if ((bytesRead = this.mInStream.read(this.mBuffer)) < 0) {
                this.closeInStream();
                this.mInStream = this.mFs.open(StressWorkerBench.this.mFilePath);
            }
            return bytesRead;
        }

        private void closeInStream() {
            try {
                if (this.mInStream != null) {
                    this.mInStream.close();
                }
            }
            catch (IOException e) {
                this.mResult.addErrorMessage(e.getMessage());
            }
            finally {
                this.mInStream = null;
            }
        }
    }

    /**
     * State shared by all threads of one run: the common start and end timestamps and the
     * accumulated result. Access to the accumulated result is synchronized on this object.
     */
    private final class BenchContext {
        private final long mStartMs;
        private final long mEndMs;
        /** Merged result so far; {@code null} until the first thread reports. */
        private WorkerBenchTaskResult mResult;

        /**
         * @param startMs the timestamp, in milliseconds, at which threads start reading
         * @param endMs the timestamp, in milliseconds, at which threads stop reading
         */
        public BenchContext(long startMs, long endMs) {
            mStartMs = startMs;
            mEndMs = endMs;
        }

        /**
         * @return the shared start timestamp in milliseconds
         */
        public long getStartMs() {
            return mStartMs;
        }

        /**
         * @return the shared end timestamp in milliseconds
         */
        public long getEndMs() {
            return mEndMs;
        }

        /**
         * Folds one thread's result into the accumulated result. The first result reported
         * becomes the accumulator itself; later ones are merged into it, and a failure while
         * merging is recorded as an error message on the accumulator.
         *
         * @param threadResult the result produced by one thread
         */
        public synchronized void mergeThreadResult(WorkerBenchTaskResult threadResult) {
            if (mResult != null) {
                try {
                    mResult.merge(threadResult);
                }
                catch (Exception e) {
                    mResult.addErrorMessage(e.getMessage());
                }
            } else {
                mResult = threadResult;
            }
        }

        /**
         * @return the accumulated result, or {@code null} if no thread has reported yet
         */
        synchronized WorkerBenchTaskResult getResult() {
            return mResult;
        }
    }
}

