前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[HDFS源码]-HDFS 副本策略原理分析

[HDFS源码]-HDFS 副本策略原理分析

作者头像
用户5252199
发布2022-08-30 11:29:56
8660
发布2022-08-30 11:29:56
举报

本文会从源码的实现角度来分享关于HDFS副本存储策略的概念和实现原理,HDFS的副本存储策略涉及副本写入、副本读取、机架感知、目标端存储的好坏区分策略, 熟悉副本存储策略可以帮助我们在开发或者运维过程中,提升数据处理/读写的效率、避免集群故障的发生.

01 副本放置策略概念

HDFS中文件是以副本的形式进行存储的, HDFS的副本放置策略的主要逻辑在于如何将副本放在合适的地方,并且副本放置好坏会影响数据读写性能的高低,同时HDFS提供了对于副本的容错机制,在副本丢失或DataNode宕机的时候会自动进行恢复.

首先 , HDFS副本放置策略的核心逻辑如下:

第一副本:放置在上传文件的 DataNode上(比如计算任务计算的时候,数据写入会选择当前节点的DataNode);如果是集群外提交,则随机挑选台磁盘不太慢、CPU不太忙的节点

第二副本: 放置在与第一个副本不同的机架的节点上(如果没有配置机架感知,则选择距离第一副本节点的就近节点)

第三副本: 与第二个副本相同机架的不同节点上(如果没有配置机架感知,则选择距离第二副本节点的就近节点)

如果还有更多的副本的话,则会选择随机节点放置

这里还会涉及到配置机架情况和没有配置机架的情况,是否配置机架感知也会影响到BPP(BlockPlacementPolicy副本策略核心类)

02 副本放置策略源码分析

目前在HDFS中现有的副本防止策略类有2大继承子类,分别为BlockPlacementPolicyDefault, BlockPlacementPolicyWithNodeGroup

源码基于Hadoop3.3.0版本进行分析, 在HDFS中副本放置的核心抽象类是: BlockPlacementPolicy ,策略的具体实现类为: BlockPlacementPolicyDefault 在前面副本策略的核心逻辑的描述就是这个类的注释内容.

BlockPlacementPolicy 类的核心功能包括:

  1. 选择目标节点
  2. 验证块的放置是否符合放置策略的要求
  3. 删除多余的副本块
  4. 将数据节点分为两组,一组包含具有多个副本的机架上的节点,另一组包含剩余的节点。

那么从源码阅读的角度,首先需要知道父类中函数用途,才能更容易去了解其子类的实现逻辑.

BlockPlacementPolicyDefault 类中最核心的方法是 chooseTarget ,但是这里有三个同名的 chooseTarget 方法, 三个函数的区别在于storageTypesfavoredNodes 两个参数 .

划重点: 两个参数的作用, storageTypes 是用来选择目标端的存储类型,比如RAM_DISK、SSD、DISK、ARAHIVE , 关于异构存储类型代码实现正在编写,后面也会发送,记得先关注公众号哦 ^.^

favoredNodes表示在选择节点的时候,优先选择这些节点

class: org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault

代码语言:javascript
复制
/**
   * 选择numOfReplicas个DataNode作为Block的目标节点,复制一个大小为blocksize的块
   * 最后将它们以Pipeline的方式排序返回
   * @param srcPath 正在调用这个chooseTargets的文件
   * @param numOfReplicas 需要的额外副本数量.
   * @param writer 的机器,如果不在集群中则为空。
   * @param 已选定为目标的datanode。
   * @param returnChosenNodes 决定是否返回chosenNodes。.
   * @param excludedNodes 已排除的节点不应该在目标节点中
   * @param blocksize 要写入数据的大小
   * @param flags 块放置标志
   * @return 数组的DatanodeDescriptor实例选择作为目标并作为管道排序。
   */
public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
                                             int numOfReplicas,
                                             Node writer,
                                             List<DatanodeStorageInfo> chosen,
                                             boolean returnChosenNodes,
                                             Set<Node> excludedNodes,
                                             long blocksize,
                                             BlockStoragePolicy storagePolicy,
                                             EnumSet<AddBlockFlag> flags);                                
代码语言:javascript
复制
// 根据目标端的存储类型来选择
public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
      Node writer, List<DatanodeStorageInfo> chosen, boolean returnChosenNodes,
      Set<Node> excludedNodes, long blocksize, BlockStoragePolicy storagePolicy,
      EnumSet<AddBlockFlag> flags, EnumMap<StorageType, Integer> storageTypes) {
    return chooseTarget(srcPath, numOfReplicas, writer, chosen,
        returnChosenNodes, excludedNodes, blocksize, storagePolicy, flags);
  }
