/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests that exercise block replacement between datanodes (and between
 * storage types on a single datanode) against a {@link MiniDFSCluster},
 * plus a timing check of {@link DataTransferThrottler}.
 */
public class TestBlockReplacement {
    // NOTE(review): the logger name refers to org.apache.hadoop.hdfs rather than this
    // class's package; it is kept as-is because logging configuration may key on it.
    private static final Log LOG = LogFactory.getLog("org.apache.hadoop.hdfs.TestBlockReplacement");

    /** Cluster shared by the tests that assign it; each such test shuts it down itself. */
    MiniDFSCluster cluster;

    /**
     * Pushes six seconds' worth of bytes (at 1 MiB/s) through a
     * {@link DataTransferThrottler} in three calls, with a one second pause
     * before the last call, and asserts that the observed rate does not
     * exceed the configured bandwidth.
     *
     * @throws IOException declared for compatibility with existing callers
     */
    @Test
    public void testThrottler() throws IOException {
        HdfsConfiguration conf = new HdfsConfiguration();
        FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
        final long bandwidthPerSec = 1024L * 1024L;
        final long totalBytes = 6L * bandwidthPerSec;
        long bytesToSend = totalBytes;
        long start = Time.monotonicNow();
        DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec);

        long bytesSent = 1024L * 512L; // 0.5 MiB
        throttler.throttle(bytesSent);
        bytesToSend -= bytesSent;

