首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >面试大数据必问的一道题,这次不死记硬背,直接看源码,惊艳一下面试官

面试大数据必问的一道题,这次不死记硬背,直接看源码,惊艳一下面试官

作者头像
kk大数据
发布2021-04-22 14:56:25
5060
发布2021-04-22 14:56:25
举报
文章被收录于专栏:kk大数据kk大数据

一、前言

想必每次去面试都会被问这样一道题:HDFS 的写数据流程

那每次准备面试前,自然是先百度一番,复制一下答案,1 2 3 4 5 6 点,背一背完事。但是面试完,还是不了解 HDFS 写数据流程内部究竟是怎么实现的。

其实这个流程里面有很多我们值得学习的东西,比如写数据到 DataNode,如何保障数据一致性,如何保障数据在写的时候不丢失,重试如何做的,如何做三备份的?

那么这次咱就趴一趴 HDFS 的写数据流程吧。

二、往 HDFS 写数据的客户端代码

我们用 HDFS 的 api ,从一个写数据的代码开始剖析这个过程:

public class TestWriteHdfsFile {

    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(configuration);
        FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/user/data.txt"));
        fsDataOutputStream.write("contents".getBytes(StandardCharsets.UTF_8));
    }
}

从这段源码可以看到,写一个数据要有两步,第一步是要 create 一个文件,第二部才是往这个文件写对象。

三、写数据的 create 方法

先获得一个 FileSystem 对象,创建了一个文件返回了 FSDataOutputStream 对象,然后用这个对象写字节数组。

自然得从 create 方法看起了,点进去(当前类:FileSystem)

再点进去(当前类:FileSystem)

再点进去(当前类:FileSystem)

再点进去(当前类:FileSystem)

再点进去,发现是一个抽象方法了(当前类:FileSystem)

ctrl + alt + B,找到实现类,并进入 DistributedFileSystem 类中(当前类:DistributedFileSystem )

再点进去(当前类:DistributedFileSystem )

在 dfs 的 create 方法中,做了三件事情:

  • 往 NameNode 文件目录树的合理位置中添加了 INodeFile 节点;
  • 对这个要往里面写数据的文件,添加了【契约】管理;
  • 启动了 DataStreamer,这是写数据的关键服务。

我们再点进去(当前类:DFSClient)

点进去(当前类:DFSClient)

我们来看 DFSOutputStream 方法的 newStreamForCreate 方法,点进去(当前类:DFSClient)

可以看到,这里即将要调用 NameNode 的代理去执行 create 方法。create 方法再点进去已经没有实现类了,因为这已经是在用 Hadoop Rpc 调用 NameNode 的代理方法了。

具体的实现在 NameNodeRpcServer 里面,NameNode 这一侧大概做了三件事:

(1)创建了一个 INodeFile 加入到内存的文件目录管理树里面;

(2)为当前文件创建契约。所谓契约就是,拥有了这个文件契约的客户端才有权限去写这个文件,防止多个客户端同时写一个文件。

(3)把元数据信息存入 EditLog 日志里面;

(这里面也有很多的逻辑,我们用一个新篇章来讲这些事情)

到目前位置,当前的流程图为:

在 NameNode 这一侧完成了这些事情之后,下面创建了一个很重要的对象:DFSOutputStream,这是写数据流程里面最重要的一个对象。

当前类(DFSOutputStream)

点进去 start 方法看一下,发现是调用的内部的 DataStreamer 类的 start 方法,DataStreamer 类是一个线程。

可以看一下(当前位置:DFSOutputStream 的 DataStreamer 类)

它既然是一个线程,那我们看下这个类的 run 方法是在做什么:

可以看到这是一个 while 循环,里面用 synchronize 锁住了 DataQueue 对象,如果这个队列是空的话,就在 wait 。(当前位置:DFSOutputStream 的 DataStreamer 类)

此时我们的流程图是这样子的:

到目前为止,就是 create 方法,做了这么多的事情。记住,当前还是在 客户端这边的。

四、开始执行 write 方法

我们直接点进去 write() 方法(当前位置:TestWriteFile 客户端测试代码)

发现点进去竟然是在 jdk 的代码里面,这不太对。

我们可以再次从 create 方法点到 这个地方来(当前位置:DistributedFileSystem)