代码语言:javascript
复制
// 根据目标端的存储类型来选择
public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
      Node writer, List<DatanodeStorageInfo> chosen, boolean returnChosenNodes,
      Set<Node> excludedNodes, long blocksize, BlockStoragePolicy storagePolicy,
      EnumSet<AddBlockFlag> flags, EnumMap<StorageType, Integer> storageTypes) {
    return chooseTarget(srcPath, numOfReplicas, writer, chosen,
        returnChosenNodes, excludedNodes, blocksize, storagePolicy, flags);
  }

在了解了核心函数定义之后,下面我们来拆分一下核心方法里面的具体实现流程: 在三个方法中,可以发现根据favoredNodes 参数来区分了两段逻辑实现,一个是由favoredNodes 参数的逻辑,一个是不包含favoredNodes 的逻辑.

chooseTarget 方法中,如果传入了favoredNodes 参数,则优先选取该参数中的节点,然后和无favoredNodes 参数的实现中,方法最终会进入到真正的chooseTarget方法中.

真正的chooseTarget 方法实现主要的逻辑如下:

  1. 初始化操作
    1. 如果副本数为0或者机器节点数量为0,则返回空
    2. 创建ExcludeNodes集合.
    3. 计算出每个机架所允许的最大副本数,走的是getMaxNodesPerRack
  2. 选择目标节点
    1. 将所选择的节点加入到结果列表中,同时加入到移除列表中,表示这些节点已经选择过了.
    2. 选择numOfReplicas数量的目标节点,并返回其中第一个节点.
  3. 构建Pieline

chooseTarget方法代码实现,代码比较长,可以简单看看核心逻辑

代码语言:javascript
复制
  /**这个是不包含favoredNodes参数的代码实现 */
  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
                                    Node writer,
                                    List<DatanodeStorageInfo> chosenStorage,
                                    boolean returnChosenNodes,
                                    Set<Node> excludedNodes,
                                    long blocksize,
                                    final BlockStoragePolicy storagePolicy,
                                    EnumSet<AddBlockFlag> addBlockFlags,
                                    EnumMap<StorageType, Integer> sTypes) {
    /// 判断numOfReplicas是否为0 或者 集群中没有可选的节点
    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
      return DatanodeStorageInfo.EMPTY_ARRAY;
    }
    /// 创建ExcludeNodes集合.
    if (excludedNodes == null) {
      excludedNodes = new HashSet<>();
    }
    // 计算出每个机架所允许的最大副本数
    int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
    numOfReplicas = result[0];
    int maxNodesPerRack = result[1];
      
    for (DatanodeStorageInfo storage : chosenStorage) {
      // 添加localMachine和相关节点到excludedNodes
      addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
    }

    List<DatanodeStorageInfo> results = null;
    Node localNode = null;
    boolean avoidStaleNodes = (stats != null
        && stats.isAvoidingStaleDataNodesForWrite());
    boolean avoidLocalNode = (addBlockFlags != null
        && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
        && writer != null
        && !excludedNodes.contains(writer));
    // 尝试排除本地节点。如果不能获得足够的节点,它就退回到默认的块放置策略。
    if (avoidLocalNode) {
      results = new ArrayList<>(chosenStorage);
      Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
      if (writer != null) {
        excludedNodeCopy.add(writer);
      }
      localNode = chooseTarget(numOfReplicas, writer,
          excludedNodeCopy, blocksize, maxNodesPerRack, results,
          avoidStaleNodes, storagePolicy,
          EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
      if (results.size() < numOfReplicas) {
        // 没有足够的节点;则后退一步
        results = null;
      }
    }
    if (results == null) {
      results = new ArrayList<>(chosenStorage);
      localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
          blocksize, maxNodesPerRack, results, avoidStaleNodes,
          storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),
          sTypes);
    }

    if (!returnChosenNodes) {  
      results.removeAll(chosenStorage);
    }
      
    // 排序形成Pipeline
    return getPipeline(
        (writer != null && writer instanceof DatanodeDescriptor) ? writer
            : localNode,
        results.toArray(new DatanodeStorageInfo[results.size()]));
  }

目标节点的选择策略源码chooseTargetInOrder方法分析:

