专栏首页大数据技术与应用实战hadoop2.7.3源码解析之hdfs删除文件全流程分析h

hadoop2.7.3源码解析之hdfs删除文件全流程分析h

@

  • 从命名空间删除文件
  • 将相应的数据块加到InvalidateBlocks中
  • ReplicationMonitor监控线程
  • 心跳生成删除命令
  • 心跳处理删除命令
  • 异步单独开启线程删除磁盘数据
  • 向namenode汇报删除的块

客户端删除文件

先来一段简单的代码,用java的api删除hdfs的 文件

	Configuration conf = new Configuration();
	FileSystem fs = FileSystem.get(conf);
		Path p = new Path("hdfs://127.0.0.1:9000/demo021.txt");
		fs.delete(p,true);
		fs.close();// 释放资源
		System.out.println("删除成功.....");

namenode删除文件

客户端通过ClientProtocol.delete(String, boolean)方法来删除文件,最终实现是NameNodeRpcServer.delete(String, boolean)方法.

之后调用了FSNamesystem的delete来删除namesystem中的相应的文件.,这里总共分为两步,第一步,从namespace删除相应的文件信息并收集删除的文件的数据块.第二步,将收集到的待删除的数据块加到blockmanage的invalidateBlocks中,等待datanode下次心跳的时候生成删除命令发给datanode,然后删除具体的数据块.

  boolean delete(String src, boolean recursive, boolean logRetryCache)
      throws IOException {
    waitForLoadingFSImage();
    BlocksMapUpdateInfo toRemovedBlocks = null;
    writeLock();
    boolean ret = false;
    try {
      //检查是否有写的权限
      checkOperation(OperationCategory.WRITE);
      //检查是否处于安全模式
      checkNameNodeSafeMode("Cannot delete " + src);
      //从命名空间删除相应的文件
      toRemovedBlocks = FSDirDeleteOp.delete(
          this, src, recursive, logRetryCache);
      ret = toRemovedBlocks != null;
    } catch (AccessControlException e) {
      logAuditEvent(false, "delete", src);
      throw e;
    } finally {
      writeUnlock();
    }
    //将删除操作记录到editlog中
    getEditLog().logSync();
    if (toRemovedBlocks != null) {
     //删除数据块操作
      removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
    }
    logAuditEvent(true, "delete", src);
    return ret;
  }

从命名空间删除文件

通过工具类FSDirDeleteOp的静态方法delete来删除文件,并且收集该文件的要删除的block.

最终通过FSDirDeleteOp类的unprotectedDelete(FSDirectory, INodesInPath, BlocksMapUpdateInfo, List, long)方法来执行删除操作.之所以叫做unprotectedDelet,是因为这个时候删除只是将该文件从命名空间中删除,并没有真正的写入editlog.

删除过程分为以下几个步骤: 1.检查文件是否存在 2,修改快照记录 3.从namespace中移除文件,也就是FSDirectory记录的INodeDirectory 类型的rootDir中删除; 4.设置父文件夹的最后修改时间 5更新删除的记录数 6 收集要删除的block

  /**
   * Delete a path from the name space
   * Update the count at each ancestor directory with quota
   * @param iip the inodes resolved from the path
   * @param collectedBlocks blocks collected from the deleted path
   * @param removedINodes inodes that should be removed from inodeMap
   * @param mtime the time the inode is removed
   * @return the number of inodes deleted; 0 if no inodes are deleted.
   */
  private static long unprotectedDelete(
      FSDirectory fsd, INodesInPath iip, BlocksMapUpdateInfo collectedBlocks,
      List<INode> removedINodes, long mtime) {
    assert fsd.hasWriteLock();

    // check if target node exists
    //检查是否存在
    INode targetNode = iip.getLastINode();
    if (targetNode == null) {
      return -1;
    }

//修改快照
    // record modification
    final int latestSnapshot = iip.getLatestSnapshotId();
    targetNode.recordModification(latestSnapshot);

//最核心的代码,从命名空间删除
    // Remove the node from the namespace
    long removed = fsd.removeLastINode(iip);
    if (removed == -1) {
      return -1;
    }
//设置父文件夹的最后修改时间
    // set the parent's modification time
    final INodeDirectory parent = targetNode.getParent();
    parent.updateModificationTime(mtime, latestSnapshot);

//更新记录数
    fsd.updateCountForDelete(targetNode, iip);
    if (removed == 0) {
      return 0;
    }

//收集要删除的block
    // collect block and update quota
    if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
    //收集INodeFile中的blocks变量存放的block信息
      targetNode.destroyAndCollectBlocks(fsd.getBlockStoragePolicySuite(),
        collectedBlocks, removedINodes);
    } else {
      QuotaCounts counts = targetNode.cleanSubtree(
        fsd.getBlockStoragePolicySuite(), CURRENT_STATE_ID,
          latestSnapshot, collectedBlocks, removedINodes);
      removed = counts.getNameSpace();
      fsd.updateCountNoQuotaCheck(iip, iip.length() -1, counts.negation());
    }

    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
          + iip.getPath() + " is removed");
    }
    return removed;
  }
}