点进去,发现其实它其实真正返回的类是 HdfsDataOutputStream,并且在构造函数里面传了一个 out 对象进来,这个 out 对象就是 DFSOutputStream。(当前位置:DFSClient)

那我们看一下 HdfsDataOutputStream 这个类的 write 方法。搜索了一下,发现没有。那可能在父类里,我们去父类里看一下:

发现父类里有 write() 方法(当前位置:FSDataOutputStream)

调用的是 out 的 write 方法,上面可以看到,其实 out 是 DFSOutputStream。那我们看 DFSOutputStream 的 write 方法,发现又没有,那可能是在父类里面:

发现父类有 write 方法(当前位置:FSOutputSummer)

再进去就是核心写数据的方法了。

五、HDFS 文件最小的组成并不是 Block

到这里,我们要普及一下 HDFS 文件块的组成结构。

一个 Block 是有很多个 Packet(小包)组成的,而每个 Packet 又是有很多个 chunk(块)+checksum(校验码) 组成的。

其中每个 chunk 是 512 byte,校验码是 4 个字节。每个 Packet 是 64K,每个 Packet 是有 127 个 chunk 组成的。

六、继续写流程

上面那个 write 方法是一个字节一个字节往内存里写,每写一个字节都要判断是否缓存到了一定大小(9个chunk大小),如果到了一定大小,就开始 flushBuffer。

我们点到 flushBuffer 方法里来(当前位置:FSOutputSummer)

这里开始 for 循环 byte[] 数组,开始一个 chunk 一个 chunk 的开始写(当前位置:FSOutputSummer)

再往里面点就是抽象类了,我们看它的实现类(当前位置:DFSOutputStream)

我们进入到这个方法里来。先创建一个 Packet 对象(当前位置:DFSOutputStream)

然后往 Packet 对象里写入 chunk 和 checksum 的数据(当前位置:DFSOutputStream)

随着数据的不断写入,Packet 的数据会越来越多。直到一个 Packet 写满 或者 Block 写满,都表示一个 Packet 写满了,开始把 Packet 对象放入到队列中。(当前位置:DFSOutputStream)

继续点进去看。(当前位置:DFSOutputStream)

可以看到,往队列 dataQueue add 了一个 Packet 对象,并且调用了 notifyAll() 方法。

至于什么要放入队列中,为什么要调用 notifyAll() 方法?

还记得上面有一个 DataStreamer 线程吗,它的 run 方法里面,在 while 循环判断 DataQueue 是否有数据,如果 DataQueue 是空的,就在那 wait 。DataStreamer 线程其实就是在等待其他线程往 DataQueue 里面放数据,并且通知它。

直到现在为止,我们的流程是这样的:

七、队列里有数据之后 DataStreamer 进程开始工作

当 DataQueue 里面有数据之后,其他线程调用了 notifyAll() 方法,DataStreamer 线程从 wait 方法开始继续执行。执行时,会再次进入 while 循环中,此时 DataQueue.size() 已经不等于0了,所以跳出 while 循环。(当前位置:DFSOutputStream)

从 DataQueue 里面取一条数据出来(当前位置:DFSOutputStream)

然后开始做一个非常重要的事情,那就是先向 NameNode 申请一个 Block 块,然后建立数据管道。(当前位置:DFSOutputStream)

八、向 NameNode 请求 Block 信息

我们继续点进去 nextBlockOutputStream() 方法。(当前位置:DFSOutputStream)

然后开始调用 NameNode 的方法,向 NameNode 申请 Block 信息:

这个时候,需要去看服务端代码了,我们进入 NameNodeRpcServer ,找到 addBlock 方法。(当前位置:NameNodeRpcServer)

我们可以稍微看一眼它的返回值 LocatedBlock 都有哪些属性,发现它会返回 DataNodeInfo 的信息,也就是说,我们这个 Block 该存到哪些 DataNode 上面。

我们再点到 getAdditionalBlock 方法里去,发现会使用 BlockManager 来选择当前 Block 该存放到哪些 DataNode 上(当前位置:FSNamesystem)

当然这里,肯定不是无脑选择的,肯定会有一些策略,比如负载均衡,机架感知策略。

然后会把 Block 信息挂载到目录树上,然后再写到磁盘上。(当前位置:FSNamesystem)

好了,向 NameNode 申请 Block 大致就是这么多过程,此时我们的流程图是这样的。

