/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.Utils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class TestMRIntermediateDataEncryption {
    private static final Logger LOG = LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
    private static final String JVM_SECURITY_EGD_OPT = "-Djava.security.egd=file:/dev/./urandom";
    private static final Path INPUT_DIR = new Path("/test/input");
    private static final Path OUTPUT = new Path("/test/output");
    private static final int NUM_LINES = 1000;
    private static MiniMRClientCluster mrCluster = null;
    private static MiniDFSCluster dfsCluster = null;
    private static FileSystem fs = null;
    private static final int NUM_NODES = 2;
    private final String testTitle;
    private final int numMappers;
    private final int numReducers;
    private final boolean isUber;

    @Parameterized.Parameters(name="{index}: TestMRIntermediateDataEncryption.{0} .. mappers:{1}, reducers:{2}, isUber:{3})")
    public static Collection<Object[]> getTestParameters() {
        return Arrays.asList({"testSingleReducer", 3, 1, false}, {"testUberMode", 3, 1, true}, {"testMultipleMapsPerNode", 8, 1, false}, {"testMultipleReducers", 2, 4, false});
    }

    public TestMRIntermediateDataEncryption(String testName, int mappers, int reducers, boolean uberEnabled) {
        this.testTitle = testName;
        this.numMappers = mappers;
        this.numReducers = reducers;
        this.isUber = uberEnabled;
    }

    @BeforeClass
    public static void setupClass() throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.job.encrypted-intermediate-data", true);
        conf.set("yarn.app.mapreduce.am.admin-command-opts", JVM_SECURITY_EGD_OPT);
        String childJVMOpts = "-Djava.security.egd=file:/dev/./urandom " + conf.get("mapred.child.java.opts", " ");
        conf.set("mapred.child.java.opts", childJVMOpts);
        dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
        mrCluster = MiniMRClientClusterFactory.create(TestMRIntermediateDataEncryption.class, 2, conf);
        mrCluster.start();
    }

    @AfterClass
    public static void tearDown() throws IOException {
        if (fs != null) {
            fs.close();
        }
        if (mrCluster != null) {
            mrCluster.stop();
        }
        if (dfsCluster != null) {
            dfsCluster.shutdown();
        }
    }

    @Before
    public void setup() throws Exception {
        LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", (Object)this.testTitle);
        fs = dfsCluster.getFileSystem();
        if (fs.exists(INPUT_DIR) && !fs.delete(INPUT_DIR, true)) {
            throw new IOException("Could not delete " + INPUT_DIR);
        }
        if (fs.exists(OUTPUT) && !fs.delete(OUTPUT, true)) {
            throw new IOException("Could not delete " + OUTPUT);
        }
        this.createInput(fs, this.numMappers, 1000);
    }

    @After
    public void cleanup() throws IOException {
        if (fs != null) {
            if (fs.exists(OUTPUT)) {
                fs.delete(OUTPUT, true);
            }
            if (fs.exists(INPUT_DIR)) {
                fs.delete(INPUT_DIR, true);
            }
        }
    }

    @Test(timeout=600000L)
    public void testMerge() throws Exception {
        JobConf job = new JobConf(mrCluster.getConfig());
        job.setJobName("Test");
        JobClient client = new JobClient(job);
        RunningJob submittedJob = null;
        FileInputFormat.setInputPaths((JobConf)job, (Path[])new Path[]{INPUT_DIR});
        FileOutputFormat.setOutputPath((JobConf)job, (Path)OUTPUT);
        job.set("mapreduce.output.textoutputformat.separator", " ");
        job.setInputFormat(TextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MyMapper.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setNumReduceTasks(this.numReducers);
        job.setInt("mapreduce.map.maxattempts", 1);
        job.setInt("mapreduce.reduce.maxattempts", 1);
        job.setInt("mapred.test.num_lines", 1000);
        job.setBoolean("mapreduce.job.ubertask.enable", this.isUber);
        job.setBoolean("mapreduce.job.encrypted-intermediate-data", true);
        submittedJob = client.submitJob(job);
        submittedJob.waitForCompletion();
        Assert.assertTrue((String)"The submitted job is completed", (boolean)submittedJob.isComplete());
        Assert.assertTrue((String)"The submitted job is successful", (boolean)submittedJob.isSuccessful());
        this.verifyOutput(fs, this.numMappers, 1000);
        client.close();
        Thread.sleep(1000L);
    }

    private void createInput(FileSystem filesystem, int mappers, int numLines) throws Exception {
        for (int i = 0; i < mappers; ++i) {
            FSDataOutputStream os = filesystem.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
            OutputStreamWriter writer = new OutputStreamWriter((OutputStream)os);
            for (int j = 0; j < numLines; ++j) {
                int k = j + 1;
                String formattedNumber = String.format("%09d", k);
                writer.write(formattedNumber + " " + formattedNumber + "\n");
            }
            ((Writer)writer).close();
            os.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void verifyOutput(FileSystem fileSystem, int mappers, int numLines) throws Exception {
        Path[] fileList;
        FSDataInputStream dis = null;
        long numValidRecords = 0L;
        long numInvalidRecords = 0L;
        String prevKeyValue = "000000000";
        for (Path outFile : fileList = FileUtil.stat2Paths((FileStatus[])fileSystem.listStatus(OUTPUT, (PathFilter)new Utils.OutputFileUtils.OutputFilesFilter()))) {
            try {
                String record;
                dis = fileSystem.open(outFile);
                while ((record = dis.readLine()) != null) {
                    int blankPos = record.indexOf(" ");
                    String keyString = record.substring(0, blankPos);
                    String valueString = record.substring(blankPos + 1);
                    if (keyString.compareTo(prevKeyValue) >= 0 && keyString.equals(valueString)) {
                        prevKeyValue = keyString;
                        ++numValidRecords;
                        continue;
                    }
                    ++numInvalidRecords;
                }
            }
            finally {
                if (dis != null) {
                    dis.close();
                    dis = null;
                }
            }
        }
        Assert.assertEquals((long)(mappers * numLines), (long)numValidRecords);
        Assert.assertEquals((long)0L, (long)numInvalidRecords);
    }

    static class MyPartitioner
    implements Partitioner<Text, Text> {
        private JobConf job;

        public void configure(JobConf job) {
            this.job = job;
        }

        public int getPartition(Text key, Text value, int numPartitions) {
            int keyValue = 0;
            try {
                keyValue = Integer.parseInt(key.toString());
            }
            catch (NumberFormatException nfe) {
                keyValue = 0;
            }
            int partitionNumber = numPartitions * Math.max(0, keyValue - 1) / this.job.getInt("mapred.test.num_lines", 10000);
            return partitionNumber;
        }
    }

    public static class MyMapper
    extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {
        private Text keyText = new Text();
        private Text valueText = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String record = value.toString();
            int blankPos = record.indexOf(" ");
            this.keyText.set(record.substring(0, blankPos));
            this.valueText.set(record.substring(blankPos + 1));
            output.collect((Object)this.keyText, (Object)this.valueText);
        }

        public void close() throws IOException {
        }
    }
}