分别为第一节点选择、第二节点选择、第三节点选择以及超过三个节点之后的选择策略,这块代码逻辑正好是符合文中开头提到的副本策略的描述

代码语言:javascript
复制
代码语言:javascript
复制
final int numOfResults = results.size();
// 1.如果目标节点数为0,则表示还没有开始选择节点
if (numOfResults == 0) {
  // 1 . 先拿本地节点作为存储节点
  DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
      excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
      storageTypes, true);
  writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
                                 : null;
  // 1. numOfReplicas 表示选择多少个Node作为存储节点,
  // 如果numOfReplicas=0 , 则表示节点已经选择完成,不需要在选择了。
  if (--numOfReplicas == 0) {
    return writer;
  }
}
// 2. 否则目标节点不为空,则取第一个作为存储
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
// 2.1 : 如果目标节点只有一个
if (numOfResults <= 1) {
    // 2.2 则剩下的从其他机架进行选择
  chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
      results, avoidStaleNodes, storageTypes);
  // 2.3 : 选择完成,返回这个writer
  if (--numOfReplicas == 0) {
    return writer;
  }
}
// 3. 如果已经选择了两个节点以内,则开始选取第三个节点
if (numOfResults <= 2) {
    // 3.1: 取出第二个DN节点
  final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
   // 3.2: 判断DN0和DN1 是否是相同的机架,如果是相同机架,则在其他机架中选择一个节点
  if (clusterMap.isOnSameRack(dn0, dn1)) {
    chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
    // 3.3: 如果DN0和DN1不是一个机架,是一个新的块的话,则在DN1所在机架选择第一个节点
  } else if (newBlock){
    chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
    // 3.4: 如果DN0和DN1不是一个机架,则在本地机架选择一个节点
  } else {
    chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
  }
  // 选择完成了,返回
  if (--numOfReplicas == 0) {
    return writer;
  }
}
// 4: 最后:如果副本数已经超过2个,已经设置超过3副本的数量
// 则剩余位置在集群中随机选择放置节点
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
    maxNodesPerRack, results, avoidStaleNodes, storageTypes);
return writer;

Pipeline形成的逻辑

getPipeline : 返回一个节点管道。管道的形成是从写入器开始,遍历所有节点的最短路径。这基本上是一个TSP旅行商问题TSP旅行商问题百科:假设有一个旅行商人要拜访N个城市,他必须选择所要走的路径,路径的限制是每个城市只能拜访一次,而且最后要回到原来出发的城市。路径的选择目标是要求得的路径路程为所有路径之中的最小值

  1. 如果请求方本身不在一个datanode上,则默认选取第一个datanode作为起始节点
  2. 获取当前index下标所属的Storage为最近距离的目标storage
  3. 计算最短距离,getDistance 的逻辑: 返回两个节点之间的距离,假设一个节点到其父节点的距离为1两个节点之间的距离是通过将它们的距离相加来计算的,是他们最近的共同祖先 <LCA最近公共祖先算法>
  4. 最后构建一个最短路径的pipeline

Pipeline类实现的核心源码如下:

代码语言:javascript
复制
 int index=0;
  // 如果writer请求方本身不在一个datanode上,则默认选取第一个datanode作为起始节点
  if (writer == null || !clusterMap.contains(writer)) {
    writer = storages[0].getDatanodeDescriptor();
      }
  // 遍历所有的storeages,计算最近距离目标的storage
  for(; index < storages.length; index++) {
        // 获取当前index下标所属的Storage为最近距离的目标storage
        DatanodeStorageInfo shortestStorage = storages[index];
        // 计算最短距离,getDistance 的逻辑 :返回两个节点之间的距离
        //假设一个节点到其父节点的距离为1
        //两个节点之间的距离是通过将它们的距离相加来计算的
        //和他们最近的共同祖先。
        int shortestDistance = clusterMap.getDistance(writer, shortestStorage.getDatanodeDescriptor());
        int shortestIndex = index;
        for(int i = index + 1; i < storages.length; i++) {
          // 遍历计算当前的距离
          int currentDistance = clusterMap.getDistance(writer,
              storages[i].getDatanodeDescriptor());
          if (shortestDistance>currentDistance) {
            shortestDistance = currentDistance;
            shortestStorage = storages[i];
            shortestIndex = i;
          }
        }
        //找到新的最短距离的storage,并进行下标替换
        if (index != shortestIndex) {
          storages[shortestIndex] = storages[index];
          storages[index] = shortestStorage;
        }
        writer = shortestStorage.getDatanodeDescriptor();
      }
    }
    return storages;