九、建立数据管道

到目前位置,Block 信息也申请完毕,下面我们要开始创建数据管道。

那么创建数据管道是个什么样的动作?

所谓数据管道,就是在 DataNode 和 DataNode 预先启动好相关的线程和 Socket。然后客户端是往某一个节点上(DataNode1)去写数据的,然后 DataNode 1 开始往 DataNode2 上写,DataNode2 同时往 DataNode3 写。

那这样做有什么好处?

如果假设客户端就直接往三个节点上写,那么客户端会同时维护三个连接。如果客户端要写很多文件的话,就要维护好多连接,压力比较大。更有甚者,如果客户端和机房不在一个局域网下,那么带宽压力也会非常大。

所以使用数据管道,客户端只需要维护和某一个节点的网络连接就可以了,加速效率,节省带宽,可以说是非常好的设计。

下面继续看源码,上面已经说到,客户端已经向 NameNode 申请了 Block,并且知道要往哪些 DataNode 上写数据了,下面开始建立数据管道。

当前位置:(DataStreamer :nextBlockOutputStream() 方法)

然后建立了一个 Socket 连接,创建了输出流,使用这个输出流来往 DataNode 写数据

开始写 Socket 请求(当前位置:DataStreamer)

我们点进去看一下,注意这里的类型,是 WRITE_BLOCK 类型的,会对应到下文接收方对于不同类型的处理。最后 flush 了,即把数据写了出去(当前位置:Sender)

然后既然 Socket 请求发出去了,肯定 DataNode 有个程序在接收数据,这个类叫 DataXceiverServer,我们打开看一下,发现它是一个线程,那我们来看一下 run 方法(当前位置:DataXceiverServer)。

首先接收 Socket 请求(当前位置:DataXceiverServer):

每发送过来一个 Packet ,都要创建一个 DataXceiver 线程去接收它(当前位置:DataXceiverServer):

我们来看一下 DataXceiver 的 run 方法(因为它是一个线程)(当前位置:DataXceiver),首先会读取数据:

然后处理这些数据(当前位置:DataXceiver):

然后根据不同类型,来做不同的处理(当前位置:Receiver):

然后看一下实现,是一个方法参数定义好几十的方法(当前位置:Receiver):

它是个抽象方法,我们来看它的实现类 DataXceiver,发现创建了一个 BlockReceiver,来接收数据(当前位置:DataXceiver):

这个方法执行完了之后,发现它会判断是否有下游的机器,如果有,则继续调用 Sender 的writeBlock 方法发送数据

那么这个方法又是刚才的方法了。

所以到现在为止,就是管道建立的过程,也就是把各个 DataNode 上的线程建立起来了,流程图如下所示:

建立数据管道是一个发送 Socket 请求的过程,既然是网络请求,那么肯定会遇到错误,是如何处理的呢?

如果建立过程中遇到错误,会返回 false。如果返回值是 false 的话,那么 namenode 会抛弃这个 block,并且会把错误机器的 DataNode 记录下来。由于这些方法是在 for 循环里面,下一次重新申请 Block 的话,就不会再去分配那台错误的 DataNode 了。

十、客户端收集各个 DataNode 写数据的结果

现在客户端已经和各个 DataNode 建立好了数据管道,开始写数据了。那么写数据的结果一样很重要,需要去收集各个 DataNode 写数据的结果。此时会创建一个重要的对象ResponseProcessor。(当前位置:DataStreamer)

可以看到,它是一个线程,会去读取下游的处理结果(当前位置:ResponseProcessor):

如果是成功的,就把这个 Packet 数据从 ack 中移除。

那么这里的 AckQueue 是什么呢?有什么作用?

我们先看启动了 ResponseProcessor 之后会做什么?会把 Packet 从 DataQueue 中移出去,然后加到 AckQueue 中来。(当前位置:DataStreamer):

最终,终于开始写数据(当前位置:DataStreamer):

下面来解释一下 ,DataQueue 和 AckQueue 是怎么配合使用的

就是客户端在写满一个 Packet 的时候,DataStreamer 线程从 DataQueue 的队列中取了一个出来,准备发到 DataNode,此时它会再写一份到 AckQueue 中,然后才开始建立数据管道,写数据到 DataNode。

