/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.balancer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
import org.apache.hadoop.hdfs.server.balancer.BalancingPolicy;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;

public class TestBalancerWithMultipleNameNodes {
    /** Alias of the balancer's own logger, so test output goes through {@code Balancer.LOG}. */
    static final Logger LOG = Balancer.LOG;

    /** Capacity value (500); the same value {@code runTest} assigns to each simulated datanode. */
    private static final long CAPACITY = 500L;

    /** Rack names handed to the mini cluster builder by the individual test cases. */
    private static final String RACK0 = "/rack0";
    private static final String RACK1 = "/rack1";
    private static final String RACK2 = "/rack2";

    /** Name of the single file each namenode's namespace is filled with, as string and as path. */
    private static final String FILE_NAME = "/tmp.txt";
    private static final Path FILE_PATH = new Path(FILE_NAME);

    /** Source of the seed passed to {@code DFSTestUtil.createFile}. */
    private static final Random RANDOM = new Random();

    /**
     * Switches the NameNode loggers (log4j level) and the shared balancer logger
     * (slf4j level) to TRACE for every instance of this test class.
     */
    public TestBalancerWithMultipleNameNodes() {
        DFSTestUtil.setNameNodeLogLevel(Level.TRACE);
        GenericTestUtils.setLogLevel(LOG, org.slf4j.event.Level.TRACE);
    }

    /**
     * Creates {@code FILE_PATH} with {@code len} bytes in the namespace of the namenode at
     * {@code index} and waits until it reaches the suite's replication factor.
     *
     * @param s     suite providing the cluster and the replication factor
     * @param index index of the namenode whose file system is written to
     * @param len   length of the file to create
     */
    private static void createFile(Suite s, int index, long len) throws IOException, InterruptedException, TimeoutException {
        final short replication = s.replication;
        final FileSystem target = s.cluster.getFileSystem(index);
        final long seed = RANDOM.nextLong();
        DFSTestUtil.createFile(target, FILE_PATH, len, replication, seed);
        DFSTestUtil.waitReplication(target, FILE_PATH, replication);
    }

