/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.hadoop;

import alluxio.annotation.dora.DoraTestTodoItem;
import alluxio.client.WriteType;
import alluxio.client.hadoop.AbstractIOMapper;
import alluxio.client.hadoop.AccumulatingReducer;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.hadoop.FileSystem;
import alluxio.hadoop.HadoopConfigurationUtils;
import alluxio.testutils.BaseIntegrationTest;
import alluxio.testutils.LocalAlluxioClusterResource;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URI;
import java.util.Date;
import java.util.Random;
import java.util.StringTokenizer;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.AssumptionViolatedException;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DFSIOIntegrationTest
extends BaseIntegrationTest
implements Tool {
    /** Logger shared by the test driver and its nested mapper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(DFSIOIntegrationTest.class);
    /** Default I/O buffer size in bytes. */
    private static final int DEFAULT_BUFFER_SIZE = 4096;
    /** Prefix of every data file name; the file index is appended (see getFileName). */
    private static final String BASE_FILE_NAME = "test_io_";
    /** Result file used when no {@code -resFile} argument is supplied. */
    private static final String DEFAULT_RES_FILE_NAME = "DFSIOIntegrationTest_results.log";
    /** Number of bytes in one megabyte, taken from {@link ByteMultiple#MB}. */
    private static final long MEGA = ByteMultiple.MB.value();
    /** Default number of bytes per data file. */
    private static final int DEFAULT_NR_BYTES = 16384;
    /** Default number of data files. */
    private static final int DEFAULT_NR_FILES = 4;
    /**
     * When true, analyzeResult appends the report to the result file; otherwise it prints the
     * report to standard output. Set to true by run().
     */
    private static boolean sGenerateReportFile = false;
    /** Command-line usage text printed by main() when run() returns -1. */
    private static final String USAGE = "Usage: " + DFSIOIntegrationTest.class.getSimpleName() + " [genericOptions] -read [-random | -backward | -skip [-skipSize Size]] | -write | -append | -clean [-compression codecClassName] [-nrFiles N] [-size Size[B|KB|MB|GB|TB]] [-resFile resultFileName] [-bufferSize Bytes] [-rootDir]";
    /** Hadoop configuration backing this tool; exposed through getConf()/setConf(). */
    private Configuration mConfig = new Configuration();
    /** Local Alluxio cluster for the test class, with the default write type set to CACHE_THROUGH. */
    @ClassRule
    public static LocalAlluxioClusterResource sLocalAlluxioClusterResource = new LocalAlluxioClusterResource.Builder().setProperty(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.CACHE_THROUGH).build();
    /** Master URI of the local cluster; assigned in beforeClass(). */
    private static URI sLocalAlluxioClusterUri = null;
    /** Benchmark instance shared by all test methods; created in beforeClass(). */
    private static DFSIOIntegrationTest sBench;
    /** Rule that skips the tests on unsupported Hadoop versions; assigned in the static initializer. */
    @ClassRule
    public static HadoopVersionRule sHadoopVersionRule;

    /**
     * Resolves the root directory under which all benchmark directories live.
     *
     * @param conf configuration consulted for the {@code test.dfsio.build.data} property
     * @return the configured value, or {@code /benchmarks/DFSIOIntegrationTest} when unset
     */
    private static String getBaseDir(Configuration conf) {
        final String fallback = "/benchmarks/DFSIOIntegrationTest";
        return conf.get("test.dfsio.build.data", fallback);
    }

    /**
     * @param conf configuration used to resolve the base directory
     * @return the {@code io_control} directory under the base directory
     */
    private static Path getControlDir(Configuration conf) {
        String baseDir = getBaseDir(conf);
        return new Path(baseDir, "io_control");
    }

    /**
     * @param conf configuration used to resolve the base directory
     * @return the {@code io_write} directory under the base directory
     */
    private static Path getWriteDir(Configuration conf) {
        String baseDir = getBaseDir(conf);
        return new Path(baseDir, "io_write");
    }

    /**
     * @param conf configuration used to resolve the base directory
     * @return the {@code io_read} directory under the base directory
     */
    private static Path getReadDir(Configuration conf) {
        String baseDir = getBaseDir(conf);
        return new Path(baseDir, "io_read");
    }

    /**
     * @param conf configuration used to resolve the base directory
     * @return the {@code io_append} directory under the base directory
     */
    private static Path getAppendDir(Configuration conf) {
        String baseDir = getBaseDir(conf);
        return new Path(baseDir, "io_append");
    }

    /**
     * @param conf configuration used to resolve the base directory
     * @return the {@code io_random_read} directory under the base directory
     */
    private static Path getRandomReadDir(Configuration conf) {
        String baseDir = getBaseDir(conf);
        return new Path(baseDir, "io_random_read");
    }

    /**
     * @param conf configuration used to resolve the base directory
     * @return the {@code io_data} directory under the base directory
     */
    private static Path getDataDir(Configuration conf) {
        String baseDir = getBaseDir(conf);
        return new Path(baseDir, "io_data");
    }

    /**
     * Creates the shared benchmark instance, points its configuration at the local Alluxio
     * cluster, writes the control files and runs the write job once.
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        sBench = new DFSIOIntegrationTest();
        Configuration benchConf = sBench.getConf();
        benchConf.setBoolean("dfs.support.append", true);

        sLocalAlluxioClusterUri = URI.create(sLocalAlluxioClusterResource.get().getMasterURI());
        String clusterUri = sLocalAlluxioClusterUri.toString();
        benchConf.set("fs.defaultFS", clusterUri);
        benchConf.set("fs.default.name", clusterUri);
        benchConf.set("fs.alluxio.impl", FileSystem.class.getName());

        AlluxioConfiguration alluxioConf = alluxio.conf.Configuration.global();
        Configuration hadoopConf =
            HadoopConfigurationUtils.mergeAlluxioConfiguration(benchConf, alluxioConf);
        org.apache.hadoop.fs.FileSystem fs =
            org.apache.hadoop.fs.FileSystem.get(sLocalAlluxioClusterUri, hadoopConf);

        sBench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES);
        writeTest();
    }

    /** Deletes the benchmark base directory once all tests in the class have run. */
    @AfterClass
    public static void afterClass() throws Exception {
        Configuration benchConf = sBench.getConf();
        AlluxioConfiguration alluxioConf = alluxio.conf.Configuration.global();
        Configuration hadoopConf =
            HadoopConfigurationUtils.mergeAlluxioConfiguration(benchConf, alluxioConf);
        org.apache.hadoop.fs.FileSystem fs =
            org.apache.hadoop.fs.FileSystem.get(sLocalAlluxioClusterUri, hadoopConf);
        sBench.cleanup(fs);
    }

    /**
     * Runs the write job on the shared benchmark instance, times it and reports the
     * aggregated statistics.
     */
    public static void writeTest() throws Exception {
        Configuration benchConf = sBench.getConf();
        AlluxioConfiguration alluxioConf = alluxio.conf.Configuration.global();
        Configuration hadoopConf =
            HadoopConfigurationUtils.mergeAlluxioConfiguration(benchConf, alluxioConf);
        org.apache.hadoop.fs.FileSystem fs =
            org.apache.hadoop.fs.FileSystem.get(sLocalAlluxioClusterUri, hadoopConf);

        final long startMs = System.currentTimeMillis();
        sBench.mapperWriteTest(fs);
        final long elapsedMs = System.currentTimeMillis() - startMs;
        sBench.analyzeResult(fs, TestType.TEST_TYPE_WRITE, elapsedMs);
    }

    /** Runs the sequential read job and reports its statistics (currently ignored). */
    @Test(timeout=50000L)
    @Ignore
    @DoraTestTodoItem(action=DoraTestTodoItem.Action.FIX, owner="jiaming", comment="fix the test case")
    public void read() throws Exception {
        Configuration benchConf = sBench.getConf();
        AlluxioConfiguration alluxioConf = alluxio.conf.Configuration.global();
        Configuration hadoopConf =
            HadoopConfigurationUtils.mergeAlluxioConfiguration(benchConf, alluxioConf);
        org.apache.hadoop.fs.FileSystem fs =
            org.apache.hadoop.fs.FileSystem.get(sLocalAlluxioClusterUri, hadoopConf);

        final long startMs = System.currentTimeMillis();
        sBench.mapperReadTest(fs);
        final long elapsedMs = System.currentTimeMillis() - startMs;
        sBench.analyzeResult(fs, TestType.TEST_TYPE_READ, elapsedMs);
    }

    /**
     * Runs the random read job with a skip size of 0, which selects random offsets
     * (currently ignored).
     */
    @Test(timeout=50000L)
    @Ignore
    @DoraTestTodoItem(action=DoraTestTodoItem.Action.FIX, owner="jiaming", comment="fix the test case")
    public void readRandom() throws Exception {
        Configuration benchConf = sBench.getConf();
        AlluxioConfiguration alluxioConf = alluxio.conf.Configuration.global();
        Configuration hadoopConf =
            HadoopConfigurationUtils.mergeAlluxioConfiguration(benchConf, alluxioConf);
        org.apache.hadoop.fs.FileSystem fs =
            org.apache.hadoop.fs.FileSystem.get(sLocalAlluxioClusterUri, hadoopConf);

        final long startMs = System.currentTimeMillis();
        sBench.getConf().setLong("test.io.skip.size", 0L);
        sBench.randomReadTest(fs);
        final long elapsedMs = System.currentTimeMillis() - startMs;
        sBench.analyzeResult(fs, TestType.TEST_TYPE_READ_RANDOM, elapsedMs);
    }

    /**
     * Runs the random read job with a negative skip size of one default buffer, which reads
     * the file backward (currently ignored).
     */
    @Test(timeout=50000L)
    @Ignore
    @DoraTestTodoItem(action=DoraTestTodoItem.Action.FIX, owner="jiaming", comment="fix the test case")
    public void readBackward() throws Exception {
        Configuration benchConf = sBench.getConf();
        AlluxioConfiguration alluxioConf = alluxio.conf.Configuration.global();
        Configuration hadoopConf =
            HadoopConfigurationUtils.mergeAlluxioConfiguration(benchConf, alluxioConf);
        org.apache.hadoop.fs.FileSystem fs =
            org.apache.hadoop.fs.FileSystem.get(sLocalAlluxioClusterUri, hadoopConf);

        final long startMs = System.currentTimeMillis();
        final long backwardStep = -DEFAULT_BUFFER_SIZE;
        sBench.getConf().setLong("test.io.skip.size", backwardStep);
        sBench.randomReadTest(fs);
        final long elapsedMs = System.currentTimeMillis() - startMs;
        sBench.analyzeResult(fs, TestType.TEST_TYPE_READ_BACKWARD, elapsedMs);
    }

    /** Runs the random read job with a skip size of 1 byte between reads (currently ignored). */
    @Test(timeout=50000L)
    @Ignore
    @DoraTestTodoItem(action=DoraTestTodoItem.Action.FIX, owner="jiaming", comment="fix the test case")
    public void readSkip() throws Exception {
        Configuration benchConf = sBench.getConf();
        AlluxioConfiguration alluxioConf = alluxio.conf.Configuration.global();
        Configuration hadoopConf =
            HadoopConfigurationUtils.mergeAlluxioConfiguration(benchConf, alluxioConf);
        org.apache.hadoop.fs.FileSystem fs =
            org.apache.hadoop.fs.FileSystem.get(sLocalAlluxioClusterUri, hadoopConf);

        final long startMs = System.currentTimeMillis();
        sBench.getConf().setLong("test.io.skip.size", 1L);
        sBench.randomReadTest(fs);
        final long elapsedMs = System.currentTimeMillis() - startMs;
        sBench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, elapsedMs);
    }

    /**
     * Runs the random read job with a skip size of 5000 bytes between reads
     * (currently ignored).
     */
    @Test(timeout=50000L)
    @Ignore
    @DoraTestTodoItem(action=DoraTestTodoItem.Action.FIX, owner="jiaming", comment="fix the test case")
    public void readLargeSkip() throws Exception {
        Configuration benchConf = sBench.getConf();
        AlluxioConfiguration alluxioConf = alluxio.conf.Configuration.global();
        Configuration hadoopConf =
            HadoopConfigurationUtils.mergeAlluxioConfiguration(benchConf, alluxioConf);
        org.apache.hadoop.fs.FileSystem fs =
            org.apache.hadoop.fs.FileSystem.get(sLocalAlluxioClusterUri, hadoopConf);

        final long startMs = System.currentTimeMillis();
        sBench.getConf().setLong("test.io.skip.size", 5000L);
        sBench.randomReadTest(fs);
        final long elapsedMs = System.currentTimeMillis() - startMs;
        sBench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, elapsedMs);
    }

    /**
     * Writes one control file per data file into the control directory. Each control file is a
     * sequence file holding a single (file name, byte count) record.
     *
     * <p>Control files are only written when the control directory does not exist yet.
     *
     * @param fs file system on which the control files are created
     * @param nrBytes number of bytes recorded for each data file
     * @param nrFiles number of control files to create
     * @throws IOException if a control file cannot be created or written; the original failure
     *         is attached as the cause
     */
    private void createControlFile(org.apache.hadoop.fs.FileSystem fs, long nrBytes, int nrFiles) throws IOException {
        LOG.info("creating control file: " + nrBytes + " bytes, " + nrFiles + " files");
        Path controlDir = getControlDir(mConfig);
        if (!fs.exists(controlDir)) {
            // NOTE(review): the directory is known to be absent here, so this delete looks like
            // a no-op; it is kept to preserve the existing call sequence. Confirm whether the
            // guard was meant to be unconditional regeneration instead.
            fs.delete(controlDir, true);
            for (int i = 0; i < nrFiles; ++i) {
                String name = getFileName(i);
                Path controlFile = new Path(controlDir, "in_file_" + name);
                // try-with-resources closes the writer on every path, including failures.
                try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, mConfig, controlFile,
                        Text.class, LongWritable.class, SequenceFile.CompressionType.NONE)) {
                    writer.append(new Text(name), new LongWritable(nrBytes));
                } catch (Exception e) {
                    // Keep the root cause so the stack trace of the real failure is not lost.
                    throw new IOException(e.getLocalizedMessage(), e);
                }
            }
        }
        LOG.info("created control files for: " + nrFiles + " files");
    }

    /**
     * @param fIdx zero-based index of the data file
     * @return the data file name, i.e. the base file name followed by the index
     */
    private static String getFileName(int fIdx) {
        return BASE_FILE_NAME + fIdx;
    }

    /**
     * Removes the data and write-output directories, then runs the write job.
     *
     * @param fs file system on which the directories are deleted
     * @throws IOException if a delete or the job fails
     */
    private void mapperWriteTest(org.apache.hadoop.fs.FileSystem fs) throws IOException {
        Path outputDir = getWriteDir(mConfig);
        Path dataDir = getDataDir(mConfig);
        fs.delete(dataDir, true);
        fs.delete(outputDir, true);
        runIOTest(WriteMapper.class, outputDir);
    }

    /**
     * Configures and runs one MapReduce job that reads the control directory, applies the given
     * mapper and aggregates the results with {@link AccumulatingReducer} in a single reduce task.
     *
     * @param mapperClass mapper implementation performing the I/O
     * @param outputDir directory receiving the job output
     * @throws IOException if the job fails
     */
    private void runIOTest(Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, Path outputDir) throws IOException {
        JobConf jobConf = new JobConf(mConfig, DFSIOIntegrationTest.class);

        // Input: the sequence files in the control directory.
        Path controlDir = getControlDir(mConfig);
        FileInputFormat.setInputPaths(jobConf, controlDir);
        jobConf.setInputFormat(SequenceFileInputFormat.class);

        // Processing: the supplied mapper plus the accumulating reducer.
        jobConf.setMapperClass(mapperClass);
        jobConf.setReducerClass(AccumulatingReducer.class);

        // Output: text key/value pairs, produced by one reducer.
        FileOutputFormat.setOutputPath(jobConf, outputDir);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setNumReduceTasks(1);

        JobClient.runJob(jobConf);
    }

    /**
     * Removes the append-output directory, then runs the append job.
     *
     * @param fs file system on which the directory is deleted
     * @throws IOException if the delete or the job fails
     */
    private void mapperAppendTest(org.apache.hadoop.fs.FileSystem fs) throws IOException {
        Path outputDir = getAppendDir(mConfig);
        fs.delete(outputDir, true);
        runIOTest(AppendMapper.class, outputDir);
    }

    /**
     * Removes the read-output directory, then runs the sequential read job.
     *
     * @param fs file system on which the directory is deleted
     * @throws IOException if the delete or the job fails
     */
    private void mapperReadTest(org.apache.hadoop.fs.FileSystem fs) throws IOException {
        Path outputDir = getReadDir(mConfig);
        fs.delete(outputDir, true);
        runIOTest(ReadMapper.class, outputDir);
    }

    /**
     * Removes the random-read-output directory, then runs the random read job.
     *
     * @param fs file system on which the directory is deleted
     * @throws IOException if the delete or the job fails
     */
    private void randomReadTest(org.apache.hadoop.fs.FileSystem fs) throws IOException {
        Path outputDir = getRandomReadDir(mConfig);
        fs.delete(outputDir, true);
        runIOTest(RandomReadMapper.class, outputDir);
    }

    /**
     * Runs the I/O for every data file in this thread, one file after another, without
     * submitting a MapReduce job. Test types that have no mapper (e.g. cleanup) do nothing.
     *
     * @param fs file system handle (not used by this method)
     * @param testType kind of I/O to perform; selects the mapper implementation
     * @param fileSize number of bytes to process per file
     * @param nrFiles number of files to process
     * @throws IOException if the I/O fails
     */
    private void sequentialTest(org.apache.hadoop.fs.FileSystem fs, TestType testType, long fileSize, int nrFiles) throws IOException {
        final IOStatMapper mapper;
        switch (testType) {
            case TEST_TYPE_READ:
                mapper = new ReadMapper();
                break;
            case TEST_TYPE_WRITE:
                mapper = new WriteMapper();
                break;
            case TEST_TYPE_APPEND:
                mapper = new AppendMapper();
                break;
            case TEST_TYPE_READ_RANDOM:
            case TEST_TYPE_READ_BACKWARD:
            case TEST_TYPE_READ_SKIP:
                mapper = new RandomReadMapper();
                break;
            default:
                // No mapper exists for this test type; nothing to run.
                return;
        }
        int fileIdx = 0;
        while (fileIdx < nrFiles) {
            mapper.doIO(Reporter.NULL, getFileName(fileIdx), fileSize);
            fileIdx++;
        }
        mapper.close();
    }

    /**
     * Command-line entry point. Runs the tool through {@link ToolRunner} and exits with its
     * return code; exits with -2 if the run throws, and prints the usage text on -1.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        DFSIOIntegrationTest tool = new DFSIOIntegrationTest();
        int exitCode;
        try {
            exitCode = ToolRunner.run(tool, args);
        } catch (Exception e) {
            System.err.print(StringUtils.stringifyException(e));
            exitCode = -2;
        }
        if (exitCode == -1) {
            System.err.print(USAGE);
        }
        System.exit(exitCode);
    }

    /**
     * Parses the command-line arguments, then runs the selected benchmark and reports the
     * result. Also enables report-file generation for this JVM.
     *
     * @param args command-line arguments, see the usage text
     * @return 0 on success, -1 when the arguments are missing or invalid
     * @throws IOException if file system access or the job fails
     */
    @Override
    public int run(String[] args) throws IOException {
        TestType testType = null;
        int bufferSize = DEFAULT_BUFFER_SIZE;
        long nrBytes = MEGA;
        int nrFiles = 1;
        long skipSize = 0L;
        String resFileName = DEFAULT_RES_FILE_NAME;
        String compressionClass = null;
        boolean isSequential = false;
        String version = DFSIOIntegrationTest.class.getSimpleName() + ".1.7";

        sGenerateReportFile = true;
        LOG.info(version);
        if (args.length == 0) {
            System.err.println("Missing arguments.");
            return -1;
        }

        for (int i = 0; i < args.length; ++i) {
            String arg = args[i];
            if (arg.startsWith("-read")) {
                testType = TestType.TEST_TYPE_READ;
            } else if (arg.equals("-write")) {
                testType = TestType.TEST_TYPE_WRITE;
            } else if (arg.equals("-append")) {
                testType = TestType.TEST_TYPE_APPEND;
            } else if (arg.equals("-random")) {
                // Read modifiers are only valid directly on top of a plain read test.
                if (testType != TestType.TEST_TYPE_READ) {
                    return -1;
                }
                testType = TestType.TEST_TYPE_READ_RANDOM;
            } else if (arg.equals("-backward")) {
                if (testType != TestType.TEST_TYPE_READ) {
                    return -1;
                }
                testType = TestType.TEST_TYPE_READ_BACKWARD;
            } else if (arg.equals("-skip")) {
                if (testType != TestType.TEST_TYPE_READ) {
                    return -1;
                }
                testType = TestType.TEST_TYPE_READ_SKIP;
            } else if (arg.equals("-clean")) {
                testType = TestType.TEST_TYPE_CLEANUP;
            } else if (arg.startsWith("-seq")) {
                isSequential = true;
            } else if (arg.startsWith("-compression")) {
                compressionClass = args[++i];
            } else if (arg.equals("-nrFiles")) {
                nrFiles = Integer.parseInt(args[++i]);
            } else if (arg.equals("-fileSize") || arg.equals("-size")) {
                nrBytes = parseSize(args[++i]);
            } else if (arg.equals("-skipSize")) {
                skipSize = parseSize(args[++i]);
            } else if (arg.equals("-bufferSize")) {
                bufferSize = Integer.parseInt(args[++i]);
            } else if (arg.equals("-resFile")) {
                resFileName = args[++i];
            } else {
                System.err.println("Illegal argument: " + arg);
                return -1;
            }
        }
        if (testType == null) {
            return -1;
        }

        // Backward reads step back one buffer; skip reads default to skipping one buffer.
        if (testType == TestType.TEST_TYPE_READ_BACKWARD) {
            skipSize = -bufferSize;
        } else if (testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0L) {
            skipSize = bufferSize;
        }

        LOG.info("nrFiles = " + nrFiles);
        LOG.info("nrBytes (MB) = " + toMB(nrBytes));
        LOG.info("bufferSize = " + bufferSize);
        if (skipSize > 0L) {
            LOG.info("skipSize = " + skipSize);
        }
        LOG.info("baseDir = " + getBaseDir(mConfig));
        if (compressionClass != null) {
            mConfig.set("test.io.compression.class", compressionClass);
            LOG.info("compressionClass = " + compressionClass);
        }
        mConfig.setInt("test.io.file.buffer.size", bufferSize);
        mConfig.setLong("test.io.skip.size", skipSize);
        mConfig.setBoolean("dfs.support.append", true);
        org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(mConfig);

        if (isSequential) {
            long seqStart = System.currentTimeMillis();
            sequentialTest(fs, testType, nrBytes, nrFiles);
            long seqElapsed = System.currentTimeMillis() - seqStart;
            String resultLine = "Seq Test exec time sec: " + (float)seqElapsed / 1000.0f;
            LOG.info(resultLine);
            return 0;
        }
        if (testType == TestType.TEST_TYPE_CLEANUP) {
            cleanup(fs);
            return 0;
        }

        createControlFile(fs, nrBytes, nrFiles);
        long jobStart = System.currentTimeMillis();
        switch (testType) {
            case TEST_TYPE_WRITE:
                mapperWriteTest(fs);
                break;
            case TEST_TYPE_READ:
                mapperReadTest(fs);
                break;
            case TEST_TYPE_APPEND:
                mapperAppendTest(fs);
                break;
            case TEST_TYPE_READ_RANDOM:
            case TEST_TYPE_READ_BACKWARD:
            case TEST_TYPE_READ_SKIP:
                randomReadTest(fs);
                break;
            default:
                break;
        }
        long jobElapsed = System.currentTimeMillis() - jobStart;
        analyzeResult(fs, testType, jobElapsed, resFileName);
        return 0;
    }

    /** @return the Hadoop configuration currently used by this tool */
    @Override
    public Configuration getConf() {
        return mConfig;
    }

    /**
     * Replaces the Hadoop configuration used by this tool.
     *
     * @param conf the new configuration
     */
    @Override
    public void setConf(Configuration conf) {
        mConfig = conf;
    }

    /**
     * Parses a size such as {@code 16}, {@code 16KB} or {@code 2gb} into a byte count. The
     * leading run of ASCII digits is the number; everything after it is the unit, resolved by
     * {@link ByteMultiple#parseString(String)}.
     *
     * @param arg the size expression
     * @return the number multiplied by the unit's byte multiplier
     * @throws NumberFormatException if {@code arg} does not start with a digit
     */
    static long parseSize(String arg) {
        // Locate the first character that is not an ASCII digit.
        int unitStart = 0;
        while (unitStart < arg.length()) {
            char c = arg.charAt(unitStart);
            if (c < '0' || c > '9') {
                break;
            }
            unitStart++;
        }
        long count = Long.parseLong(arg.substring(0, unitStart));
        ByteMultiple unit = ByteMultiple.parseString(arg.substring(unitStart));
        return count * unit.value();
    }

    /**
     * @param bytes a byte count
     * @return the same quantity expressed in megabytes, as a float
     */
    static float toMB(long bytes) {
        final float numerator = (float) bytes;
        final float bytesPerMb = (float) MEGA;
        return numerator / bytesPerMb;
    }

    /**
     * Reads the reducer output of a finished job, derives throughput statistics from it and
     * reports them. Every report line is logged; in addition it is appended to
     * {@code resFileName} when report-file generation is enabled, or printed to standard output
     * otherwise.
     *
     * <p>The reducer output is scanned line by line. The first token of a line names the
     * attribute (matched by its {@code :tasks}, {@code :size}, {@code :time}, {@code :rate} or
     * {@code :sqrate} suffix) and the second token is its value. Other lines are ignored.
     *
     * @param fs file system holding the reducer output
     * @param testType test whose output is analyzed; selects the reduce file
     * @param execTime wall-clock duration of the test in milliseconds
     * @param resFileName local file the report is appended to when report files are enabled
     * @throws IOException if the reducer output cannot be read or the report cannot be written
     */
    private void analyzeResult(org.apache.hadoop.fs.FileSystem fs, TestType testType, long execTime, String resFileName) throws IOException {
        Path reduceFile = getReduceFilePath(testType);
        long tasks = 0L;
        long size = 0L;
        long time = 0L;
        float rate = 0.0f;
        float sqrate = 0.0f;
        // Both streams are closed in reverse order, even when parsing fails.
        try (DataInputStream in = new DataInputStream(fs.open(reduceFile));
             BufferedReader lines = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = lines.readLine()) != null) {
                StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
                if (!tokens.hasMoreTokens()) {
                    // Blank line: nothing to parse.
                    continue;
                }
                String attr = tokens.nextToken();
                if (attr.endsWith(":tasks")) {
                    tasks = Long.parseLong(tokens.nextToken());
                } else if (attr.endsWith(":size")) {
                    size = Long.parseLong(tokens.nextToken());
                } else if (attr.endsWith(":time")) {
                    time = Long.parseLong(tokens.nextToken());
                } else if (attr.endsWith(":rate")) {
                    rate = Float.parseFloat(tokens.nextToken());
                } else if (attr.endsWith(":sqrate")) {
                    sqrate = Float.parseFloat(tokens.nextToken());
                }
            }
        }

        // The mappers emit rates scaled by 1000; undo that and average over the tasks.
        double med = rate / 1000.0f / (float)tasks;
        double stdDev = Math.sqrt(Math.abs((double)(sqrate / 1000.0f / (float)tasks) - med * med));
        String[] resultLines = new String[]{
            "----- DFSIOIntegrationTest ----- : " + testType,
            "           Date & time: " + new Date(System.currentTimeMillis()),
            "       Number of files: " + tasks,
            "Total MBytes processed: " + toMB(size),
            "     Throughput mb/sec: " + (double)size * 1000.0 / (double)(time * MEGA),
            "Average IO rate mb/sec: " + med,
            " IO rate std deviation: " + stdDev,
            "    Test exec time sec: " + (float)execTime / 1000.0f,
            ""};

        // A null resource is permitted by try-with-resources and is simply not closed, so the
        // report file is opened (in append mode) only when report generation is enabled.
        try (PrintStream res = sGenerateReportFile
                ? new PrintStream(new FileOutputStream(new File(resFileName), true))
                : null) {
            for (String resultLine : resultLines) {
                LOG.info(resultLine);
                if (res != null) {
                    res.println(resultLine);
                } else {
                    System.out.println(resultLine);
                }
            }
        }
    }

    /**
     * Analyzes a finished job using the default result file name.
     *
     * @param fs file system holding the reducer output
     * @param testType test whose output is analyzed
     * @param execTime wall-clock duration of the test in milliseconds
     * @throws IOException if the reducer output cannot be read or the report cannot be written
     */
    private void analyzeResult(org.apache.hadoop.fs.FileSystem fs, TestType testType, long execTime) throws IOException {
        final String reportFile = DEFAULT_RES_FILE_NAME;
        analyzeResult(fs, testType, execTime, reportFile);
    }

    /**
     * Locates the single reducer output file ({@code part-00000}) of a test.
     *
     * @param testType test whose output file is wanted
     * @return the output file path, or {@code null} for test types that produce no job output
     */
    @Nullable
    private Path getReduceFilePath(TestType testType) {
        final Path outputDir;
        switch (testType) {
            case TEST_TYPE_WRITE:
                outputDir = getWriteDir(mConfig);
                break;
            case TEST_TYPE_APPEND:
                outputDir = getAppendDir(mConfig);
                break;
            case TEST_TYPE_READ:
                outputDir = getReadDir(mConfig);
                break;
            case TEST_TYPE_READ_RANDOM:
            case TEST_TYPE_READ_BACKWARD:
            case TEST_TYPE_READ_SKIP:
                outputDir = getRandomReadDir(mConfig);
                break;
            default:
                return null;
        }
        return new Path(outputDir, "part-00000");
    }

    /**
     * Recursively deletes the benchmark base directory.
     *
     * @param fs file system on which the directory is deleted
     * @throws IOException if the delete fails
     */
    private void cleanup(org.apache.hadoop.fs.FileSystem fs) throws IOException {
        LOG.info("Cleaning up test files");
        Path baseDir = new Path(getBaseDir(mConfig));
        fs.delete(baseDir, true);
    }

    // Registers the HDFS and MapReduce default/site resources with Hadoop's Configuration,
    // in this order, and creates the Hadoop version rule.
    static {
        String[] defaultResources = {
            "hdfs-default.xml",
            "hdfs-site.xml",
            "mapred-default.xml",
            "mapred-site.xml",
        };
        for (String resource : defaultResources) {
            Configuration.addDefaultResource(resource);
        }
        sHadoopVersionRule = new HadoopVersionRule();
    }

    /**
     * Mapper that reads a data file with positioned reads. The read pattern depends on the
     * {@code test.io.skip.size} setting: 0 reads at random offsets, a positive value skips that
     * many bytes after each buffer, and a negative value walks the file backward.
     */
    public static class RandomReadMapper
    extends IOStatMapper {
        private Random mRnd = new Random();
        private long mFileSize;
        private long mSkipSize;

        /** Reads the skip size from the job configuration (default 0). */
        @Override
        public void configure(JobConf conf) {
            super.configure(conf);
            this.mSkipSize = conf.getLong("test.io.skip.size", 0L);
        }

        /**
         * Opens the named data file and records its length for offset computation.
         *
         * @param name data file name, relative to the data directory
         * @return the opened input stream, wrapped by the compression codec when one is set
         * @throws IOException if the file cannot be inspected or opened
         */
        @Override
        public Closeable getIOStream(String name) throws IOException {
            Path filePath = new Path(DFSIOIntegrationTest.getDataDir(this.getConf()), name);
            this.mFileSize = this.mFS.getFileStatus(filePath).getLen();
            FSDataInputStream in = this.mFS.open(filePath);
            if (this.mCompressionCodec != null) {
                in = new FSDataInputStream((InputStream)this.mCompressionCodec.createInputStream((InputStream)in));
            }
            LOG.info("in = " + in.getClass().getName());
            LOG.info("skipSize = " + this.mSkipSize);
            return in;
        }

        /**
         * Issues positioned reads until {@code totalSize} bytes were read or a read reports
         * end of stream.
         *
         * @param reporter receives a status update after every read
         * @param name data file name, used in the status text
         * @param totalSize number of bytes to read
         * @return the number of bytes actually read
         * @throws IOException if a read fails
         */
        @Override
        public Long doIO(Reporter reporter, String name, long totalSize) throws IOException {
            int curSize;
            PositionedReadable in = (PositionedReadable)this.mStream;
            long actualSize = 0L;
            long pos = this.nextOffset(-1L);
            while (actualSize < totalSize && (curSize = in.read(pos, this.mBuffer, 0, this.mBufferSize)) >= 0) {
                reporter.setStatus("reading " + name + "@" + (actualSize += (long)curSize) + "/" + totalSize + " ::host = " + this.mHostname);
                pos = this.nextOffset(pos);
            }
            return actualSize;
        }

        /**
         * Computes the offset of the next read.
         *
         * @param current offset of the previous read, or a negative value before the first read
         * @return the next offset; for random reads a value in {@code [0, fileSize)}, or 0 for
         *         an empty file
         */
        private long nextOffset(long current) {
            if (this.mSkipSize == 0L) {
                if (this.mFileSize <= 0L) {
                    // Random.nextInt rejects a non-positive bound; offset 0 is the only choice.
                    return 0L;
                }
                if (this.mFileSize <= (long)Integer.MAX_VALUE) {
                    return this.mRnd.nextInt((int)this.mFileSize);
                }
                // Files larger than 2 GiB: the int cast would overflow, so draw a long instead.
                return Math.floorMod(this.mRnd.nextLong(), this.mFileSize);
            }
            if (this.mSkipSize > 0L) {
                // Forward: start at 0, then jump over the buffer just read plus the skip.
                return current < 0L ? 0L : current + (long)this.mBufferSize + this.mSkipSize;
            }
            // Backward: start one buffer before the end, then step back, never below 0.
            return current < 0L ? Math.max(0L, this.mFileSize - (long)this.mBufferSize) : Math.max(0L, current + this.mSkipSize);
        }
    }

    /** Mapper that reads a data file sequentially from start to end. */
    public static class ReadMapper
    extends IOStatMapper {
        /**
         * Opens the named data file for reading.
         *
         * @param name data file name, relative to the data directory
         * @return the opened input stream, wrapped by the compression codec when one is set
         * @throws IOException if the file cannot be opened
         */
        @Override
        public Closeable getIOStream(String name) throws IOException {
            // Declared as InputStream: the codec returns a CompressionInputStream, which is
            // not an FSDataInputStream.
            InputStream in = this.mFS.open(new Path(DFSIOIntegrationTest.getDataDir(this.getConf()), name));
            if (this.mCompressionCodec != null) {
                in = this.mCompressionCodec.createInputStream(in);
            }
            LOG.info("in = " + in.getClass().getName());
            return in;
        }

        /**
         * Reads buffer-sized chunks until {@code totalSize} bytes were read or the stream ends.
         *
         * @param reporter receives a status update after every read
         * @param name data file name, used in the status text
         * @param totalSize number of bytes to read
         * @return the number of bytes actually read
         * @throws IOException if a read fails
         */
        @Override
        public Long doIO(Reporter reporter, String name, long totalSize) throws IOException {
            int curSize;
            InputStream in = (InputStream)this.mStream;
            long actualSize = 0L;
            while (actualSize < totalSize && (curSize = in.read(this.mBuffer, 0, this.mBufferSize)) >= 0) {
                actualSize += (long)curSize;
                reporter.setStatus("reading " + name + "@" + actualSize + "/" + totalSize + " ::host = " + this.mHostname);
            }
            return actualSize;
        }
    }

    /** Mapper that appends a fixed byte pattern to an existing data file. */
    public static class AppendMapper
    extends IOStatMapper {
        /** Fills the write buffer with a repeating pattern of 50 byte values starting at 48. */
        public AppendMapper() {
            for (int i = 0; i < this.mBufferSize; ++i) {
                this.mBuffer[i] = (byte)(48 + i % 50);
            }
        }

        /**
         * Opens the named data file for appending.
         *
         * @param name data file name, relative to the data directory
         * @return the opened output stream, wrapped by the compression codec when one is set
         * @throws IOException if the file cannot be opened for append
         */
        @Override
        public Closeable getIOStream(String name) throws IOException {
            // Declared as OutputStream: the codec returns a CompressionOutputStream, which is
            // not an FSDataOutputStream.
            OutputStream out = this.mFS.append(new Path(DFSIOIntegrationTest.getDataDir(this.getConf()), name), this.mBufferSize);
            if (this.mCompressionCodec != null) {
                out = this.mCompressionCodec.createOutputStream(out);
            }
            LOG.info("out = " + out.getClass().getName());
            return out;
        }

        /**
         * Writes {@code totalSize} bytes in buffer-sized chunks; the last chunk may be shorter.
         *
         * @param reporter receives a status update after every write
         * @param name data file name, used in the status text
         * @param totalSize number of bytes to write
         * @return {@code totalSize}
         * @throws IOException if a write fails
         */
        @Override
        public Long doIO(Reporter reporter, String name, long totalSize) throws IOException {
            OutputStream out = (OutputStream)this.mStream;
            for (long nrRemaining = totalSize; nrRemaining > 0L; nrRemaining -= (long)this.mBufferSize) {
                int curSize = (long)this.mBufferSize < nrRemaining ? this.mBufferSize : (int)nrRemaining;
                out.write(this.mBuffer, 0, curSize);
                reporter.setStatus("writing " + name + "@" + (totalSize - nrRemaining) + "/" + totalSize + " ::host = " + this.mHostname);
            }
            return totalSize;
        }
    }

    /** Mapper that creates a data file and fills it with a fixed byte pattern. */
    public static class WriteMapper
    extends IOStatMapper {
        /** Fills the write buffer with a repeating pattern of 50 byte values starting at 48. */
        public WriteMapper() {
            for (int i = 0; i < this.mBufferSize; ++i) {
                this.mBuffer[i] = (byte)(48 + i % 50);
            }
        }

        /**
         * Creates the named data file, overwriting any existing file.
         *
         * @param name data file name, relative to the data directory
         * @return the opened output stream, wrapped by the compression codec when one is set
         * @throws IOException if the file cannot be created
         */
        @Override
        public Closeable getIOStream(String name) throws IOException {
            // Declared as OutputStream: the codec returns a CompressionOutputStream, which is
            // not an FSDataOutputStream.
            OutputStream out = this.mFS.create(new Path(DFSIOIntegrationTest.getDataDir(this.getConf()), name), true, this.mBufferSize);
            if (this.mCompressionCodec != null) {
                out = this.mCompressionCodec.createOutputStream(out);
            }
            LOG.info("out = " + out.getClass().getName());
            return out;
        }

        /**
         * Writes {@code totalSize} bytes in buffer-sized chunks; the last chunk may be shorter.
         *
         * @param reporter receives a status update after every write
         * @param name data file name, used in the status text
         * @param totalSize number of bytes to write
         * @return {@code totalSize}
         * @throws IOException if a write fails
         */
        @Override
        public Long doIO(Reporter reporter, String name, long totalSize) throws IOException {
            OutputStream out = (OutputStream)this.mStream;
            for (long nrRemaining = totalSize; nrRemaining > 0L; nrRemaining -= (long)this.mBufferSize) {
                int curSize = (long)this.mBufferSize < nrRemaining ? this.mBufferSize : (int)nrRemaining;
                out.write(this.mBuffer, 0, curSize);
                reporter.setStatus("writing " + name + "@" + (totalSize - nrRemaining) + "/" + totalSize + " ::host = " + this.mHostname);
            }
            return totalSize;
        }
    }

    /**
     * Base class of the I/O mappers. Resolves the optional compression codec from the job
     * configuration and emits the per-task statistics consumed by analyzeResult.
     */
    private static abstract class IOStatMapper
    extends AbstractIOMapper<Long> {
        /** Codec named by {@code test.io.compression.class}, or null when none is configured. */
        protected CompressionCodec mCompressionCodec;

        IOStatMapper() {
        }

        /**
         * Instantiates the compression codec named by {@code test.io.compression.class}, if any.
         *
         * @param conf the job configuration
         * @throws RuntimeException if the named class cannot be loaded or is not a codec
         */
        @Override
        public void configure(JobConf conf) {
            super.configure(conf);
            String compression = this.getConf().get("test.io.compression.class", null);
            // asSubclass yields Class<? extends CompressionCodec>, so the local must use the
            // wildcard type.
            Class<? extends CompressionCodec> codec;
            try {
                codec = compression == null ? null : Class.forName(compression).asSubclass(CompressionCodec.class);
            }
            catch (Exception e) {
                throw new RuntimeException("Compression codec not found: ", e);
            }
            if (codec != null) {
                this.mCompressionCodec = ReflectionUtils.newInstance(codec, this.getConf());
            }
        }

        /**
         * Emits the statistics of one task: task count, bytes processed, execution time, and
         * the I/O rate and squared I/O rate, both scaled by 1000.
         *
         * @param output collector receiving the statistics
         * @param name data file name (not used)
         * @param execTime execution time of the task in milliseconds
         * @param objSize number of bytes processed by the task
         * @throws IOException if the collector fails
         */
        @Override
        void collectStats(OutputCollector<Text, Text> output, String name, long execTime, Long objSize) throws IOException {
            long totalSize = objSize;
            float ioRateMbSec = (float)totalSize * 1000.0f / (float)(execTime * MEGA);
            LOG.info("Number of bytes processed = " + totalSize);
            LOG.info("Exec time = " + execTime);
            LOG.info("IO rate = " + ioRateMbSec);
            // Keys carry a type prefix: "l:" for long values, "f:" for float values.
            output.collect(new Text("l:tasks"), new Text(String.valueOf(1)));
            output.collect(new Text("l:size"), new Text(String.valueOf(totalSize)));
            output.collect(new Text("l:time"), new Text(String.valueOf(execTime)));
            output.collect(new Text("f:rate"), new Text(String.valueOf(ioRateMbSec * 1000.0f)));
            output.collect(new Text("f:sqrate"), new Text(String.valueOf(ioRateMbSec * ioRateMbSec * 1000.0f)));
        }
    }

    /** Byte units accepted in size arguments, each with its multiplier in bytes. */
    static enum ByteMultiple {
        B(1L),
        KB(1024L),
        MB(1024L * 1024L),
        GB(1024L * 1024L * 1024L),
        TB(1024L * 1024L * 1024L * 1024L);

        private final long mMultiplier;

        ByteMultiple(long multiplier) {
            mMultiplier = multiplier;
        }

        /** @return the number of bytes in one unit */
        long value() {
            return mMultiplier;
        }

        /**
         * Resolves a unit suffix, ignoring case. A null or empty suffix means megabytes.
         *
         * @param sMultiple the unit suffix, e.g. {@code "kb"}
         * @return the first unit, in declaration order, whose name ends with the suffix
         * @throws IllegalArgumentException if no unit matches
         */
        static ByteMultiple parseString(String sMultiple) {
            if (sMultiple == null || sMultiple.isEmpty()) {
                return MB;
            }
            final String wanted = sMultiple.toUpperCase();
            for (ByteMultiple unit : values()) {
                if (unit.name().toUpperCase().endsWith(wanted)) {
                    return unit;
                }
            }
            throw new IllegalArgumentException("Unsupported ByteMultiple " + sMultiple);
        }
    }

    /** Kinds of benchmark runs, each with a human-readable label used in reports. */
    private static enum TestType {
        TEST_TYPE_READ("read"),
        TEST_TYPE_WRITE("write"),
        TEST_TYPE_CLEANUP("cleanup"),
        TEST_TYPE_APPEND("append"),
        TEST_TYPE_READ_RANDOM("random read"),
        TEST_TYPE_READ_BACKWARD("backward read"),
        TEST_TYPE_READ_SKIP("skip read");

        private final String mType;

        TestType(String label) {
            mType = label;
        }

        /** @return the human-readable label of this test type */
        @Override
        public String toString() {
            return mType;
        }
    }

    private static class HadoopVersionRule
    implements TestRule {
        private HadoopVersionRule() {
        }

        public Statement apply(final Statement base, Description description) {
            return new Statement(){

                public void evaluate() throws Throwable {
                    String hadoopVersion = System.getProperty("alluxio.hadoop.version");
                    if (hadoopVersion != null && (hadoopVersion.startsWith("2.4") || hadoopVersion.startsWith("2.5") || hadoopVersion.startsWith("2.6"))) {
                        throw new AssumptionViolatedException("Hadoop version not supported. Skipping test!");
                    }
                    base.evaluate();
                }
            };
        }
    }
}