等到 DataNode 都写完了,DataNode 之间会一个个汇报自己写数据的结果上来,最终汇报给客户端。如果是成功的,就把这个 packet 从 AckQueue 中移除掉,如果失败了,则重新把这个 Packet 从 AckQueue 中拿到 DataQueue 中重试一次。

十一、DataNode 如何接收数据的

我们继续从第九点,建立数据管道后面开始看,也就是 DataXceiver 开始看(当前位置:DataXceiver):

它创建了一个 PacketResponder,(当前位置:BlockReceiver):

这个对象也很好理解,既然客户端要接收 DataNode 上报的结果创建了 ResponseProcessor ,那么 DataNode 之间也要去知道其他 DataNode 有没写数据成功,也需要一个 PacketResponder 来接收响应。

不过只需要在 DataNode1 和 DataNode 2 上启动就行了,DataNode3 是最后一个节点,是不需要的。

然后看一下 ResponseProcessor 这个线程做了什么。

如果不是下游的最后一个节点,则读取下游返回的结果,(当前位置:BlockReceiver):

如果下游处理成功,则把 Packet 从 AckQueue 中移除。注意这里的 AckQueue 不是客户端的那个,而是 DataNode 里面的 AckQueue,(当前位置:BlockReceiver):

如果处理不成功,那么把 AckQueue 的东西重试一遍就可以了。

然后它就开始不断的接收下游的请求了,在一个 while 循环里面,(当前位置:BlockReceiver)

我们来看这个 receivePacket 方法,它接收到数据之后,做的第一件事情就是把 Packet 数据放到 自己的 ackQueue 里面去,(当前位置:BlockReceiver):

然后把数据写到下游(当前位置:BlockReceiver):

然后校验一下数据(当前位置:BlockReceiver):

没问题的话,就把数据写到本地磁盘上面(当前位置:BlockReceiver):

所以,总结一下,就是当 DataNode 接收到数据以后,第一步就是把这个 Packet 放到自己的 AckQueue 里面,然后把数据发送到下游节点,校验数据并写到自己的磁盘。

同时下游的 DataNode 也会依次做这样的事情。也就是说,客户端在写数据的时候,所有的 DataNode 都会同时往自己的磁盘上写数据,并不是一个个写的。

此时整体流程是这样的:

十二、HDFS 写数据的容错机制

既然是往多个节点写,那么肯定是会出错的,看看它的容错是怎样的:

这里面就是把相关标志标记了一下

然后抛出了异常,最外面是捕获了异常的,捕获到异常后,又把 hasError 标识为 true 了,开始进入下一次循环。

然后循环的时候会走到最后这行代码这:

可以点进去看一下,重新把数据加入到了客户端的 DataQueue 里,并且清空了 AckQueue。

并且开始重新建立数据管道,此时会分成两种情况:

一种情况是,3 副本的情况下,只有一个 DataNode 挂掉了,那么就不再找一台机器再建立管道了,而是继续把数据写完。那么写完肯定就俩副本,会等到 DataNode 向 NameNode 发送心跳时,NameNode 会给 DataNode 一个指令,让它把自己的副本,拷贝到一个别的机器上。

另一种情况是,如果超过了半数都挂了,那么就只能推倒重来,重新建立数据管道了。

十三、总结一下

这个流程还是很复杂的,我们可以稍微理一下脉络

1、首先向 NameNode 建立文件,创建契约;

2、启动了 DataStreamer 线程;

3、写 Packet

4、向 NameNode 申请 Block;

5、建立数据管道以及容错;

6、ResponseProcessor 线程;

7、PacketResponder 线程;

8、写数据的容错。

其实写文章的好处,就是把自己研究过的东西记录下来,因为人的大脑真的很不靠谱,动不动就忘记了。有了笔记,日后可以拿来回忆,就不用重新看了,可以很容易就回忆起来。

好了,谢谢

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、往 HDFS 写数据的客户端代码
  • 三、写数据的 create 方法
  • 四、开始执行 write 方法
  • 五、HDFS 文件最小的组成并不是 Block
  • 六、继续写流程
  • 七、队列里有数据之后 DataStreamer 进程开始工作
  • 八、向 NameNode 请求 Block 信息
  • 九、建立数据管道
  • 十、客户端收集各个 DataNode 写数据的结果
  • 十一、DataNode 如何接收数据的
  • 十二、HDFS 写数据的容错机制
  • 十三、总结一下
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档