如何判断目标存储的好坏?

经过上面一系列的计算、选择将符合条件的节点加入到了result列表中,那么到这里这里其实还有一步很关键.在isGoodDataNode方法中判断了目标存储的好坏.

需要判断以下几个条件:

  1. 检查该节点是否在服务
  2. 检查该节点是否是下线节点
  3. 是否考虑目标节点的负载,如果是,则要看工作负载是否过高
  4. 检查目标机架是否选择了过多的节点
  5. 是否大于同机架内最多的副本选择数

代码逻辑如下 :

代码语言:javascript
复制
  boolean isGoodDatanode(DatanodeDescriptor node,
                         int maxTargetPerRack, boolean considerLoad,
                         List<DatanodeStorageInfo> results,
                         boolean avoidStaleNodes) {
    // check if the node is (being) decommissioned
      // 检查该节点是否在服务
    if (!node.isInService()) {
      logNodeIsNotChosen(node, NodeNotChosenReason.NOT_IN_SERVICE);
      return false;
    }
    // 该节点是否是下线节点
    if (avoidStaleNodes) {
      if (node.isStale(this.staleInterval)) {
        logNodeIsNotChosen(node, NodeNotChosenReason.NODE_STALE);
        return false;
      }
    }
    // 是否考虑目标节点的负载,如果考虑的话,就要判断当前节点负载是否比较高
    if(considerLoad) {
      if (excludeNodeByLoad(node)) {
        return false;
      }
    }
    // 检查目标机架是否选择了过多的节点
    String rackname = node.getNetworkLocation();
    int counter=1;
    for(DatanodeStorageInfo resultStorage : results) {
      if (rackname.equals(
          resultStorage.getDatanodeDescriptor().getNetworkLocation())) {
        counter++;
      }
    }
    // 是否满足同机架内最大副本数的限制
    if (counter > maxTargetPerRack) {
      logNodeIsNotChosen(node, NodeNotChosenReason.TOO_MANY_NODES_ON_RACK);
      return false;
    }
    return true;
  }

04 副本系数

所谓副本系数是指在HDFS中可以通过hdfs.site.xml中的参数: dfs.replication 来配置的,默认的情况下是3, 也就是说每个文件在上传之后,默认会生成三份数据,三份数据的存储的策略是按照上面提到的副本策略来保存的.

当然, 这个副本系数(数量)也可以进行配置,配置的方式有以下几种:

  1. 在hdfs.site.xml中配置,比如将参数 dfs.replication 设置为 2 , 不过这种方式是一种全局策略,等于说所有文件的保存都是只有2个副本.
  2. 通过Client端的在上传文件的时候进行配置, 比如:
代码语言:javascript
复制
Configuration configuration = new Configuration();
configuration.set("dfs.replication",1);
  1. 通过HDFS CLI命令来手动的调整副本数量,比如: hadoop fs -setrep [-R] [-w] <numReplicas> <path> 更改文件的副本系数。如果path是一个目录,则该命令递归地更改以path为根的目录树下所有文件的副本系数 -w : 会等待复制直到完成之后才会退出,这个可能需要很长时间 -R : 标记为向后兼容 , 实际从源码层面没有任何作用

05 归纳总结

对于本文副本策略的内容的学习,我们了解了副本放置策略的概念、方法、原理实现(源码层) , 最后我们来归纳总结一下设计要点:

HDFS默认使用的副本策略设计要点:

  • 可靠性:Block 存储在两个机架上, 如果其中一个机架网络出现异常 , 可以保证在其它机架的Datanode上找到数据。
  • 写操作:写操作数据传输仅进行一次网络传输 , 减少了机架间的数据传输, 提高了写操作的效率。
  • 读操作:在读取数据时, 为了减少整体的带宽消耗和降低整体的带宽延时,HDFS 会尽量让读取操作读取离 client 最近的副本(短读操作) . 如果在读取操作的同一个机架上有一个副本,那么就读取该副本 , 如果本地数据损坏, 那么node 可以从同一个机架内的相邻 node 上拿到数据.

后续会持续更新一些大数据组件源码相关的内容 , 欢迎点赞、关注、在看 ^.^

本文参考:

深度剖析Hadoop HDFS - 林意群 编著

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-05-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术博文 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 01 副本放置策略概念
  • 02 副本放置策略源码分析
  • 04 副本系数
  • 05 归纳总结
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档