    /**
     * Creates one file of {@code size} bytes per namenode and returns copies of the blocks
     * that make up each file.
     *
     * <p>Each returned block is a fresh {@link ExtendedBlock} built from the located block's
     * pool id, block id, length and generation stamp.
     *
     * @param s    suite providing the cluster and one client per namenode
     * @param size length of the file to create, also used as the length of the queried range
     * @return {@code blocks[n][i]} is the i-th block of the file created through namenode {@code n}
     */
    private static ExtendedBlock[][] generateBlocks(Suite s, long size) throws IOException, InterruptedException, TimeoutException {
        final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][];
        for (int n = 0; n < s.clients.length; ++n) {
            createFile(s, n, size);
            // Typed list instead of a raw List: no unchecked cast is needed per element.
            final List<LocatedBlock> locatedBlocks = s.clients[n].getBlockLocations(FILE_NAME, 0L, size).getLocatedBlocks();
            final int numOfBlocks = locatedBlocks.size();
            blocks[n] = new ExtendedBlock[numOfBlocks];
            for (int i = 0; i < numOfBlocks; ++i) {
                final ExtendedBlock b = locatedBlocks.get(i).getBlock();
                blocks[n][i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b.getNumBytes(), b.getGenerationStamp());
            }
        }
        return blocks;
    }

    /**
     * Blocks until every namenode reports the expected totals.
     *
     * <p>For each client in turn, {@code getStats()} is polled every 100 ms until element 0
     * equals {@code expectedTotalSpace} and element 1 equals {@code expectedUsedSpace}.
     * A warning with the last observed values is logged on every 100th unsuccessful poll.
     * There is no upper bound on the number of polls.
     *
     * @param clients            one client per namenode
     * @param expectedUsedSpace  value stats element 1 must reach
     * @param expectedTotalSpace value stats element 0 must reach
     */
    static void wait(ClientProtocol[] clients, long expectedUsedSpace, long expectedTotalSpace) throws IOException {
        LOG.info("WAIT expectedUsedSpace=" + expectedUsedSpace + ", expectedTotalSpace=" + expectedTotalSpace);
        for (ClientProtocol client : clients) {
            int attempts = 0;
            while (true) {
                final long[] stats = client.getStats();
                if (stats[0] == expectedTotalSpace && stats[1] == expectedUsedSpace) {
                    break;
                }
                sleep(100L);
                ++attempts;
                if (attempts % 100 == 0) {
                    LOG.warn("WAIT i=" + attempts + ", s=[" + stats[0] + ", " + stats[1] + "]");
                }
            }
        }
    }

    /**
     * Runs the balancer against the suite's cluster and blocks until the datanodes are
     * balanced with respect to the configured policy and threshold.
     *
     * <p>Sequence:
     * <ol>
     *   <li>wait until every namenode reports {@code totalUsed}/{@code totalCapacity};</li>
     *   <li>snapshot the storage reports returned by {@code getStorageReports} (the block
     *       pools that are <em>not</em> selected for balancing);</li>
     *   <li>run the balancer and assert it exits with {@link ExitStatus#SUCCESS};</li>
     *   <li>wait for the expected totals again;</li>
     *   <li>poll the datanode reports every 100 ms until each datanode's utilisation is at
     *       most the average plus the threshold (there is no upper bound on the polling);</li>
     *   <li>assert that the snapshotted pools have the same per-datanode usage as before.</li>
     * </ol>
     *
     * @param s             suite holding cluster, clients, configuration and balancer parameters
     * @param totalUsed     used space every namenode is expected to report
     * @param totalCapacity total capacity every namenode is expected to report
     * @throws Exception if an RPC, the balancer run or one of the assertions fails
     */
    static void runBalancer(Suite s, long totalUsed, long totalCapacity) throws Exception {
        final double avg = totalUsed * 100.0 / totalCapacity;
        LOG.info("BALANCER 0: totalUsed=" + totalUsed + ", totalCapacity=" + totalCapacity + ", avg=" + avg);
        wait(s.clients, totalUsed, totalCapacity);
        LOG.info("BALANCER 1");

        final Map<Integer, DatanodeStorageReport[]> preBalancerPoolUsages = getStorageReports(s);

        // The namenode URIs are passed straight through, so no raw Collection local is needed.
        final int r = Balancer.run(DFSUtil.getInternalNsRpcUris(s.conf), s.parameters, s.conf);
        Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
        LOG.info("BALANCER 2");
        wait(s.clients, totalUsed, totalCapacity);
        LOG.info("BALANCER 3");

        final boolean poolPolicy = s.parameters.getBalancingPolicy() == BalancingPolicy.Pool.INSTANCE;
        final double threshold = s.parameters.getThreshold();
        final int nBlockPools = s.parameters.getBlockPools().size();

        boolean balanced = false;
        for (int i = 0; !balanced; ++i) {
            // Progress is only logged on every 100th polling round to keep the log readable.
            final boolean logProgress = i % 100 == 0;
            final int nDataNodes = s.cluster.getDataNodes().size();
            final long[] used = new long[nDataNodes];
            final long[] cap = new long[nDataNodes];
            // bpUsed[n][d]: block pool usage that namenode n reports for datanode d.
            final long[][] bpUsed = new long[s.clients.length][nDataNodes];

            for (int n = 0; n < s.clients.length; ++n) {
                final DatanodeInfo[] datanodes = s.clients[n].getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
                Assert.assertEquals(datanodes.length, used.length);
                for (int d = 0; d < datanodes.length; ++d) {
                    if (n == 0) {
                        // The first namenode's view is the reference ...
                        used[d] = datanodes[d].getDfsUsed();
                        cap[d] = datanodes[d].getCapacity();
                        if (logProgress) {
                            LOG.warn("datanodes[" + d + "]: getDfsUsed()=" + datanodes[d].getDfsUsed() + ", getCapacity()=" + datanodes[d].getCapacity());
                        }
                    } else {
                        // ... and every other namenode must report the same per-datanode totals.
                        Assert.assertEquals(used[d], datanodes[d].getDfsUsed());
                        Assert.assertEquals(cap[d], datanodes[d].getCapacity());
                    }
                    bpUsed[n][d] = datanodes[d].getBlockPoolUsed();
                }
            }

            balanced = true;
            for (int d = 0; balanced && d < nDataNodes; ++d) {
                if (poolPolicy) {
                    for (int k = 0; balanced && k < nBlockPools; ++k) {
                        final long poolUsed = TestBalancer.sum(bpUsed[k]);
                        // Floating-point division: the previous long division truncated the
                        // average before it was compared against the utilisation.
                        final double poolAvg = poolUsed * 100.0 / totalCapacity;
                        final double p = bpUsed[k][d] * 100.0 / cap[d];
                        balanced = p <= poolAvg + threshold;
                        if (!balanced && logProgress) {
                            // Index order is [pool][datanode]; the swapped order logged the wrong
                            // cell and could run past the first dimension.
                            LOG.warn("datanodes " + d + " is not yet balanced: block pool used=" + bpUsed[k][d] + ", cap=" + cap[d] + ", avg=" + poolAvg);
                            LOG.warn("sum(blockpoolUsed)=" + poolUsed + ", sum(cap)=" + TestBalancer.sum(cap));
                        }
                    }
                } else {
                    final double p = used[d] * 100.0 / cap[d];
                    balanced = p <= avg + threshold;
                    if (!balanced && logProgress) {
                        LOG.warn("datanodes " + d + " is not yet balanced: used=" + used[d] + ", cap=" + cap[d] + ", avg=" + avg);
                        LOG.warn("sum(used)=" + TestBalancer.sum(used) + ", sum(cap)=" + TestBalancer.sum(cap));
                    }
                }
            }
            if (!balanced) {
                sleep(100L);
            }
        }
        LOG.info("BALANCER 6");

        final Map<Integer, DatanodeStorageReport[]> postBalancerPoolUsages = getStorageReports(s);
        Assert.assertEquals(preBalancerPoolUsages.size(), postBalancerPoolUsages.size());
        for (Map.Entry<Integer, DatanodeStorageReport[]> entry : preBalancerPoolUsages.entrySet()) {
            compareTotalPoolUsage(entry.getValue(), postBalancerPoolUsages.get(entry.getKey()));
        }
    }

    /**
     * Asserts that each datanode has the same total block pool usage in both report sets.
     *
     * <p>Reports are matched by datanode UUID. Only the first matching post-report is
     * compared for each pre-report; a pre-report without a match is not checked.
     *
     * @param preReports  storage reports taken before the balancer ran
     * @param postReports storage reports taken after the balancer ran
     */
    private static void compareTotalPoolUsage(DatanodeStorageReport[] preReports, DatanodeStorageReport[] postReports) {
        Assert.assertNotNull(preReports);
        Assert.assertNotNull(postReports);
        Assert.assertEquals(preReports.length, postReports.length);
        for (DatanodeStorageReport before : preReports) {
            final String dnUuid = before.getDatanodeInfo().getDatanodeUuid();
            for (DatanodeStorageReport after : postReports) {
                if (after.getDatanodeInfo().getDatanodeUuid().equals(dnUuid)) {
                    final long preUsage = getTotalPoolUsage(before);
                    final long postUsage = getTotalPoolUsage(after);
                    Assert.assertEquals(preUsage, postUsage);
                    LOG.info("Comparision of datanode pool usage pre/post balancer run. PrePoolUsage: " + preUsage + ", PostPoolUsage: " + postUsage);
                    break;
                }
            }
        }
    }

    /**
     * Sums {@code getBlockPoolUsed()} over all storage reports of one datanode.
     *
     * @param report per-datanode report whose storages are summed
     * @return the accumulated block pool usage, 0 when there are no storage reports
     */
    private static long getTotalPoolUsage(DatanodeStorageReport report) {
        final StorageReport[] storages = report.getStorageReports();
        long total = 0L;
        for (int i = 0; i < storages.length; ++i) {
            total += storages[i].getBlockPoolUsed();
        }
        return total;
    }

    /**
     * Collects the LIVE datanode storage reports of every namenode whose block pool is
     * <em>not</em> in the balancer parameters' block pool set.
     *
     * <p>When the parameters name no block pools at all, nothing is tracked and an empty
     * map is returned.
     *
     * @param s suite providing clients, cluster and balancer parameters
     * @return map from namenode index to that namenode's datanode storage reports
     */
    private static Map<Integer, DatanodeStorageReport[]> getStorageReports(Suite s) throws IOException {
        final Collection<String> selectedPools = s.parameters.getBlockPools();
        if (selectedPools.isEmpty()) {
            return Collections.emptyMap();
        }
        final Map<Integer, DatanodeStorageReport[]> reports = new HashMap<Integer, DatanodeStorageReport[]>();
        for (int nn = 0; nn < s.clients.length; ++nn) {
            final String poolId = s.cluster.getNamesystem(nn).getBlockPoolId();
            if (!selectedPools.contains(poolId)) {
                LOG.info("Tracking usage of blockpool id: " + poolId);
                reports.put(nn, s.clients[nn].getDatanodeStorageReport(HdfsConstants.DatanodeReportType.LIVE));
            }
        }
        LOG.info("Tracking " + reports.size() + " blockpool(s) for pre/post balancer usage.");
        return reports;
    }

    /**
     * Sleeps for {@code ms} milliseconds.
     *
     * <p>An interrupt is no longer logged and swallowed: the polling loops that call this
     * helper have no other exit condition, so swallowing made them impossible to cancel.
     * The interrupt flag is restored and the wait is aborted with an unchecked exception
     * that carries the original {@link InterruptedException} as its cause.
     *
     * @param ms time to sleep in milliseconds
     * @throws IllegalStateException if the thread is interrupted while sleeping
     */
    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to code further up the stack.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while sleeping for " + ms + " ms", e);
        }
    }

    /**
     * Builds a fresh {@link HdfsConfiguration} and passes it through
     * {@code TestBalancer.initConf} before returning it.
     *
     * @return the initialised configuration
     */
    private static Configuration createConf() {
        final Configuration configuration = new HdfsConfiguration();
        TestBalancer.initConf(configuration);
        return configuration;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds a federated cluster whose blocks are spread unevenly over the datanodes and
     * runs the balancer on it.
     *
     * <p>A first cluster is started only to create one file per namenode and record its
     * blocks; it is then shut down. A second cluster is started without formatting, with
     * each capacity multiplied by the number of namenodes, and the recorded blocks are
     * injected per namenode according to {@code distributionPerNN} before the balancer runs.
     *
     * @param nNameNodes          number of namenodes in the federated topology
     * @param nNameNodesToBalance number of namenodes (taken from index 0) whose block pools
     *                            are handed to the balancer; must not exceed {@code nNameNodes}
     * @param distributionPerNN   used space per datanode, for each namenode
     * @param capacities          per-datanode capacity of the first cluster
     * @param racks               per-datanode rack
     * @param conf                configuration; it is modified by this method
     * @throws IllegalArgumentException if the three arrays differ in length or more namenodes
     *                                  are to be balanced than exist
     */
    private void unevenDistribution(int nNameNodes, int nNameNodesToBalance, long[] distributionPerNN, long[] capacities, String[] racks, Configuration conf) throws Exception {
        LOG.info("UNEVEN 0");
        final int nDataNodes = distributionPerNN.length;
        if (capacities.length != nDataNodes || racks.length != nDataNodes) {
            throw new IllegalArgumentException("Array length is not the same");
        }
        if (nNameNodesToBalance > nNameNodes) {
            throw new IllegalArgumentException("Number of namenodes to balance is greater than the number of namenodes.");
        }
        final long usedSpacePerNN = TestBalancer.sum(distributionPerNN);

        // Phase 1: throw-away cluster that only produces the block lists.
        ExtendedBlock[][] blocks;
        LOG.info("UNEVEN 1");
        final MiniDFSCluster seedCluster = new MiniDFSCluster.Builder(new Configuration(conf))
                .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
                .numDataNodes(nDataNodes)
                .racks(racks)
                .simulatedCapacities(capacities)
                .build();
        LOG.info("UNEVEN 2");
        try {
            seedCluster.waitActive();
            DFSTestUtil.setFederatedConfiguration(seedCluster, conf);
            LOG.info("UNEVEN 3");
            final Suite seedSuite = new Suite(seedCluster, nNameNodes, nDataNodes, null, conf);
            blocks = generateBlocks(seedSuite, usedSpacePerNN);
            LOG.info("UNEVEN 4");
        }
        finally {
            seedCluster.shutdown();
        }

        // Safemode threshold is set to 0 before the restart; presumably so the restarted,
        // unformatted namenodes leave safemode without reported blocks - TODO confirm.
        conf.set("dfs.namenode.safemode.threshold-pct", "0.0f");

        // Phase 2: restart without formatting, with capacity scaled by the namenode count.
        final long[] newCapacities = new long[nDataNodes];
        for (int d = 0; d < nDataNodes; ++d) {
            newCapacities[d] = capacities[d] * (long) nNameNodes;
        }
        LOG.info("UNEVEN 10");
        final MiniDFSCluster balancedCluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
                .numDataNodes(nDataNodes)
                .racks(racks)
                .simulatedCapacities(newCapacities)
                .format(false)
                .build();
        LOG.info("UNEVEN 11");
        try {
            balancedCluster.waitActive();
            LOG.info("UNEVEN 12");

            // Only the block pools of the first nNameNodesToBalance namenodes are balanced.
            final HashSet<String> poolsToBalance = new HashSet<String>();
            for (int nn = 0; nn < nNameNodesToBalance; ++nn) {
                poolsToBalance.add(balancedCluster.getNamesystem(nn).getBlockPoolId());
            }
            final BalancerParameters.Builder paramsBuilder = new BalancerParameters.Builder();
            paramsBuilder.setBlockpools(poolsToBalance);
            final BalancerParameters params = paramsBuilder.build();
            final Suite suite = new Suite(balancedCluster, nNameNodes, nDataNodes, params, conf);

            for (int nn = 0; nn < nNameNodes; ++nn) {
                final Block[][] perDatanode = TestBalancer.distributeBlocks(blocks[nn], suite.replication, distributionPerNN);
                for (int d = 0; d < perDatanode.length; ++d) {
                    balancedCluster.injectBlocks(nn, d, Arrays.asList(perDatanode[d]));
                }
                LOG.info("UNEVEN 13: n=" + nn);
            }

            final long totalCapacity = TestBalancer.sum(newCapacities);
            final long totalUsed = (long) nNameNodes * usedSpacePerNN;
            LOG.info("UNEVEN 14");
            runBalancer(suite, totalUsed, totalCapacity);
            LOG.info("UNEVEN 15");
        }
        finally {
            balancedCluster.shutdown();
        }
        LOG.info("UNEVEN 16");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a federated cluster, fills every namespace evenly, adds empty datanodes and
     * runs the balancer.
     *
     * <p>Every datanode, initial and added, gets a simulated capacity of {@code CAPACITY}.
     * The files are sized so that the total used space is 30% of the initial total capacity
     * multiplied by the replication factor.
     *
     * @param nNameNodes          number of namenodes in the federated topology
     * @param racks               rack of each initial datanode; its length is the datanode count
     * @param newRack             rack of each datanode added before the balancer runs
     * @param conf                configuration; modified by {@code setFederatedConfiguration}
     * @param nNameNodestoBalance number of namenodes (from index 0) whose block pools are
     *                            balanced; only used when {@code balancerParameters} is non-null
     * @param balancerParameters  {@code null} to run with {@code BalancerParameters.DEFAULT} and
     *                            replication 1; otherwise only its balancing policy is copied
     *                            and replication 2 is used
     */
    private void runTest(int nNameNodes, String[] racks, String[] newRack, Configuration conf, int nNameNodestoBalance, BalancerParameters balancerParameters) throws Exception {
        final int nDataNodes = racks.length;
        final long[] capacities = new long[nDataNodes];
        Arrays.fill(capacities, CAPACITY);
        LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);
        LOG.info("RUN_TEST -1: start a cluster with nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);
        final MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration(conf))
                .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
                .numDataNodes(nDataNodes)
                .racks(racks)
                .simulatedCapacities(capacities)
                .build();
        LOG.info("RUN_TEST 0");
        try {
            // Inside the try so the cluster is shut down even if this call throws.
            DFSTestUtil.setFederatedConfiguration(cluster, conf);
            cluster.waitActive();
            LOG.info("RUN_TEST 1");

            final Suite s;
            if (balancerParameters == null) {
                s = new Suite(cluster, nNameNodes, nDataNodes, BalancerParameters.DEFAULT, conf);
            } else {
                final HashSet<String> blockpools = new HashSet<String>();
                for (int i = 0; i < nNameNodestoBalance; ++i) {
                    blockpools.add(cluster.getNamesystem(i).getBlockPoolId());
                }
                final BalancerParameters.Builder b = new BalancerParameters.Builder();
                b.setBalancingPolicy(balancerParameters.getBalancingPolicy());
                b.setBlockpools(blockpools);
                final BalancerParameters params = b.build();
                // Explicit cast: an int literal is not implicitly narrowed to short when
                // passed as a constructor argument.
                s = new Suite(cluster, nNameNodes, nDataNodes, params, conf, (short) 2);
            }

            long totalCapacity = TestBalancer.sum(capacities);
            LOG.info("RUN_TEST 2: create files");
            // Fill the cluster to 30% of its (replicated) capacity, split evenly across namenodes.
            final long totalUsed = totalCapacity * s.replication * 3L / 10L;
            final long size = totalUsed / nNameNodes / s.replication;
            for (int n = 0; n < nNameNodes; ++n) {
                createFile(s, n, size);
            }

            LOG.info("RUN_TEST 3: " + newRack.length + " new datanodes");
            final long[] newCapacity = new long[newRack.length];
            Arrays.fill(newCapacity, CAPACITY);
            cluster.startDataNodes(conf, newCapacity.length, true, null, newRack, newCapacity);
            totalCapacity += TestBalancer.sum(newCapacity);

            LOG.info("RUN_TEST 4: run Balancer");
            runBalancer(s, totalUsed, totalCapacity);
            LOG.info("RUN_TEST 5");
        }
        finally {
            cluster.shutdown();
        }
        LOG.info("RUN_TEST 6: done");
    }

    /** Two namenodes, one datanode on rack0, then one more datanode on rack0 is added. */
    @Test
    public void testTwoOneOne() throws Exception {
        final Configuration conf = createConf();
        final String[] initialRacks = {RACK0};
        final String[] addedRacks = {RACK0};
        runTest(2, initialRacks, addedRacks, conf, 2, null);
    }

    /** Two namenodes, both balanced, with blocks spread 150/25 over two datanodes. */
    @Test
    public void testUnevenDistribution() throws Exception {
        final Configuration conf = createConf();
        final long[] distribution = {150L, 25L};
        final long[] capacities = {500L, 500L};
        final String[] racks = {RACK0, RACK1};
        unevenDistribution(2, 2, distribution, capacities, racks, conf);
    }

    /** Two namenodes, of which only the first one's block pool is handed to the balancer. */
    @Test
    public void testBalancing1OutOf2Blockpools() throws Exception {
        final Configuration conf = createConf();
        final long[] distribution = {150L, 25L};
        final long[] capacities = {500L, 500L};
        final String[] racks = {RACK0, RACK1};
        unevenDistribution(2, 1, distribution, capacities, racks, conf);
    }

    /** Three namenodes and three datanodes; the first two block pools are balanced. */
    @Test
    public void testBalancing2OutOf3Blockpools() throws Exception {
        final Configuration conf = createConf();
        final long[] distribution = {150L, 25L, 50L};
        final long[] capacities = {500L, 500L, 500L};
        final String[] racks = {RACK0, RACK1, RACK2};
        unevenDistribution(3, 2, distribution, capacities, racks, conf);
    }

    /** Two namenodes, four datanodes on rack0/rack1, then two datanodes on rack2 are added. */
    @Test(timeout=600000L)
    public void testTwoFourTwo() throws Exception {
        final Configuration conf = createConf();
        final String[] initialRacks = {RACK0, RACK0, RACK1, RACK1};
        final String[] addedRacks = {RACK2, RACK2};
        runTest(2, initialRacks, addedRacks, conf, 2, null);
    }

    /**
     * Same topology as {@code testTwoFourTwo}, but balancing both block pools with the
     * block pool balancing policy.
     */
    @Test(timeout=600000L)
    public void testBalancingBlockpoolsWithBlockPoolPolicy() throws Exception {
        final Configuration conf = createConf();
        final BalancerParameters poolPolicyParams = new BalancerParameters.Builder()
                .setBalancingPolicy(BalancingPolicy.Pool.INSTANCE)
                .build();
        final String[] initialRacks = {RACK0, RACK0, RACK1, RACK1};
        final String[] addedRacks = {RACK2, RACK2};
        runTest(2, initialRacks, addedRacks, conf, 2, poolPolicyParams);
    }

    /**
     * Block pool balancing policy with two namenodes, of which only the first one's block
     * pool is handed to the balancer.
     */
    @Test(timeout=600000L)
    public void test1OutOf2BlockpoolsWithBlockPoolPolicy() throws Exception {
        final Configuration conf = createConf();
        final BalancerParameters poolPolicyParams = new BalancerParameters.Builder()
                .setBalancingPolicy(BalancingPolicy.Pool.INSTANCE)
                .build();
        final String[] initialRacks = {RACK0, RACK0, RACK1, RACK1};
        final String[] addedRacks = {RACK2, RACK2};
        runTest(2, initialRacks, addedRacks, conf, 1, poolPolicyParams);
    }

    // Runs once when this class is initialised, i.e. before any test method executes.
    // Delegates to the shared setup in TestBalancer; what that setup configures is not
    // visible from this file.
    static {
        TestBalancer.initTestSetup();
    }

    /**
     * Immutable bundle of everything a balancer run needs: the cluster, one RPC client per
     * namenode, the configuration, the replication factor and the balancer parameters.
     */
    private static class Suite {
        final Configuration conf;
        final MiniDFSCluster cluster;
        /** {@code clients[i]} is the RPC server of namenode {@code i}. */
        final ClientProtocol[] clients;
        final short replication;
        final BalancerParameters parameters;

        /**
         * Creates a suite with replication factor 1.
         *
         * @param nDataNodes accepted for call-site symmetry; not used by the constructor
         */
        Suite(MiniDFSCluster cluster, int nNameNodes, int nDataNodes, BalancerParameters parameters, Configuration conf) throws IOException {
            this(cluster, nNameNodes, nDataNodes, parameters, conf, (short) 1);
        }

        /**
         * Creates a suite with an explicit replication factor.
         *
         * @param cluster           running mini cluster
         * @param nNameNodes        number of namenodes whose RPC servers are collected
         * @param nDataNodes        not used by the constructor
         * @param parameters        balancer parameters; may be {@code null}
         * @param conf              configuration handed to the balancer
         * @param replicationFactor replication used for files created through this suite
         */
        Suite(MiniDFSCluster cluster, int nNameNodes, int nDataNodes, BalancerParameters parameters, Configuration conf, short replicationFactor) throws IOException {
            this.conf = conf;
            this.cluster = cluster;
            final ClientProtocol[] rpcServers = new ClientProtocol[nNameNodes];
            for (int nn = 0; nn < nNameNodes; ++nn) {
                rpcServers[nn] = cluster.getNameNode(nn).getRpcServer();
            }
            this.clients = rpcServers;
            this.replication = replicationFactor;
            this.parameters = parameters;
        }
    }
}

