/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.streaming;

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.streaming.StreamJob;
import org.apache.hadoop.streaming.TestStreaming;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Runs a streaming job on a {@link MiniMRCluster} with a perl script acting
 * first as the mapper and then as the reducer, and checks the task status,
 * the user counter, the job output and the task stderr log produced by the
 * script.
 *
 * <p>The script writes {@code reporter:status:} and {@code reporter:counter:}
 * lines as well as plain messages to stderr, and the numbers 1500 down to 1 to
 * stdout. The validations in this class compare against exactly those values,
 * so {@link #script}, {@link #expectedStderr}, {@link #expectedStatus} and
 * {@link #buildExpectedJobOutput()} must be kept in agreement.
 */
public class TestStreamingStatus {
    /** Root directory (as a URI string) under which all test files are created. */
    protected static String TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp"), TestStreamingStatus.class.getSimpleName()).toURI().toString().replace(' ', '+');
    /** Location of the job input file. */
    protected String INPUT_FILE = TEST_ROOT_DIR + "/input.txt";
    /** Location of the job output directory. */
    protected String OUTPUT_DIR = TEST_ROOT_DIR + "/out";
    /** Content written to the input file when the input is not empty. */
    protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
    /** Mapper command for the next job; assigned by the test before each run. */
    protected String map = null;
    /** Reducer command for the next job; assigned by the test before each run. */
    protected String reduce = null;
    /** Location (URI string) of the perl script. */
    protected String scriptFile = TEST_ROOT_DIR + "/perlScript.pl";
    /** Path component of {@link #scriptFile}; used as the mapper/reducer command. */
    protected String scriptFileName = new Path(this.scriptFile).toUri().getPath();
    /** Plain (non-reporter) stderr lines the script emits, in order. */
    String expectedStderr = "my error msg before consuming input\nmy error msg after consuming input\n";
    /** Expected job output; built lazily by {@link #buildExpectedJobOutput()}. */
    String expectedOutput = null;
    /** Status text the script reports through its {@code reporter:status:} line. */
    String expectedStatus = "before consuming input";
    /**
     * Perl script used as mapper or reducer. It reports a status and a counter
     * increment, prints 1500..1 to stdout, drains stdin, and then reports a
     * second counter increment.
     */
    protected String script = "#!/usr/bin/perl\n"
            + "print STDERR \"reporter:status:" + this.expectedStatus + "\\n\";\n"
            + "print STDERR \"reporter:counter:myOwnCounterGroup,myOwnCounter,1\\n\";\n"
            + "print STDERR \"my error msg before consuming input\\n\";\n"
            + "for($count = 1500; $count >= 1; $count--) {print STDOUT \"$count \";}"
            + "while(<STDIN>) {chomp;}\n"
            + "print STDERR \"my error msg after consuming input\\n\";\n"
            + "print STDERR \"reporter:counter:myOwnCounterGroup,myOwnCounter,1\\n\";\n";
    MiniMRCluster mr = null;
    FileSystem fs = null;
    JobConf conf = null;

    /**
     * Starts the mini cluster, resolves the file system for the input path,
     * removes leftovers of earlier runs and prepares the expected job output.
     *
     * @throws IOException if the cluster or the file system cannot be set up
     */
    @Before
    public void setUp() throws IOException {
        this.conf = new JobConf();
        this.conf.setBoolean("mapreduce.jobtracker.retirejobs", false);
        this.conf.setBoolean("mapreduce.jobtracker.persist.jobstatus.active", false);
        this.mr = new MiniMRCluster(1, "file:///", 3, null, null, this.conf);
        Path inFile = new Path(this.INPUT_FILE);
        this.fs = inFile.getFileSystem(this.mr.createJobConf());
        this.clean(this.fs);
        this.buildExpectedJobOutput();
    }

    /**
     * Removes the test files and shuts the mini cluster down. The shutdown is
     * performed even if the cleanup fails unexpectedly.
     */
    @After
    public void tearDown() {
        try {
            if (this.fs != null) {
                this.clean(this.fs);
            }
        } finally {
            if (this.mr != null) {
                this.mr.shutdown();
            }
        }
    }

    /**
     * Builds {@link #expectedOutput} once: the numbers 1500 down to 1 separated
     * by single spaces, without leading or trailing whitespace. This mirrors the
     * {@code print STDOUT} loop in {@link #script}.
     */
    void buildExpectedJobOutput() {
        if (this.expectedOutput != null) {
            return;
        }
        StringBuilder expected = new StringBuilder();
        for (int i = 1500; i >= 1; --i) {
            expected.append(i).append(' ');
        }
        this.expectedOutput = expected.toString().trim();
    }

    /**
     * Writes the job input file and the script file.
     *
     * @param isEmptyInput if {@code true} the input file is created empty,
     *                     otherwise it receives {@link #input}
     * @param script       content of the script file
     * @throws IOException if either file cannot be written
     */
    protected void createInputAndScript(boolean isEmptyInput, String script) throws IOException {
        this.makeInput(this.fs, isEmptyInput ? "" : this.input);
        TestStreamingStatus.writeFile(this.fs, new Path(this.scriptFileName), script);
    }

    /**
     * Builds the command-line arguments for the streaming job.
     *
     * @param jobtracker value passed as {@code mapreduce.jobtracker.address}
     * @param mapper     mapper command
     * @param reducer    reducer command
     * @return the argument array handed to {@link StreamJob}
     */
    protected String[] genArgs(String jobtracker, String mapper, String reducer) {
        return new String[]{
            "-input", this.INPUT_FILE,
            "-output", this.OUTPUT_DIR,
            "-mapper", mapper,
            "-reducer", reducer,
            "-jobconf", "mapreduce.job.maps=1",
            "-jobconf", "mapreduce.job.reduces=1",
            "-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
            "-jobconf", "stream.tmpdir=" + new Path(TEST_ROOT_DIR).toUri().getPath(),
            "-jobconf", "mapreduce.jobtracker.address=" + jobtracker,
            "-jobconf", "fs.default.name=file:///",
            "-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
            "-jobconf", "mapreduce.framework.name=yarn"
        };
    }

    /**
     * Creates (or overwrites) the job input file with the given content.
     *
     * @param fs    file system to write to
     * @param input content of the input file
     * @throws IOException if the file cannot be written
     */
    public void makeInput(FileSystem fs, String input) throws IOException {
        TestStreamingStatus.writeFile(fs, new Path(this.INPUT_FILE), input);
    }

    /**
     * Deletes the job output directory recursively. Failures are reported on
     * stderr but never propagated, so this is safe to call during cleanup.
     *
     * @param fs file system holding the output directory
     */
    protected void deleteOutDir(FileSystem fs) {
        TestStreamingStatus.deleteBestEffort(fs, new Path(this.OUTPUT_DIR), true);
    }

    /**
     * Deletes the output directory, the input file and the script file.
     * Each deletion is attempted independently; failures are reported on stderr
     * and never propagated.
     *
     * @param fs file system holding the test files
     */
    public void clean(FileSystem fs) {
        this.deleteOutDir(fs);
        TestStreamingStatus.deleteBestEffort(fs, new Path(this.INPUT_FILE), false);
        TestStreamingStatus.deleteBestEffort(fs, new Path(this.scriptFile), false);
    }

    /**
     * Runs the reporting checks with non-empty input and then with empty input.
     *
     * @throws Exception if a job cannot be run or validated
     */
    @Test
    public void testReporting() throws Exception {
        this.testStreamJob(false);
        this.testStreamJob(true);
    }

    /**
     * Runs the job twice: once with the script as mapper and {@code /bin/cat}
     * as reducer, and once with the roles swapped.
     */
    private void testStreamJob(boolean isEmptyInput) throws IOException {
        this.createInputAndScript(isEmptyInput, this.script);

        // Script reports from the map task.
        this.map = this.scriptFileName;
        this.reduce = "/bin/cat";
        this.runStreamJob(TaskType.MAP, isEmptyInput);
        this.deleteOutDir(this.fs);

        // Script reports from the reduce task.
        this.map = "/bin/cat";
        this.reduce = this.scriptFileName;
        this.runStreamJob(TaskType.REDUCE, isEmptyInput);
        this.clean(this.fs);
    }

    /**
     * Runs one streaming job with the current {@link #map} and {@link #reduce}
     * commands and validates its results.
     *
     * @param type         the task type in which the script runs
     * @param isEmptyInput whether the job input is empty
     * @throws IOException if the job or a validation step fails with an I/O error
     */
    void runStreamJob(TaskType type, boolean isEmptyInput) throws IOException {
        String jobtracker = this.mr.createJobConf().get("mapreduce.jobtracker.address");
        StreamJob job = new StreamJob(this.genArgs(jobtracker, this.map, this.reduce), false);
        Assert.assertEquals(0, job.go());

        // Status, output and a counter value of 2 are only expected when the
        // script runs in the map task or the input is non-empty; otherwise the
        // counter is expected to stay at 0.
        // NOTE(review): presumably the reporter lines are ignored when the
        // reducer receives no input — confirm against the streaming runtime.
        int expectedCounterValue = 0;
        if (type == TaskType.MAP || !isEmptyInput) {
            this.validateTaskStatus(job, type);
            this.validateJobOutput(job.getConf());
            expectedCounterValue = 2;
        }
        this.validateUserCounter(job, expectedCounterValue);
        this.validateTaskStderr(job, type);
        this.deleteOutDir(this.fs);
    }

    /**
     * Asserts that the job has exactly one task of the given type and that its
     * state is {@code "<expectedStatus> > <final phase>"}, where the final
     * phase is {@code sort} for map tasks and {@code reduce} otherwise.
     *
     * @param job  the finished job
     * @param type the task type to inspect
     * @throws IOException if the task reports cannot be fetched
     */
    void validateTaskStatus(StreamJob job, TaskType type) throws IOException {
        boolean isMap = type == TaskType.MAP;
        TaskReport[] reports = isMap
                ? job.jc_.getMapTaskReports(job.jobId_)
                : job.jc_.getReduceTaskReports(job.jobId_);
        String finalPhaseInTask = isMap ? "sort" : "reduce";
        Assert.assertEquals(1, reports.length);
        Assert.assertEquals(this.expectedStatus + " > " + finalPhaseInTask, reports[0].getState());
    }

    /**
     * Asserts that the trimmed job output equals {@link #expectedOutput}.
     *
     * @param conf configuration used to read the output directory
     * @throws IOException if the output cannot be read
     */
    void validateJobOutput(Configuration conf) throws IOException {
        String output = MapReduceTestUtil.readOutput(new Path(this.OUTPUT_DIR), conf).trim();
        Assert.assertEquals(this.expectedOutput, output);
    }

    /**
     * Asserts that the stderr log of attempt 0 of task 0 of the given type
     * equals the trimmed {@link #expectedStderr}.
     *
     * @param job  the finished job
     * @param type the task type whose log is read
     * @throws IOException if the task log cannot be read
     */
    void validateTaskStderr(StreamJob job, TaskType type) throws IOException {
        TaskAttemptID attemptId = new TaskAttemptID(new TaskID((JobID)job.jobId_, type, 0), 0);
        String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR, attemptId, false);
        Assert.assertEquals(this.expectedStderr.trim(), log);
    }

    /**
     * Asserts the value of the counter {@code myOwnCounter} in group
     * {@code myOwnCounterGroup}.
     *
     * @param job                  the finished job
     * @param expectedCounterValue the value the counter must have
     * @throws IOException if the counters cannot be fetched
     */
    void validateUserCounter(StreamJob job, int expectedCounterValue) throws IOException {
        Counters counters = job.running_.getCounters();
        Assert.assertEquals(expectedCounterValue, counters.findCounter("myOwnCounterGroup", "myOwnCounter").getValue());
    }

    /** Writes {@code content} to {@code target}, closing the stream even if the write fails. */
    private static void writeFile(FileSystem fs, Path target, String content) throws IOException {
        FSDataOutputStream out = fs.create(target);
        try {
            out.writeBytes(content);
        } finally {
            out.close();
        }
    }

    /** Deletes {@code target} if it exists; reports any failure on stderr instead of throwing. */
    private static void deleteBestEffort(FileSystem fs, Path target, boolean recursive) {
        try {
            if (fs.exists(target)) {
                fs.delete(target, recursive);
            }
        } catch (Exception e) {
            // Cleanup must not fail the test or hide its real result, so only report.
            System.err.println("Could not delete " + target + ": " + e);
            e.printStackTrace();
        }
    }
}

