1、Hadoop有几种部署方式
2、HDFS的各进程名称与功能
3、HDFS读流程
4、HDFS写流程
5、HDFS写文件时节点挂机处理
6、HDFS文件副本放置策略
7、常用的hdfs命令
8、MapReduce过程
9、HDFS文件存储格式
10、HDFS文件压缩算法
11、HDFS租约机制
12、HDFS安全模式
13、HDFS负载均衡(Rebalance)
14、HDFS存储策略与异构存储
15、HDFS纠删码
16、SecondaryNameNode
17、JournalNode
18、HDFS HA与联邦机制
19、HDFS中小文件过多导致的问题与如何优化
20、MapReduce跑得慢的原因
21、MapReduce优化方法
22、MapReduce数据倾斜描述与解决方案
23、HDFS调优技巧
1 独立模式
2 伪分布式模式
3 集群模式
1)跟NN通信查询元数据(block所在的DN的节点),找到文件块所在的DN的服务器。2)挑选一台DN(就近原则,然后随机)服务器,请求建立socket流。3)DN开始发送数据(从磁盘里读取数据放入流,一packet为单位做校验) 4)客户端以packet为单位接收,现在本地缓存,然后写入目标文件中,后面的block块就相当于append到前面的block块,最后合成最终需要的文件。
1)客户端发送消息给namenode请求上传,NameNode检查目标文件是否已存在,父目录是否存在并返回是否可以上传。2)NameNode根据机架感知策略返回3个DataNode节点,分别为dn1、dn2、dn3。3)dn1、dn2、dn3逐级应答客户端。通信管道建立完成。4)客户端上传之前对文件进行切片,切片规则:按datanode的block块大小进行切片,hadoop2.x默认block大小为128m(hadoop1.x默认block大小64m)。5)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,packet默认大小为64k。6)dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。7)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block依次类推直到文件传输完成。
在文件写入过程中不会重新分配datanode。如果写入时一个datanode挂掉,会将已经写入的数据放置到数据队列的顶部,并将挂掉的datanode移出通信管道,将数据写入到剩余的datanode。在写入结束后, namenode会收集datanode的信息,发现此文件的副本没有达到配置的要求(default=3),然后寻找一个datanode保存副本。
假设有一份数据,三副本
hdfs fs -help rm
hdfs fs -ls /
hdfs fs -mkdir -p /user/ysir
hdfs fs -get /aaa/a.txt
hdfs fs -put ~/a.txt /
hdfs fs -cat /user/ysir/a.txt
hdfs fs -chmod 777 /a.txt
hdfs fs -chown hdfs:hdfs /a.txt
hdfs fs -cp /aaa/a.txt /bbb/
hdfs fs -mv /aaa/a.txt /bbb/
hdfs fs -rm /user/ysir/a.txt
Input数据输入 -> Map -> partition -> 写缓冲 -> 溢写(按key排序-合并) -> combine->Merge(溢写过程-最终过程)-> copy -> merge -> reduce -> output数据输出
其中shuffle过程可以细分为map shuffle和reduce shuffle
input用于数据输入并做split切片,每个输入分片(input split)针对一个mapTask。Split是逻辑上的切分,并不会在物理上切分文件,分片存储的并不是数据本身,而是一个分片长度和一个记录数据位置的数组。 分片大小根据Math.max(minSize,Math.min(maxSize,blockSize))决定,minSize默认为1,分片大小默认为blockSize大小,最大可上浮1.1倍(源码中体现)切分时按照文件切分,不可跨文件切分。split不可跨文件切分,当存在多个小文件时,每个小文件都会产生一个mapTask,map任务的装载比较耗时,从而导致 MR 运行较慢。 采用CombineTextInputFormat可以将多个小文件从逻辑上规划到一个切片中,多个小文件产生的逻辑分片只需要占用一个mapTask。
mapTask读取split分片后的数据,根据用户自定义的代码逻辑进行key/value处理。
根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key
hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,用户也可自定义partitioner分配reduce。
数据经过partition确定目标reducer后,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。key/value对以及Partition结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。 内存缓冲区默认100MB,加大缓冲区可提升程序运行效率
溢写: 内存缓冲区默认是100MB。当mapTask输出结果过多会撑爆内存,所以需要将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。从内存往磁盘写数据的过程称为Spill(溢写)。溢写由单独线程完成,不影响map写缓冲。为了保障map在溢写过程中正常写缓冲,缓冲区有个溢写的比例,默认为缓冲区空间的0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。mapTask结果还可往剩下的20MB内存中写,互不影响。 排序: 当溢写线程启动后,需要对溢写内存区内的数据按key进行全局字典顺序。如果作业配置了Combiner,则会运行combine函数,减少写入磁盘的数据量。 合并: mapTask在溢写过程,排序完成后会进行合并操作,将数据按照key/value值拼接到一块,减少与partition相关的索引记录。如果作业配置了Combiner,则会运行combine函数,减少写入磁盘的数据量。
如果mapreduce中设置了combiner,则在数据溢写或merge的时候,combiner会根据key进行局部数据的合并,聚合并精简数据, 减少磁盘I/O 。combiner等同于map端的Reducer,适用于求累加,最大值等操作,不适用于求平均值。使用Combine可以大量的减少数据倾斜。
当前map完成后,内存缓冲区数据会被Flush至磁盘中。如果当前map只产生了一个溢写文件,则不会进行Merge,溢写文件就是最终发送至reduce的文件。如果溢写文件数量大于1,所有文件会合并成一个整体有序的文件作为reduce任务的输入。
每个reduce task不断地通过RPC从JobTracker获取mapTask是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。
简单地说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据,拉取的数据先存储在内存中,内存不够了,再存储到磁盘。
一个map的中间结果中包含多个reduce需要处理的部分数据的。为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,所有的reduce就开始尝试从完成的map中下载该reduce对应的partition数据,因此map和reduce是交叉进行的,其实就是shuffle。 Reduce启动copy线程通过HTTP向各个Map任务下载它所需要的数据,一旦map任务完成之后,会通过心跳通知Application Master。 reduce的一个线程会周期性地向master询问,直到提取完所有数据。数据被reduce下载完成后,map端不会立刻删除数据,这是为了预防reduce任务失败需要重做。因此map输出数据是在整个作业完成之后才被删除掉的。
这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。 内存Merge:当缓冲中数据达到配置的阈值时,这些数据在内存中被合并、写入机器磁盘。阈值有2种配置方式:配置内存比例:前面提到reduceJVM堆内存的一部分用于存放来自map任务的输入,在这基础之上配置一个开始合并数据的比例。假设用于存放map输出的内存为500M,mapreduce.reduce.shuffle.merge.percent配置为0.66,则当内存中的数据达到330M的时候,会触发合并写入。配置map输出数量:通过mapreduce.reduce.merge.inmem.threshold配置。在合并的过程中,会对被合并的文件做全局的排序。如果作业配置了Combiner,则会运行combine函数,减少写入磁盘的数据量。 磁盘Merge:Copy过程中磁盘Merge:在copy过来的数据不断写入磁盘的过程中,一个后台线程会把这些文件合并为更大的、有序的文件。如果map的输出结果进行了压缩,则在合并过程中,需要在内存中解压后才能给进行合并。这里的合并只是为了减少最终合并的工作量,也就是在map输出还在拷贝时,就开始进行一部分合并工作。合并的过程一样会进行全局排序。最终磁盘中Merge:当所有map输出都拷贝完毕之后,所有数据被最后合并成一个整体有序的文件,作为reduce任务的输入。这个合并过程是一轮一轮进行的,最后一轮的合并结果直接推送给reduce作为输入,节省了磁盘操作的一个来回。
merge完成后会生成一个最终文件。直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。至于怎样才能让这个文件出现在内存中,之后的性能优化篇我再说。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。
默认情况下一个 Reduce输出一个文件,默认文件名为 part-r-00000,输出文件的个数与 Reduce 的个数一致。 输出包括文本输出、二进制输出、数据库输出、以及集成OutputFormat类实现自定义输出。
hdfs 文件存储格式分为两大类:行存储和列存储
行存储:
行存储的写入是一次完成,在写入上有很大优势。将一整行存储在一起,是一种连续的存储方式,可以保证写入过程的成功或者失败,保证数据完整性。查询时如果只需要某列信息,也必须把整行都读入内存当中,在内存中对冗余数据进行过滤。没有索引的查询使用大量I/O,通过建立索引加快查询效率。因为在一行记录中可能存在多种类型的数据,数据解析需要在多种类型之间频繁转换,这个操作消耗CPU,增加了解析的时间。
列存储:
列存储由于需要把一行记录拆分成单列保存,写入次数明显比行存储多,实际时间消耗会更大。列存储会把文件切割成若干列,读取时只需要返回对应列的数据。由于每一列中的数据类型相同所以可以根据数据类型选择适合的编码和压缩格式
对照表格
操作类型 | 行存储 | 列存储 |
---|---|---|
hdfs格式 | TextFile,Sequence,MapFile,Avro | Parquet , RCFile,ORCFile |
存储 | 连续存储 | 按列存储 |
写入操作 | 一次写入整行,效率高 | 一行数据分列多次写入,效率较低 |
查询操作 | 整行读取,内存过滤冗余行 | 按列读取 |
压缩 | 每行数据类型不同,压缩性能较差 | 每列数据类型相同,压缩性能好 |
使用场景 | OLTP | OLAP |
1) textfile
textfile为默认格式,加载速度最快,可以采用Gzip进行压缩,压缩后的文件无法split。在检索时磁盘开销大,数据解析开销大。
2) SequenceFile
SequenceFile是Hadoop提供的一种二进制文件,以[Key,Value]的形式序列化到文件中。可以把SequenceFile当做是一个容器,把所有的文件打包到SequenceFile类中可以高效的对小文件进行存储和处理。
SequenceFile主要由一个Header后跟多条Record组成。Header主要包含了Keyname和valuename,还包含了一些同步标识,用于快速定位到记录的边界。每条Record以键值对的方式进行存储,内容包括:记录长度、Key长度、Key值和value值,Value的结构取决于该记录是否被压缩。
SequenceFile支持三种记录存储方式:
但是SequenceFile只支持Java, SequenceFile一般用来作为小文件的容器使用, 防止小文件占用过多的NameNode内存空间来存储其在DataNode位置的元数据。
3) RCFile
在一般的列存储中,会将不同的列分开存储,有时候存在一个表的某些列不在同一个HDFS块上,所以在查询的时候,Hive重组列的过程会浪费很多IO开销。
RCFile是Hive推出的一种专门面向列的数据格式。存储方式为数据按行分块,每块按照列存储的行列混合模式,具有压缩高,列存取快的特点。需要说明的是,RCFile在map阶段从远端拷贝仍然是拷贝整个数据块,并且拷贝到本地目录后RCFile并不是真正直接跳过不需要的列,而是通过扫描每一个行组的头部信息实现,但是在整个block级别的头部并没有定义每个列从哪个行组起始到哪个行组结束,所以读取全量数据的操作其性能比sequencefile低。
RCFile先将数据按行划分成行组,大小默认是4MB,行组内包括16字节的HDFS同步块信息,主要是为了区分同一个HDFS块上的相邻行组;元数据的头部信息主要包括该行组内的存储的行数、列的字段信息等等;在Row Group内部,再将数据按列划分存储。其结构如下:
4) ORCfile
是RCfile的升级版,支持文件切分,将数据划分为默认大小为250MB的stripe(条带),每个stripe包含索引,数据和footer。可以支持复杂的数据结构(比如Map等)
5) Parquet
parquet基于Google的dremel,擅长处理深度嵌套的数据(有点类似于嵌套多层的json格式),parquet会将嵌套结构整合为平面列存储。
6) Avro
Avro 是 Hadoop 中的一个子项目,也是 Apache 中一个独立的项目,Avro 是一个基于二进制数据传输高性能的中间件。在 Hadoop 的其他项目中,例如 HBase 和 Hive 的 Client 端与服务端的数据传输也采用了这个工具。Avro是一个语言无关的数据序列化的系统,它的出现主要是为了解决Writables缺少跨语言移植的缺陷。Avro将模式存储在文件头中,所以每个文件都是自描述的,而且Avro还支持模式演进(schema evolution),也就是说,读取文件的模式不需要与写入文件的模式严格匹配,当有新需求时,可以在模式中加入新的字段。Avro支持分片, 即使是进行Gzip压缩之后
在进行文件压缩算法的选择,首先要先考虑一下几个问题
1) Gzip压缩
优点:压缩率比较高,压缩/解压速度也比较快,hadoop本身支持。
缺点:不支持分片。
应用场景:当每个文件压缩之后在1个block块大小内,可以考虑用gzip压缩格式。2) lzo压缩
优点:压缩/解压速度也比较快,合理的压缩率,支持分片,是Hadoop中最流行的压缩格式,支持Hadoop native库。
缺点:压缩率比gzip要低一些,Hadoop本身不支持,需要安装,如果支持分片需要建立索引,还需要指定inputformat改为lzo格式。
应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越明显。
3) snappy压缩
优点:支持Hadoop native库,高速压缩速度和合理的压缩率。
缺点:不支持分片,压缩率比gzip要低,Hadoop本身不支持,需要安装。
应用场景:当MapReduce作业的map输出的数据比较大的时候,作为map到reduce的中间数据的压缩格式。
4) bzip2压缩
优点:支持分片,具有很高的压缩率,比gzip压缩率都高,Hadoop本身支持,但不支持native。
缺点:压缩/解压速度慢,不支持Hadoop native库。
应用场景:适合对速度要求不高,但需要较高的压缩率的时候,可以作为mapreduce作业的输出格式,输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况。
在HDFS中,当每次客户端用户往某个文件中写入数据的时候,为了保持数据的一致性,此时其它客户端程序是不允许向此文件同时写入数据的。那么HDFS是如何做到这一点的呢?答案是租约(Lease)。租约是HDFS给予客户端的一个写文件操作的临时许可证,无此证件者将不被允许操作此文件。客户端在每次读写HDFS文件的时候获取租约对文件进行读写,文件读取完毕了,然后再释放此租约。
租约管理
HDFS租约管理的操作集中在一个类上:LeaseManager。运行在NameNode的服务中。租约类的定义就是在LeaseManager中的。在LeaseManager租约管理器中,它所做的事情主要归纳为两类。
讲述完租约的概念以及管理之后,我们来分析租约的添加到释放的过程。以我们对于租约的一个传统概念应该是这样一个过程:首先在进行文件写操作时,进行租约的添加,然后操作结束之后,进行租约的释放。
在NameNode主节点启动时,HDFS会首先进入安全模式,检查包括文件副本的数量、可用的datanode数量、集群可用block比例等参数。以上参数达到阈值(可配置)后,H即可视为系统达到安全标准,HDFS自动离开安全模式。在安全模式下,文件系统只接受读数据请求,而不接受删除、修改等变更请求。且文件block不能进行任何的副本复制操作,因此达到最小的副本数量要求是基于datanode启动时的状态来判定的,启动时不会再做任何复制(从而达到最小副本数量要求)
安全模式相关配置
系统什么时候才离开安全模式,需要满足哪些条件?可以根据以下配置内容进行确定 如果有必要,也可以通过命令强制离开安全模式。与安全模式相关的主要配置在hdfs-site.xml文件中,主要有下面几个属性
dfs.namenode.replication.min: 最小的block副本数量,默认为1。
dfs.namenode.safemode.threshold-pct: 副本数达到最小要求的block占系统总block数的百分比,当实际比例超过该配置后,才能离开安全模式(但是还需要其他条件也满足)。默认为0.999f,也就是说符合最小副本数要求的block占比超过99.9%时,并且其他条件也满足才能离开安全模式。如果小于等于0,则不会等待任何block副本达到要求即可离开。如果大于1,则永远处于安全模式。
dfs.namenode.safemode.min.datanodes: 离开安全模式的最小可用datanode数量要求,默认为0。即所有datanode都不可用,仍然可以离开安全模式。
dfs.namenode.safemode.extension: 集群可用block比例、可用datanode都达到要求之后,如果在extension配置的时间段之后依然能满足要求,此时集群才离开安全模式。单位为毫秒,默认为1。也就是当满足条件并且能够维持1毫秒之后,离开安全模式。这个配置主要是对集群的稳定程度做进一步的确认。避免达到要求后马上又不符合安全标准。
总结一下,要离开安全模式,需要满足以下条件:1)达到副本数量要求的block比例满足要求;2)可用的datanode节点数满足配置的数量要求;3) 1、2 两个条件满足后维持的时间达到配置的要求。
Hadoop的HDFS集群非常容易出现机器与机器之间磁盘利用率不平衡的情况,例如:当集群内新增、删除节点,或者某个节点机器内硬盘存储达到饱和值。当数据不平衡时,Map任务可能会分配到没有存储数据的机器,这将导致网络带宽的消耗,也无法很好的进行本地计算。
当HDFS负载不均衡时,需要对HDFS进行数据的负载均衡调整,即对各节点机器上数据的存储分布进行调整。让数据均匀的分布在各个DataNode上,均衡IO性能,防止热点发生。进行数据的负载均衡调整,必须要满足如下原则:
Rebalance
rebalance作用是为了使数据在集群中各节点的分布尽量均衡,rebalance是一个非自动的管理功能,在任意一台能够连接到HDFS的机器上命令行下输入 hadoop balancer [-threshold] 既会启动。如果集群处于不平衡状态,这个过程就会在不平衡的节点之间迁移数据,如果rebalance过程没有被打断的话,完成此次rebalance目标后过程会自动停止。
影响rebalance的参数
threshold 默认设置:10,参数取值范围:0-100 参数含义:判断集群是否平衡的阈值。如果没有达到则进行平衡任务,平衡过程中标准达到此阈值后退出。理论上,该参数设置的越小,整个集群就越平衡。
dfs.balance.bandwidthPerSec 默认设置:1048576(1M/S) 参数含义:Balancer运行时允许占用的带宽,默认为1M/S,如果宽带占用过低则影响均衡效率,宽带占用过高则影响HDFS正常任务的读写IO。
负载均衡过程
数据均衡过程的核心是一个数据均衡算法,该数据均衡算法将不断迭代数据均衡逻辑,直至集群内数据均衡为止。该数据均衡算法每次迭代的逻辑如下:
Hadoop从2.6.0版本开始支持异构存储,异构存储的意义在于HDFS中频繁访问的数据,可以将其保存在更高访问性能的存储介质(内存或SSD)上,提升其读写性能;对于几乎不会访问的数据,保存在机械硬盘或其他廉价存储上,降低成本。HDFS异构存储的配置需要用户对目录指定存储策略,即用户需要预先知道每个目录下的文件的访问热度:事先划分好冷热数据存储目录,设置好对应的存储策略,然后后续相应的程序在对应分类目录下写数据,自动继承父目录的存储策略
存储介质:
hdfs的存储策略依赖于底层的存储介质。hdfs支持的存储介质:
存储介质配置:将对应的存储类型添加到dfs.datanode.data.dir的配置项中即可,配置的时候需要申明存储类型和对应目录,存储类型需要用中括号括起来,存储类型有[SSD]/[DISK]/[ARCHIVE]/[RAM_DISK],如果不指定存储类型,则默认就是DISK。
上面例子,前面12个盘,我没有设置存储类型,因为都是DISK,最后一个盘使用了SSD类型。
存储策略
存储策略可配置,可以设置全局的,也可以设置到某个文件夹。
存储策略配置:
HDFS提供了专门的命令来设置对应的策略,命令使用方法如下:查看策略帮助信息:
hdfs storagepolicies -help
列出当前版本支持的存储策略:
hdfs storagepolicies -listPolicies
设置对应路径的策略:
hdfs storagepolicies -setStoragePolicy -path -policy
具体流程
1、在hdfs的配置文件hdfs-site.xml中配置对应的异构存储
2、DataNode启动的时候从配置文件中读取对应的存储类型,以及容量情况,并通过心跳的形式不断的上报给NameNode。
3、NameNode收到DataNode发送的关于存储类型、容量等内容的心跳包后,会进行处理,更新存储的相关内容。
4、写请求发到NameNode后,NameNode根据写请求具体的目录对应的存储策略选择对应的存储类型的DataNode进行写入操作。
为了数据的可靠性,HDFS通过多副本机制来保证。三副本冗余下,1TB的原始数据需要占用3TB的磁盘空间,存储利用率只有1/3。而且系统中大部分是使用频率非常低的冷数据,,给存储空间和网络带宽带来了很大的压力。因此,在保证可靠性的前提下如何提高存储利用率已成为当前HDFS面对的主要问题之一。
Hadoop 3.0 引入了纠删码技术(Erasure Coding),它可以提高50%以上的存储利用率,并且保证数据的可靠性。
纠删码技术简称EC,是一种编码容错技术。它通过对数据进行分块,然后计算出校验数据,使得各个部分的数据产生关联性。当一部分数据块丢失时,可以通过剩余的数据块和校验块计算出丢失的数据块。
缺点:
优缺点
优点:
缺点:
总结
综合副本存储和EC存储优缺点,EC存储更适合用于存储备份数据和使用频率较少的非热点数据,副本存储更适用于存储需要追加写入和经常分析的热点数据。
NameNode主要是用来保存HDFS的元数据信息,包括内存元数据和元数据文件;它们的存在的位置分别为:内存和磁盘上。其中内存元数据主要是hdfs文件目录的管理;元数据文件则用于持久化存储。元数据主要包括:1、文件、目录自身的属性信息,例如文件名,目录名,修改信息等。2、文件记录的信息的存储相关的信息,例如存储块信息,分块情况,副本个数等。3、记录HDFS的Datanode的信息,包括节点位置,存活状态,空间大小,用于对DataNode的管理。
在NameNode重启时,edit logs才会合并到fsimage文件中,从而得到一个文件系统的最新快照。但是NameNode是很少重启的,导致edits变得很大,在namenode重启时会占用大量时间合并。如果NameNode挂掉了,可能会丢失内存中的改动。
SecondaryNameNode就是来帮助解决上述问题的,它的职责是合并NameNode的edit logs到fsimage文件中。
Secondary NameNode的整个目的是在HDFS中提供一个检查点。它只是NameNode的一个助手节点,而不是备份节点。Secondary NameNode所做的不过是在文件系统中设置一个检查点来帮助NameNode更好的工作。它不是要取代掉NameNode也不是NameNode的备份。所以更贴切的中文释义应该为辅助节点或检查点节点
主要用于HDFS HA中的两个namenode之间数据同步,当active状态namenode的命名空间有任何修改时,会告知大部分的JournalNodes进程。standby状态的NameNode有能力读取JNS中的变更信息,并且一直监控edit log的变化,把变化应用于自己的命名空间。standby可以确保在集群出错时,命名空间状态已经完全同步了,保证数据的状态一致。
在一个典型的HA集群中,每个NameNode是一台独立的服务器。在任一时刻,只有一个NameNode处于active状态,另一个处于standby状态。其中,active状态的NameNode负责所有的客户端操作,standby状态的NameNode处于从属地位,维护着数据状态,随时准备切换。
HDFS HA
namenode是HDFS的核心节点,存储了各类元数据信息,并管理文件系统的命名空间和客户端对文件的访问。但在HDFS1.0中,只存在一个NN,一旦发生“单点故障”,就会导致整个系统失效。虽然有个SecondaryNameNode,但是它并不是NN的热备份,SNN主要功能在于周期性的从NN中获取FsImage和EditLog,进行合并后再发送给NN,替换掉原来的FsImage,以防止EditLog文件过大,导致NN失败恢复时消耗太多时间。
由于SNN无法提供“热备份”功能,在NN发生故障时,无法立即切换到SNN对外提供服务,仍需要停机恢复。HDFS2.0采用了HA(High Availability)架构。在HA集群中,一般设置两个NN,其中一个处于“活跃(Active)”状态,另一个处于“待命(Standby)”状态。处于Active状态的NN负责对外处理所有客户端的请求,处于Standby状态的NN作为热备份节点,保存了足够多的元数据,在Active节点发生故障时,立即切换到活跃状态对外提供服务。
由于Standby NN是Active NN的“热备份”,因此Active NN的状态信息必须实时同步到StandbyNN。Active NN将更新数据写入到JournalNode,Standby NN会一直监听,一旦发现有新的写入,就立即从JournalNode中读取这些数据并加载到自己内存中,从而保证与Active NN状态一致。
NN保存了数据块与DN的映射信息。当一个DN加入到集群中时,它会把自身数据块列表发送给NN,定期通过心跳方式以确保NN中的块映射是最新的。因此,为了实现故障时的快速切换,必须保证StandbyNN中也包含最新的块映射信息,为此需要给DN配置Active和Standby两个NN的地址,把块的位置和心跳信息同时发送到两个NN上。为了防止出现“脑裂”现象,还要保证在任何时刻都只有一个NN处于Active状态,需要Zookeeper实现。
单组Namenode局限性
HDFS HA只允许整个集群有一个活动的Namenode,管理所有的命名空间。随着集群规模的增长,单Namenode的局限性越发的明显,主要表现在以下几个方面:
既然单Namenode存在上述局限性,那么为什么要通过联邦机制横向拓展Namenode,而不是纵向扩展,主要有以下原因:
HDFS联邦
在HDFS联邦中,设计了多个相互独立的NN,使得HDFS的命名服务能够水平扩展,这些NN分别负责自己所属的目录,不需要彼此协调。如NameNode1负责/database目录,那么在/database目录下的文件元数据都由NameNode1负责。各NameNode间元数据不共享,每个NameNode都有对应的standby。每个DN要向集群中所有的NN注册,并周期性的发送心跳信息和块信息,报告自己的状态。
HDFS联邦拥有多个独立的命名空间,其中,每一个命名空间管理属于自己的一组块,这些属于同一个命名空间的块组成一个“块池”。每个DN会为多个块池提供块的存储,块池中的各个块实际上是存储在不同DN中的。
块池(block pool):属于某一命名空间(NS)的一组文件块。联邦环境下,每个namenode维护一个命名空间卷(namespace volume),包括命名空间的元数据和在该空间下的文件的所有数据块的块池。
应用场景:超大规模文件存储。如互联网公司存储用户行为数据、电信历史数据、语音数据等超大规模数据存储。此时NameNode的内存不足以支撑如此庞大的集群。常用的估算公式为1G对应1百万个块,按缺省块大小计算的话,大概是128T (这个估算比例是有比较大的富裕的,其实,即使是每个文件只有一个块,所有元数据信息也不会有1KB/block)。
小文件过多导致的问题
小文件是指文件size小于HDFS上block大小的文件。这样的文件会给hadoop的扩展性和性能带来严重问题。
优化方案
1) 使用HAR(Hadoop Archives)
为了缓解大量小文件带给namenode内存的压力,Hadoop 0.18.0引入了Hadoop Archives(HAR files),其本质就是在HDFS之上构建一个分层文件系统。通过执行hadoop archive 命令就可以创建一个HAR文件。在命令行下,用户可使用一个以har://开头的URL就可以访问HAR文件中的小文件。使用HAR files可以减少HDFS中的文件数量。下图为HAR文件的文件结构,可以看出来访问一个指定的小文件需要访问两层索引文件才能获取小文件在HAR文件中的存储位置,因此,访问一个HAR文件的效率可能会比直接访问HDFS文件要低。对于一个mapreduce任务来说,如果使用HAR文件作为其输入,仍旧是其中每个小文件对应一个map task,效率低下。所以,HAR files最好是用于文件归档。
2) 使用sequencefile
SequenceFile核心是以文件名为key,文件内容为value组织小文件。10000个100KB的小文件,可以编写程序将这些文件放到一个SequenceFile文件,然后就以数据流的方式处理这些文件,也可以使用MapReduce进行处理。一个SequenceFile是可分割的,所以MapReduce可将文件切分成块,每一块独立操作。不像HAR,SequenceFile支持压缩。在大多数情况下,以block为单位进行压缩是最好的选择,因为一个block包含多条记录,压缩作用在block之上,比reduce压缩方式(一条一条记录进行压缩)的压缩比高。把已有的数据转存为SequenceFile比较慢。比起先写小文件,再将小文件写入SequenceFile,一个更好的选择是直接将数据写入一个SequenceFile文件,省去小文件作为中间媒介。
3) MapReduce过程中使用CombineFileInputFormat
CombineFileInputFormat是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置。
Mapreduce 程序效率的瓶颈在于两点:
1) 计算机性能
CPU、内存、磁盘健康、网络
2) I/O 操作优化
(1)数据倾斜
(2)map和reduce数设置不合理
(3)reduce等待过久
(4)小文件过多
(5)大量的不可分块的超大文件
(6)spill次数过多
(7)merge次数过多等。
1) 数据输入
(1)合并小文件:在执行mr任务前将小文件进行合并,大量的小文件会产生大量的map任务,增大map任务装载次数,而任务的装载比较耗时,从而导致 mr 运行较慢。(2)采用ConbinFileInputFormat来作为输入,解决输入端大量小文件场景。
2) map阶段
(1)减少spill次数:通过调整io.sort.mb及sort.spill.percent参数值,增大触发spill的内存上限,减少spill次数,从而减少磁盘 IO。
(2)减少merge次数:通过调整io.sort.factor参数,增大merge的文件数目,减少merge的次数,从而缩短mr处理时间。
(3)在 map 之后先进行combine处理,减少 I/O。
3) reduce阶段
(1)合理设置map和reduce数:两个都不能设置太少,也不能设置太多。太少,会导致task等待,延长处理时间;太多,会导致 map、reduce任务间竞争资源,造成处理超时等错误。
(2)设置map、reduce共存:调整slowstart.completedmaps参数,使map运行到一定程度后,reduce也开始运行,减少reduce的等待时间。
(3)规避使用reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
(4)合理设置reduce端的buffer,默认情况下,数据达到一个阈值的时候,buffer中的数据就会写入磁盘,然后reduce会从磁盘中获得所有的数据。也就是说,buffer和reduce是没有直接关联的,中间多个一个写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得buffer中的一部分数据可以直接输送到reduce,从而减少IO开销:mapred.job.reduce.input.buffer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读buffer中的数据直接拿给reduce使用。这样一来,设置buffer需要内存,读取数据需要内存,reduce计算也要内存,所以要根据作业的运行情况进行调整。
4) IO传输
(1)采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZOP压缩编码器。
(2)使用SequenceFile二进制文件
描述:简单来说数据倾斜就是Map阶段数据的key分化严重不均,在进行shuffle之后,一部分reduce节点数据过多,一部分节点数据过少,最终导致整个程序运行时间很长才结束。
一般会有两种情况:
举个 word count 的入门例子,它的map 阶段就是形成 (“aaa”,1)的形式,然后在reduce 阶段进行 value 相加,得出 “aaa” 出现的次数。若进行 word count 的文本有100G,其中 80G 全部是 “aaa” 剩下 20G 是其余单词,那就会形成 80G 的数据量交给一个 reduce 进行相加,其余 20G 根据 key 不同分散到不同 reduce 进行相加的情况。如此就造成了数据倾斜,最后就是很多reduce节点跑到 99%然后一直在原地等着那80G的reduce节点跑完。
解决方案:
1) 增加reduce 的jvm内存 既然reduce 本身的计算需要以合适的内存作为支持,在硬件环境容许的情况下,增加reduce 的内存大小显然有改善数据倾斜的可能,这种方式尤其适合数据分布第一种情况,单个值有大量记录, 这种值的所有纪录已经超过了分配给reduce 的内存,无论你怎么样分区这种情况都不会改变. 当然这种情况的限制也非常明显, 1.内存的限制存在,2.可能会对集群其他任务的运行产生不稳定的影响.
2) 增加reduce 个数 这个对于数据分布第二种情况有效,唯一值较多,单个唯一值的记录数不会超过分配给reduce 的内存. 如果发生了偶尔的数据倾斜情况,增加reduce 个数可以缓解偶然情况下的某些reduce 不小心分配了多个较多记录数的情况. 但是对于第一种数据分布无效.
3) 自己实现partition类 自定义partition类,将指定key的数据分配至指定的reduce中,这种方法需要对数据有一个明确的了解,知道数据中key的分布情况。4) 添加combiner操作 combiner相当于提前进行reduce,就会把一个mapper中的相同key进行了聚合,减少shuffle过程中数据量,以及reduce端的计算量。这种方法可以有效的缓解数据倾斜问题,但是如果导致数据倾斜的key 大量分布在不同的mapper的时候,这种方法就不是很有效了。
HDFS提供了十分丰富的配置选项,几乎每个HDFS配置项都具有默认值,一些涉及性能的配置项的默认值一般都偏于保守。根据业务需求和服务器配置合理设置这些选项可以有效提高HDFS的性能。