/*
 * Decompiled with CFR 0.152.
 */
package alluxio.stress.cli;

import alluxio.cli.ValidationUtils;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.InstancedConfiguration;
import alluxio.stress.cli.Benchmark;
import alluxio.stress.worker.IOTaskResult;
import alluxio.stress.worker.UfsIOParameters;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.UnderFileSystemConfiguration;
import alluxio.util.CommonUtils;
import alluxio.util.FormatUtils;
import alluxio.util.executor.ExecutorServiceFactories;
import com.beust.jcommander.ParametersDelegate;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Benchmark that measures raw I/O against an under file system (UFS) path.
 *
 * <p>Each benchmark thread writes one file under the configured path, the files are then read
 * back by the same number of threads, and finally the files are deleted. The write and read
 * results are merged into a single {@link IOTaskResult}.
 */
public class UfsIOBench
extends Benchmark<IOTaskResult> {
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(UfsIOBench.class);
    /** Size in bytes (1 MiB) of the buffer used for reads and writes. */
    private static final int BUFFER_SIZE = 0x100000;
    /** Benchmark-specific command line parameters (path, thread count, data size, UFS conf). */
    @ParametersDelegate
    private UfsIOParameters mParameters = new UfsIOParameters();
    /** Default configuration used as the base of the UFS configuration. */
    private final InstancedConfiguration mConf = InstancedConfiguration.defaults();
    /** Random id embedded in the names of the files created by this benchmark instance. */
    private final UUID mTaskId = UUID.randomUUID();

    /**
     * Runs the UFS I/O benchmark in the current process on a fixed-size thread pool.
     *
     * <p>If the process type currently is {@code CLIENT} it is switched to {@code JOB_WORKER}
     * for the duration of the run. The restore happens in an outer {@code finally} so that it is
     * guaranteed even when shutting down the thread pool is interrupted.
     *
     * @return the benchmark result; if the run fails before a result is produced, a result that
     *         carries the parameters and the error information instead
     * @throws Exception if the thread is interrupted while waiting for the pool to terminate
     */
    @Override
    public IOTaskResult runLocal() throws Exception {
        boolean switched = CommonUtils.PROCESS_TYPE.compareAndSet(
                CommonUtils.ProcessType.CLIENT, CommonUtils.ProcessType.JOB_WORKER);
        try {
            LOG.debug("Running locally with {} threads", this.mParameters.mThreads);
            ExecutorService pool = null;
            IOTaskResult result = null;
            try {
                pool = ExecutorServiceFactories
                        .fixedThreadPool("bench-io-thread", this.mParameters.mThreads).create();
                result = this.runIOBench(pool);
                LOG.debug("IO benchmark finished with result: {}", result);
                return result;
            } catch (Exception e) {
                // Only synthesize an error result when the benchmark itself produced nothing.
                if (result == null) {
                    LOG.error("Failed run UFS IO benchmark on path {}", this.mParameters.mPath, e);
                    result = new IOTaskResult();
                    result.setParameters(this.mParameters);
                    result.setBaseParameters(this.mBaseParameters);
                    result.addError(ValidationUtils.getErrorInfo(e));
                }
                return result;
            } finally {
                if (pool != null) {
                    pool.shutdownNow();
                    // May throw InterruptedException; the outer finally still restores the type.
                    pool.awaitTermination(30L, TimeUnit.SECONDS);
                }
            }
        } finally {
            if (switched) {
                // Restore the process type only if this call was the one that switched it.
                CommonUtils.PROCESS_TYPE.set(CommonUtils.ProcessType.CLIENT);
            }
        }
    }

    /**
     * Validates the parameter combination before the benchmark is started.
     *
     * @throws IllegalArgumentException if the UFS conf option is enabled while the benchmark is
     *         not running in cluster mode
     */
    @Override
    public void prepare() {
        // Nothing to check unless the UFS conf is requested outside of cluster mode.
        if (!this.mParameters.mUseUfsConf || this.mBaseParameters.mCluster) {
            return;
        }
        String benchName = this.getClass().getName();
        throw new IllegalArgumentException(String.format("%s can not use the ufs conf if it is not running in cluster mode", benchName));
    }

    /**
     * Command line entry point: hands the arguments and a fresh benchmark instance to
     * {@code mainInternal}.
     *
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        UfsIOBench bench = new UfsIOBench();
        mainInternal(args, bench);
    }

    /**
     * Builds the path of the benchmark file that belongs to the given thread index.
     *
     * <p>The file is placed inside the configured directory. A separator is inserted when the
     * configured path does not already end with one, so that a path such as {@code /tmp/bench}
     * yields {@code /tmp/bench/io-benchmark-...} rather than {@code /tmp/benchio-benchmark-...}.
     *
     * @param idx index of the thread that owns the file
     * @return the full path of the file for that thread
     */
    private String getFilePathStr(int idx) {
        String dir = this.mParameters.mPath;
        String separator = dir.endsWith("/") ? "" : "/";
        return dir + separator + String.format("io-benchmark-%s-%d", this.mTaskId.toString(), idx);
    }

    /**
     * Runs the write phase followed by the read phase and merges both results.
     *
     * <p>If the write phase produced no data points the test is aborted and only the write
     * result is returned. Otherwise the files are deleted after the read phase, also when the
     * read phase throws, so that a failed run does not leave benchmark files behind.
     *
     * @param pool the executor that runs the I/O tasks
     * @return the write result, merged with the read result when the read phase ran
     * @throws Exception if the read phase or the clean up fails
     */
    private IOTaskResult runIOBench(ExecutorService pool) throws Exception {
        IOTaskResult writeTaskResult = this.write(pool);
        if (writeTaskResult.getPoints().isEmpty()) {
            LOG.error("Failed to write any files. Abort the test.");
            return writeTaskResult;
        }
        IOTaskResult readTaskResult;
        try {
            readTaskResult = this.read(pool);
        } finally {
            // Always remove the files written above, even if reading them failed.
            this.cleanUp();
        }
        return writeTaskResult.merge(readTaskResult);
    }

    /**
     * Deletes the per-thread benchmark files from the UFS.
     *
     * <p>The UFS client created for the clean up is closed when the method returns. A file that
     * could not be deleted is reported with a warning and does not stop the remaining deletions.
     *
     * @throws IOException if creating, using or closing the UFS client fails
     */
    private void cleanUp() throws IOException {
        UnderFileSystemConfiguration ufsConf = UnderFileSystemConfiguration.defaults(this.mConf)
                .createMountSpecificConf(this.mParameters.mConf);
        try (UnderFileSystem ufs = UnderFileSystem.Factory.create(this.mParameters.mPath, ufsConf)) {
            for (int i = 0; i < this.mParameters.mThreads; ++i) {
                String filePath = this.getFilePathStr(i);
                if (!ufs.deleteFile(filePath)) {
                    LOG.warn("Could not delete benchmark file {}", filePath);
                }
            }
        }
    }

    /**
     * Reads the per-thread benchmark files back from the UFS, one task per thread.
     *
     * <p>If the UFS cannot be set up or the target directory does not exist, a result carrying
     * the error is returned and no read task is started. The UFS client is closed before the
     * method returns.
     *
     * @param pool the executor that runs the read tasks
     * @return the reduced result of all read tasks, or an error result if the setup failed
     * @throws InterruptedException if interrupted while waiting for the read tasks
     * @throws ExecutionException if a read task completed exceptionally
     */
    private IOTaskResult read(ExecutorService pool) throws InterruptedException, ExecutionException {
        UnderFileSystem ufs = null;
        try {
            int numThreads;
            long ioSizeBytes;
            try {
                numThreads = this.mParameters.mThreads;
                ioSizeBytes = FormatUtils.parseSpaceSize(this.mParameters.mDataSize);
                UnderFileSystemConfiguration ufsConf = UnderFileSystemConfiguration.defaults(this.mConf)
                        .createMountSpecificConf(this.mParameters.mConf);
                ufs = UnderFileSystem.Factory.create(this.mParameters.mPath, ufsConf);
                if (!ufs.exists(this.mParameters.mPath)) {
                    throw new IOException(String.format("The target directory %s does not exist!", this.mParameters.mPath));
                }
            } catch (Exception e) {
                LOG.error("Failed to access UFS path {}", this.mParameters.mPath, e);
                IOTaskResult result = new IOTaskResult();
                result.setParameters(this.mParameters);
                result.setBaseParameters(this.mBaseParameters);
                result.addError(ValidationUtils.getErrorInfo(e));
                return result;
            }

            // Effectively final copies for use inside the task lambdas.
            UnderFileSystem taskUfs = ufs;
            long taskIoSizeBytes = ioSizeBytes;
            List<CompletableFuture<IOTaskResult>> futures = new ArrayList<>();
            for (int i = 0; i < numThreads; i++) {
                int idx = i;
                futures.add(CompletableFuture.supplyAsync(
                        () -> this.readOneFile(taskUfs, idx, taskIoSizeBytes), pool));
            }

            // Wait for every task; get() surfaces a task failure as ExecutionException.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
            List<IOTaskResult> results = futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
            return IOTaskResult.reduceList(results);
        } finally {
            if (ufs != null) {
                try {
                    ufs.close();
                } catch (IOException e) {
                    LOG.warn("Failed to close UFS for path {}", this.mParameters.mPath, e);
                }
            }
        }
    }

    /**
     * Reads the benchmark file of one thread and records the elapsed time and bytes read.
     *
     * @param ufs the UFS client to read from
     * @param idx index of the thread whose file is read
     * @param ioSizeBytes number of bytes after which reading stops
     * @return a result holding one READ point on success, or the error information on failure
     */
    private IOTaskResult readOneFile(UnderFileSystem ufs, int idx, long ioSizeBytes) {
        IOTaskResult result = new IOTaskResult();
        result.setBaseParameters(this.mBaseParameters);
        result.setParameters(this.mParameters);
        long startTime = CommonUtils.getCurrentMs();
        String filePath = this.getFilePathStr(idx);
        LOG.debug("Reading filePath={}", filePath);
        InputStream inStream = null;
        try {
            inStream = ufs.open(filePath);
            byte[] buf = new byte[BUFFER_SIZE];
            long readBytes = 0L;
            int readBufBytes;
            // Stop at the requested size or at end of stream, whichever comes first.
            while (readBytes < ioSizeBytes && (readBufBytes = inStream.read(buf)) > 0) {
                readBytes += readBufBytes;
            }
            long endTime = CommonUtils.getCurrentMs();
            double duration = (double) (endTime - startTime) / 1000.0;
            IOTaskResult.Point p = new IOTaskResult.Point(IOTaskResult.IOMode.READ, duration, readBytes);
            result.addPoint(p);
            LOG.debug("Read task finished {}", p);
        } catch (Exception e) {
            LOG.error("Failed to read {}", filePath, e);
            result.addError(ValidationUtils.getErrorInfo(e));
        } finally {
            if (inStream != null) {
                try {
                    inStream.close();
                } catch (IOException e) {
                    LOG.warn("Failed to close read stream {}", filePath, e);
                    result.addError(ValidationUtils.getErrorInfo(e));
                }
            }
        }
        return result;
    }

    /**
     * Writes one benchmark file per thread to the UFS, creating the target directory if needed.
     *
     * <p>If the UFS cannot be set up, a result carrying the error is returned and no write task
     * is started. All tasks write from the same buffer of random bytes. The UFS client is closed
     * before the method returns.
     *
     * @param pool the executor that runs the write tasks
     * @return the reduced result of all write tasks, or an error result if the setup failed
     * @throws InterruptedException if interrupted while waiting for the write tasks
     * @throws ExecutionException if a write task completed exceptionally
     */
    private IOTaskResult write(ExecutorService pool) throws InterruptedException, ExecutionException {
        UnderFileSystem ufs = null;
        try {
            int numThreads;
            long ioSizeBytes;
            try {
                numThreads = this.mParameters.mThreads;
                ioSizeBytes = FormatUtils.parseSpaceSize(this.mParameters.mDataSize);
                UnderFileSystemConfiguration ufsConf = UnderFileSystemConfiguration.defaults(this.mConf)
                        .createMountSpecificConf(this.mParameters.mConf);
                ufs = UnderFileSystem.Factory.create(this.mParameters.mPath, ufsConf);
                if (!ufs.exists(this.mParameters.mPath)) {
                    LOG.debug("Prepare directory {}", this.mParameters.mPath);
                    ufs.mkdirs(this.mParameters.mPath);
                }
            } catch (Exception e) {
                LOG.error("Failed to access UFS path {}", this.mParameters.mPath, e);
                IOTaskResult result = new IOTaskResult();
                result.setParameters(this.mParameters);
                result.setBaseParameters(this.mBaseParameters);
                result.addError(ValidationUtils.getErrorInfo(e));
                return result;
            }

            // Effectively final copies for use inside the task lambdas.
            UnderFileSystem taskUfs = ufs;
            long taskIoSizeBytes = ioSizeBytes;
            byte[] randomData = CommonUtils.randomBytes(BUFFER_SIZE);
            List<CompletableFuture<IOTaskResult>> futures = new ArrayList<>();
            for (int i = 0; i < numThreads; i++) {
                int idx = i;
                futures.add(CompletableFuture.supplyAsync(
                        () -> this.writeOneFile(taskUfs, idx, taskIoSizeBytes, randomData), pool));
            }

            // Wait for every task; get() surfaces a task failure as ExecutionException.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
            List<IOTaskResult> results = futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
            return IOTaskResult.reduceList(results);
        } finally {
            if (ufs != null) {
                try {
                    ufs.close();
                } catch (IOException e) {
                    LOG.warn("Failed to close UFS for path {}", this.mParameters.mPath, e);
                }
            }
        }
    }

    /**
     * Writes the benchmark file of one thread and records the elapsed time and bytes written.
     *
     * @param ufs the UFS client to write to
     * @param idx index of the thread whose file is written
     * @param ioSizeBytes total number of bytes to write
     * @param randomData source buffer of {@code BUFFER_SIZE} bytes, written repeatedly
     * @return a result holding one WRITE point on success, or the error information on failure
     */
    private IOTaskResult writeOneFile(UnderFileSystem ufs, int idx, long ioSizeBytes, byte[] randomData) {
        IOTaskResult result = new IOTaskResult();
        result.setParameters(this.mParameters);
        result.setBaseParameters(this.mBaseParameters);
        long startTime = CommonUtils.getCurrentMs();
        String filePath = this.getFilePathStr(idx);
        LOG.debug("filePath={}, data to write={}", filePath, this.mParameters.mDataSize);
        BufferedOutputStream outStream = null;
        try {
            outStream = new BufferedOutputStream(ufs.create(filePath));
            long wroteBytes = 0L;
            while (wroteBytes < ioSizeBytes) {
                // The last chunk may be shorter than the buffer.
                long bytesToWrite = Math.min(ioSizeBytes - wroteBytes, (long) BUFFER_SIZE);
                outStream.write(randomData, 0, (int) bytesToWrite);
                wroteBytes += bytesToWrite;
            }
            outStream.flush();
            long endTime = CommonUtils.getCurrentMs();
            double duration = (double) (endTime - startTime) / 1000.0;
            IOTaskResult.Point p = new IOTaskResult.Point(IOTaskResult.IOMode.WRITE, duration, wroteBytes);
            result.addPoint(p);
            LOG.debug("Write task finished {}", p);
        } catch (Exception e) {
            LOG.error("Failed to write to UFS: ", e);
            result.addError(ValidationUtils.getErrorInfo(e));
        } finally {
            if (outStream != null) {
                try {
                    outStream.close();
                } catch (IOException e) {
                    LOG.warn("Failed to close stream to UFS: ", e);
                    result.addError(ValidationUtils.getErrorInfo(e));
                }
            }
        }
        LOG.debug("Thread {} file={}, IOBench result={}", Thread.currentThread().getName(), filePath, result);
        return result;
    }
}