将相应的数据块加到InvalidateBlocks中

FSNamesystem的removeBlocks循环刚才收集到的blocks,然后调用blockManager的removeBlock来处理要删除的数据块.

在blockManager的removeBlock中,首先获取到相应的block对应的DatanodeDescriptor,然后将其加到invalidateBlocks里面,然后从blocksMap,corruptReplicas,pendingReplications ,neededReplications 中删除相应的block.

  public void removeBlock(Block block) {
    assert namesystem.hasWriteLock();
    // No need to ACK blocks that are being removed entirely
    // from the namespace, since the removal of the associated
    // file already removes them from the block map below.
    block.setNumBytes(BlockCommand.NO_ACK);
    addToInvalidates(block); //加到invalidateBlocks中
    removeBlockFromMap(block);//从blocksMap删除
    // Remove the block from pendingReplications and neededReplications
    pendingReplications.remove(block);
    neededReplications.remove(block, UnderReplicatedBlocks.LEVEL);
    if (postponedMisreplicatedBlocks.remove(block)) {
      postponedMisreplicatedBlocksCount.decrementAndGet();
    }
  }

ReplicationMonitor监控线程

BlockManage里面有一个ReplicationMonitor线程,不断的计算块的副本信息和无效的块信息,以便生成相应的命令,等下次心跳的时候传给datanode.在这里我们只是看下相应的删除的方法.

通过run方法我们找到计算无效的块信息的方法computeInvalidateWork,在这里会循环invalidateBlocks中的所有datanode,然后循环调用invalidateWorkForOneNode方法一个一个的datanode来处理.

在invalidateWorkForOneNode中,首先将相应的datanode从invalidateBlocks中删除,然后调用invalidateBlocks.invalidateWork将该DatanodeDescriptor相应的无效的块加到DatanodeDescriptor类中LightWeightHashSet类型的变量invalidateBlocks中,等待下次心跳生成删除命令.

心跳生成删除命令

具体生成删除相关命令的代码在以下方法中,DatanodeManager.handleHeartbeat(DatanodeRegistration, StorageReport[], String, long, long, int, int, int, VolumeFailureSummary).

        //check block invalidation
        Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
        if (blks != null) {
          cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
              blockPoolId, blks));
        }

有关hdfs心跳相关的信息请参考 http://blog.csdn.net/zhangjun5965/article/details/75579238

datanode删除相应的block并汇报

心跳处理删除命令

datanode方面是通过BPServiceActor的offerService方法进行心跳相关的操作,报告心跳之后,会依次处理从namenode接收的命令,最终处理的方法落在BPOfferService.processCommandFromActive(DatanodeCommand, BPServiceActor)方法上.

在这个方法中,通过switch来判断传过来的是哪种命令,来分别进行处理,删除数据块对应的是DatanodeProtocol.DNA_INVALIDATE,最终进入了FsDatasetImpl.invalidate(String, Block[])方法来从磁盘删除具体的数据块.

异步单独开启线程删除磁盘数据

具体的操作方法是调用了asyncDiskService.deleteAsync异步的开启线程删除数据块,以便提高效率.

      // Delete the block asynchronously to make sure we can do it fast enough.
      // It's ok to unlink the block file before the uncache operation
      // finishes.
      try {
        asyncDiskService.deleteAsync(v.obtainReference(), f,
            FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
            new ExtendedBlock(bpid, invalidBlks[i]),
            dataStorage.getTrashDirectoryForBlockFile(bpid, f));
      } catch (ClosedChannelException e) {
        LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
            "block " + invalidBlks[i]);
      }

多线程删除具体是开启了一个ReplicaFileDeleteTask线程来做删除的操作,这个方法会先删除数据块信息和meta信息,删除之后调用 datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());向namenode报告最近删除的数据块. 但是这个时候并不是将这些信息直接发给namenode,而是要删除的blocks和其对应的DatanodeStorage生成ReceivedDeletedBlockInfo对象存在了BPServiceActor的Map<DatanodeStorage, PerStoragePendingIncrementalBR>类型的map的变量pendingIncrementalBRperStorage中,等下次心跳的时候,由BPServiceActor来处理.

向namenode汇报删除的块