        bytesSent = 1024L * 768L; // 0.75 MiB
        throttler.throttle(bytesSent);
        bytesToSend -= bytesSent;

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            // Do not lose the interruption; restore the flag for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        throttler.throttle(bytesToSend);
        long end = Time.monotonicNow();
        Assert.assertTrue(totalBytes * 1000L / (end - start) <= bandwidthPerSec);
    }

    /**
     * Starts three datanodes on three racks, writes a one-block file with
     * replication 3, adds a fourth datanode on "/RACK2" and then runs four
     * {@code replaceBlock} scenarios:
     * <ol>
     *   <li>the proxy is the new node (logged as not containing the block) - expected to fail;</li>
     *   <li>the destination is logged as already containing the block - expected to fail;</li>
     *   <li>source to new node through a proxy - expected to succeed, after which the
     *       block must be located on the new node and both proxies;</li>
     *   <li>the case logged as "invalid del hint" - expected to succeed, after which the
     *       block must be located on both proxies.</li>
     * </ol>
     *
     * @throws Exception if the cluster or any client call fails
     */
    @Test
    public void testBlockReplacement() throws Exception {
        final HdfsConfiguration conf = new HdfsConfiguration();
        final String[] initialRacks = {"/RACK0", "/RACK1", "/RACK2"};
        final String[] newRacks = {"/RACK2"};
        final short replicationFactor = 3;
        final int defaultBlockSize = 1024;
        final Random r = new Random();
        conf.setLong("dfs.blocksize", defaultBlockSize);
        conf.setInt("dfs.bytes-per-checksum", defaultBlockSize / 2);
        conf.setLong("dfs.blockreport.intervalMsec", 500L);
        this.cluster = new MiniDFSCluster.Builder(conf).numDataNodes(replicationFactor).racks(initialRacks).build();
        DFSClient client = null;
        try {
            this.cluster.waitActive();
            FileSystem fs = this.cluster.getFileSystem();
            Path fileName = new Path("/tmp.txt");

            // Create a single-block file and wait until it is fully replicated.
            DFSTestUtil.createFile(fs, fileName, defaultBlockSize, replicationFactor, r.nextLong());
            DFSTestUtil.waitReplication(fs, fileName, replicationFactor);

            // Look up where the block currently lives.
            InetSocketAddress addr = new InetSocketAddress("localhost", this.cluster.getNameNodePort());
            client = new DFSClient(addr, conf);
            List<LocatedBlock> locatedBlocks = client.getNamenode().getBlockLocations("/tmp.txt", 0L, defaultBlockSize).getLocatedBlocks();
            Assert.assertEquals(1, locatedBlocks.size());
            LocatedBlock block = locatedBlocks.get(0);
            DatanodeInfo[] oldNodes = block.getLocations();
            Assert.assertEquals(replicationFactor, oldNodes.length);
            ExtendedBlock b = block.getBlock();

            // Add one more datanode on "/RACK2".
            this.cluster.startDataNodes(conf, 1, true, null, newRacks);
            this.cluster.waitActive();

            // The new node is the first reported datanode that held no replica before.
            DatanodeInfo[] datanodes = client.datanodeReport(HdfsConstants.DatanodeReportType.ALL);
            List<DatanodeInfo> oldLocations = Arrays.asList(oldNodes);
            DatanodeInfo newNode = null;
            for (DatanodeInfo node : datanodes) {
                if (!oldLocations.contains(node)) {
                    newNode = node;
                    break;
                }
            }
            Assert.assertNotNull(newNode);

            // A node sharing the new node's network location becomes the source;
            // every other node is a proxy.
            DatanodeInfo source = null;
            List<DatanodeInfo> proxies = new ArrayList<DatanodeInfo>(2);
            for (DatanodeInfo node : datanodes) {
                if (node == newNode) {
                    continue;
                }
                if (node.getNetworkLocation().equals(newNode.getNetworkLocation())) {
                    source = node;
                } else {
                    proxies.add(node);
                }
            }
            Assert.assertNotNull(source);
            Assert.assertEquals(2, proxies.size());

            LOG.info("Testcase 1: Proxy " + newNode + " does not contain the block " + b);
            Assert.assertFalse(this.replaceBlock(b, source, newNode, proxies.get(0)));

            LOG.info("Testcase 2: Destination " + proxies.get(1) + " contains the block " + b);
            Assert.assertFalse(this.replaceBlock(b, source, proxies.get(0), proxies.get(1)));

            LOG.info("Testcase 3: Source=" + source + " Proxy=" + proxies.get(0) + " Destination=" + newNode);
            Assert.assertTrue(this.replaceBlock(b, source, proxies.get(0), newNode));
            this.checkBlocks(new DatanodeInfo[]{newNode, proxies.get(0), proxies.get(1)}, fileName.toString(), defaultBlockSize, replicationFactor, client);

            LOG.info("Testcase 4: invalid del hint " + proxies.get(0));
            Assert.assertTrue(this.replaceBlock(b, proxies.get(0), proxies.get(1), source));
            this.checkBlocks(proxies.toArray(new DatanodeInfo[proxies.size()]), fileName.toString(), defaultBlockSize, replicationFactor, client);
        } finally {
            // Release the client as well as the cluster; cleanup ignores a null client.
            IOUtils.cleanup(null, client);
            this.cluster.shutdown();
        }
    }

    /**
     * Starts one datanode configured with DISK and ARCHIVE storage, writes a
     * single-replica file, asks the datanode to move the block to ARCHIVE
     * (source, proxy and destination are all the same node) and verifies that
     * the block is afterwards reported on exactly one location of type ARCHIVE.
     *
     * @throws Exception if the cluster or any client call fails
     */
    @Test
    public void testBlockMoveAcrossStorageInSameNode() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        // Local cluster on purpose; named differently so it does not shadow the field.
        MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).storageTypes(new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}).build();
        try {
            miniCluster.waitActive();
            DistributedFileSystem dfs = miniCluster.getFileSystem();
            Path file = new Path("/testBlockMoveAcrossStorageInSameNode/file");
            DFSTestUtil.createFile(dfs, file, 1024L, (short) 1, 1024L);

            LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file.toString(), 0L);
            LocatedBlock locatedBlock = locatedBlocks.get(0);
            ExtendedBlock block = locatedBlock.getBlock();
            DatanodeInfo[] locations = locatedBlock.getLocations();
            Assert.assertEquals(1, locations.length);
            StorageType[] storageTypes = locatedBlock.getStorageTypes();
            Assert.assertTrue(storageTypes[0] == StorageType.DISK);

            DatanodeInfo source = locations[0];
            Assert.assertTrue(this.replaceBlock(block, source, source, source, StorageType.ARCHIVE));

            // Fixed pause before re-reading the block locations.
            Thread.sleep(3000L);
            locatedBlocks = dfs.getClient().getLocatedBlocks(file.toString(), 0L);
            locatedBlock = locatedBlocks.get(0);
            Assert.assertEquals("Storage should be only one", 1, locatedBlock.getLocations().length);
            Assert.assertTrue("Block should be moved to ARCHIVE", locatedBlock.getStorageTypes()[0] == StorageType.ARCHIVE);
        } finally {
            miniCluster.shutdown();
        }
    }

    /**
     * Polls the namenode every 100 ms until the single block of
     * {@code fileName} has exactly {@code replFactor} locations and every node
     * in {@code includeNodes} is among them.
     *
     * @param includeNodes nodes that must all hold a replica
     * @param fileName path of the file to inspect
     * @param fileLen length passed to the block-location query
     * @param replFactor expected number of locations
     * @param client client used to query the namenode
     * @throws IOException if the namenode query fails or the wait is interrupted
     * @throws TimeoutException if the expected placement is still not reached
     *         after more than 20000 msec
     */
    private void checkBlocks(DatanodeInfo[] includeNodes, String fileName, long fileLen, short replFactor, DFSClient client) throws IOException, TimeoutException {
        final long timeoutMs = 20000L;
        final long startTime = Time.monotonicNow();
        final long failTime = startTime + timeoutMs;
        boolean notDone;
        do {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting instead of silently polling on.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for replication of " + fileName, e);
            }
            List<LocatedBlock> blocks = client.getNamenode().getBlockLocations(fileName, 0L, fileLen).getLocatedBlocks();
            Assert.assertEquals(1, blocks.size());
            DatanodeInfo[] nodes = blocks.get(0).getLocations();
            notDone = nodes.length != replFactor;
            if (notDone) {
                LOG.info("Expected replication factor is " + replFactor + " but the real replication factor is " + nodes.length);
            } else {
                List<DatanodeInfo> nodeLocations = Arrays.asList(nodes);
                for (DatanodeInfo node : includeNodes) {
                    if (!nodeLocations.contains(node)) {
                        notDone = true;
                        LOG.info("Block is not located at " + node);
                        break;
                    }
                }
            }
            // Only time out while the goal is still unmet; a poll that succeeds
            // after the deadline must not be reported as a failure.
            if (notDone && Time.monotonicNow() > failTime) {
                StringBuilder expectedNodesList = new StringBuilder();
                for (DatanodeInfo dn : includeNodes) {
                    expectedNodesList.append(dn).append(", ");
                }
                StringBuilder currentNodesList = new StringBuilder();
                for (DatanodeInfo dn : nodes) {
                    currentNodesList.append(dn).append(", ");
                }
                LOG.info("Expected replica nodes are: " + expectedNodesList);
                LOG.info("Current actual replica nodes are: " + currentNodesList);
                throw new TimeoutException("Did not achieve expected replication to expected nodes after more than " + timeoutMs + " msec.  See logs for details.");
            }
        } while (notDone);
        // startTime was taken from the monotonic clock, so the elapsed time must be too.
        LOG.info("Achieved expected replication values in " + (Time.monotonicNow() - startTime) + " msec.");
    }

    /**
     * Requests a block replacement with {@link StorageType#DEFAULT} as the
     * target storage type.
     *
     * @param block block to move
     * @param source node passed as the replacement source
     * @param sourceProxy node passed as the proxy source
     * @param destination node passed as the destination
     * @return the result of {@code DFSTestUtil.replaceBlock} with SUCCESS as the status argument
     * @throws IOException if the request fails
     */
    private boolean replaceBlock(ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
        return this.replaceBlock(block, source, sourceProxy, destination, StorageType.DEFAULT);
    }

    /**
     * Requests a block replacement targeting the given storage type.
     *
     * @param block block to move
     * @param source node passed as the replacement source
     * @param sourceProxy node passed as the proxy source
     * @param destination node passed as the destination
     * @param targetStorageType storage type requested on the destination
     * @return the result of {@code DFSTestUtil.replaceBlock} with SUCCESS as the status argument
     * @throws IOException if the request fails (this includes {@link SocketException})
     */
    private boolean replaceBlock(ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination, StorageType targetStorageType) throws IOException, SocketException {
        return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination, targetStorageType, DataTransferProtos.Status.SUCCESS);
    }

    /**
     * In a two-namenode HA cluster with one datanode: writes a single-replica
     * file through namenode 0, adds a second datanode, replaces the block from
     * the first datanode to the second, waits (up to 20 one-second tries,
     * triggering a deletion report on datanode 0 each time) for the block to
     * be reported on one location, then fails over to namenode 1 and asserts
     * that it also reports the block on exactly one datanode.
     *
     * @throws Exception if the cluster or any client call fails
     */
    @Test
    public void testDeletedBlockWhenAddBlockIsInEdit() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        this.cluster = new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(1).build();
        DFSClient client = null;
        try {
            this.cluster.waitActive();
            Assert.assertEquals("Number of namenodes is not 2", 2, this.cluster.getNumNameNodes());

            // Make namenode 0 active.
            this.cluster.transitionToActive(0);
            Assert.assertTrue("Namenode 0 should be in active state", this.cluster.getNameNode(0).isActiveState());
            Assert.assertTrue("Namenode 1 should be in standby state", this.cluster.getNameNode(1).isStandbyState());

            DataNodeTestUtils.triggerHeartbeat(this.cluster.getDataNodes().get(0));
            FileSystem fs = this.cluster.getFileSystem(0);

            // Trigger a full (non-incremental) block report on the only datanode.
            this.cluster.getDataNodes().get(0).triggerBlockReport(new BlockReportOptions.Factory().setIncremental(false).build());

            Path fileName = new Path("/tmp.txt");
            DFSTestUtil.createFile(fs, fileName, 10L, (short) 1, 1234L);
            DFSTestUtil.waitReplication(fs, fileName, (short) 1);

            client = new DFSClient(this.cluster.getFileSystem(0).getUri(), conf);
            List<LocatedBlock> locatedBlocks = client.getNamenode().getBlockLocations("/tmp.txt", 0L, 10L).getLocatedBlocks();
            Assert.assertEquals(1, locatedBlocks.size());
            Assert.assertEquals(1, locatedBlocks.get(0).getLocations().length);

            // Add a second datanode to serve as the replacement destination.
            this.cluster.startDataNodes(conf, 1, true, null, null, null, null);
            Assert.assertEquals("Number of datanodes should be 2", 2, this.cluster.getDataNodes().size());

            DataNode dn0 = this.cluster.getDataNodes().get(0);
            DataNode dn1 = this.cluster.getDataNodes().get(1);
            String activeNNBPId = this.cluster.getNamesystem(0).getBlockPoolId();
            DatanodeDescriptor sourceDnDesc = NameNodeAdapter.getDatanode(this.cluster.getNamesystem(0), dn0.getDNRegistrationForBP(activeNNBPId));
            DatanodeDescriptor destDnDesc = NameNodeAdapter.getDatanode(this.cluster.getNamesystem(0), dn1.getDNRegistrationForBP(activeNNBPId));

            ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
            LOG.info("replaceBlock:  " + this.replaceBlock(block, sourceDnDesc, sourceDnDesc, destDnDesc));

            // Wait until the active namenode reports the block on a single location.
            for (int tries = 0; tries < 20; ++tries) {
                Thread.sleep(1000L);
                DataNodeTestUtils.triggerDeletionReport(this.cluster.getDataNodes().get(0));
                locatedBlocks = client.getNamenode().getBlockLocations("/tmp.txt", 0L, 10L).getLocatedBlocks();
                if (locatedBlocks.get(0).getLocations().length == 1) {
                    break;
                }
            }

            // Fail over to namenode 1.
            this.cluster.transitionToStandby(0);
            this.cluster.transitionToActive(1);
            Assert.assertTrue("Namenode 1 should be in active state", this.cluster.getNameNode(1).isActiveState());
            Assert.assertTrue("Namenode 0 should be in standby state", this.cluster.getNameNode(0).isStandbyState());

            // Re-query through a client bound to the new active namenode.
            client.close();
            client = new DFSClient(this.cluster.getFileSystem(1).getUri(), conf);
            List<LocatedBlock> locatedBlocks1 = client.getNamenode().getBlockLocations("/tmp.txt", 0L, 10L).getLocatedBlocks();
            Assert.assertEquals(1, locatedBlocks1.size());
            Assert.assertEquals("The block should be only on 1 datanode ", 1, locatedBlocks1.get(0).getLocations().length);
        } finally {
            IOUtils.cleanup(null, client);
            this.cluster.shutdown();
        }
    }

    /**
     * Runs {@link #testBlockReplacement()} outside of a JUnit runner.
     *
     * @param args ignored
     * @throws Exception if the test fails
     */
    public static void main(String[] args) throws Exception {
        new TestBlockReplacement().testBlockReplacement();
    }
}