在BPServiceActor的心跳处理方法offerService中,会通过 reportReceivedDeletedBlocks();读取pendingIncrementalBRperStorage变量中的blocks信息,向namenode汇报刚刚删除的数据块信息.

        if (sendImmediateIBR ||
            (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
          reportReceivedDeletedBlocks();
          lastDeletedReport = startTime;
        }

最终通过datanode和namenode之间的协议DatanodeProtocol.blockReceivedAndDeleted(DatanodeRegistration, String, StorageReceivedDeletedBlocks[])来向namenode报告刚才删除的数据块.

namenode处理删除block的汇报

namenode处理最近删除的块的方法是在NameNodeRpcServer中的方法blockReceivedAndDeleted中,通过跟踪代码,最终到了BlockManager.removeStoredBlock(Block, DatanodeDescriptor)中.

首先从blocksMap中移除相应的块信息,然后判断是否是因为datanode挂掉而导致的block被移除,并做相应的处理,然后从excessReplicateMap,corruptReplicas队列中将其移除.

  /**
   * Modify (block-->datanode) map. Possibly generate replication tasks, if the
   * removed block is still valid.
   */
  public void removeStoredBlock(Block block, DatanodeDescriptor node) {
    blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
    assert (namesystem.hasWriteLock());
    {
      //从blocksMap移除
      if (!blocksMap.removeNode(block, node)) {
        blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
            " removed from node {}", block, node);
        return;
      }

      //判断是否是因为datanode失败而移除的数据块,如果block仍然有效,检查副本是不是必要的,在这种情况下,需要将block加到待复制的block列表中.
      // It's possible that the block was removed because of a datanode
      // failure. If the block is still valid, check if replication is
      // necessary. In that case, put block on a possibly-will-
      // be-replicated list.
      //
      BlockCollection bc = blocksMap.getBlockCollection(block);
      if (bc != null) {
        namesystem.decrementSafeBlockCount(block);
        updateNeededReplications(block, -1, 0);
      }

      //
      // We've removed a block from a node, so it's definitely no longer
      // in "excess" there.
      //从excessReplicateMap移除
      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
          .getDatanodeUuid());
      if (excessBlocks != null) {
        if (excessBlocks.remove(block)) {
          excessBlocksCount.decrementAndGet();
          blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " +
              "excessBlocks", block);
          if (excessBlocks.size() == 0) {
            excessReplicateMap.remove(node.getDatanodeUuid());
          }
        }
      }
       //从corruptReplicas移除
      // Remove the replica from corruptReplicas
      corruptReplicas.removeFromCorruptReplicasMap(block, node);
    }
  }

总结

上述只是基于hadoop2.7.3源码自己的一些学习笔记,如有不对的地方,还请见谅。

本文分享自微信公众号 - 大数据技术与应用实战(bigdata_bigdata),作者:zhangjun

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-05-26

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • flink实战教程-使用set实时计算当天网站uv

    对于web网站,我们一般会有这样的需求,实时的计算出来当天网站的uv,尽可能快的展示出来。今天我们就讲一下基于java的set集合做一下实时uv的统计。

    大数据技术与应用实战
  • hadoop源码解析之RPC分析

    因为hadoop底层各种通讯都用的是rpc,如client和namenode、client和datanode、namanode和datanode等。所以首先学习...

    大数据技术与应用实战
  • 放弃fastjson,拥抱Jackson

    最近由于阿里的fastjson频繁爆出安全漏洞,为了避免后续升级上线的烦恼,决定弃用fastjson,使用Jackson,把现有项目中的fastjson都换成了...

    大数据技术与应用实战
  • 每日一题C++版(树的高度)

    编程是很多偏计算机、人工智能领域必须掌握的一项技能,此编程能力在学习和工作中起着重要的作用。因此小白决定开辟一个新的板块“每日一题”,通过每天一道编程题目来强化...

    小白学视觉
  • 【专知-关关的刷题日记19】Leetcode 118. Pascal&#39;s Triangle

    题目 Given numRows, generate the first numRows of Pascal's triangle. For example, ...

    WZEARW
  • 留给国足的时间不多了,我渴望温暖的阳光,更期待百花吐蕊的春天。。。

    一个是广大球迷恨铁不成钢的国足又又又输球了,对手叙利亚,国内战火纷飞,民不聊生,但是人家的足球依然能够顽强的战胜国足,此处给叙利亚男子国家足球队点个赞。而对于中...

    周萝卜
  • 为你在 Bash 历史中执行过的每一项命令设置时间和日期

    在默认情况下,所有通过 Bash 在命令行中执行过的命令都被存储在历史缓存区或者一个叫做 ~/.bash_history 的文件里。这意味着系统管理员可以看到系...

    :::::::
  • 论文研读-基于变量分类的动态多目标优化算法

    [1] K. Deb, U. V. Rao, and S. Karthik, “Dynamic multi-objective optimization and...

    DrawSky
  • 集合系列 Map(十二):HashMap

    HashMap 是 Map 基于哈希散列算法的实现,其在 JDK1.7 中采用了数组+链表的数据结构。在 JDK1.8 中为了提高查询效率,采用了数组+链表+红...

    陈树义
  • 打造运维大脑:翼支付高速发展背后,甜橙金融的云化智能演进

    不久前,在由ACOUG与云和恩墨主办的2018数据技术嘉年华的金融科技实战分论坛上,甜橙金融分享了其云化变革的成功经验。

    数据和云

扫码关注云+社区

领取腾讯云代金券