如何在Hadoop中处理小文件-续

Fayson在前面的文章《如何在Hadoop中处理小文件》和《如何使用Impala合并小文件》中介绍了什么是Hadoop中的小文件,以及常见的处理方法。这里Fayson再补充一篇文章进行说明。

HDFS中太多的小文件往往会带来性能下降以及扩展性受限问题,为了避免这个问题,我们一般需要控制每个文件尽可能的接近HDFS block大小比如256MB,或者是block size的几倍。

在抽取数据时,应尽可能调整抽取管道以保存较少数量的大文件,而不是大量的小文件。如果你做不到,比如实时场景在抽数的时候总是一小批一小批,那只能事后定期的去合并这些小文件。

本文Fayson主要介绍如何最小化小文件生成以及如何合并小文件。

1

小文件是如何产生的

以下是产生小文件的典型场景:

1.滴漏数据(Trickling data) - 数据是以小批量的形式进行增量抽取会导致小文件的产生,那只能事后定期使用一些额外的作业去合并这些小文件。

2.大量的map或者reduce任务 - 大量map或者reduce任务的MapReduce作业或Hive查询很多文件,比如Map-Only的作业有多少个map就会生成多少个文件,如果是Map-Reduce作业则有多少个reduce就会生成多少个文件。

3.过度分区的表 - 比如一个Hive表有太多分区,每个分区下只有几个文件甚至只有一个小文件,这时考虑降低分区的粒度比如从按照天分区改为按照月份分区。

4.上述情况的组合 - 如果上面三种情况组合出现,会加剧小文件问题。比如过度分区的Hive表,每个分区下都是很多个小文件而不是大文件。

2

分区设计

分区是指将大型Hive/Impala表物理拆分为多个更小的,容易管理的部分。当根据分区进行查询时,只需要扫描必要分区的数据,从而显著提升查询性能。

在HDFS中尽量保存大文件的原则同样适用于分区表的每个分区,我们应尽量保证每个分区对应的HDFS目录下的文件都较大。所以在设计表分区时,应该注意一下几点:

1.避免过度分区表。在确定分区的粒度时,请考虑每个分区将存储的数据量。确保每个分区保存的文件都是大文件(256MB的文件或者更大),即使这样设计会导致分区粒度变得更粗,比如从按天分区变为按月分区。

2.对于数据量较小(几百MB)的表,请考虑创建一个非分区表。这样即使我们只扫描单个文件夹下的所有文件,也会比处理分散在数个分区中的数百甚至数千个文件性能要好。

3

文件格式和压缩

根据过往的经验,有些大的集群碰到小文件问题,往往是大量的Hive/Parquet表以未压缩的方式存储,并使用TEXTFILE文件格式。

从本质上说,HDFS中的文件或者Hive/Impala的表文件你选择何种文件格式,对于小文件问题没有直接关系。然而,使用低效的文件格式(比如TEXTFILE)和没有压缩的数据会从侧面影响小文件问题甚至是加剧,从而影响集群的性能和可扩展性,具体包含以下几个方面:

1.使用低效的文件格式,尤其是未压缩的文件格式,会导致HDFS空间使用量的增加以及NameNode需要跟踪的块数量的增加。如果文件很小,由于要存储的原始数据量较大,可能会有更多的小文件。

2.由于读取和写入大量数据而导致更高的IO争用。

3.从非常宽的表(具有大量字段的表)中读取非列式存储格式(TextFile,SequenceFile,Avro)的数据要求每个记录都要从磁盘中完全读取,即使只需要几列也是如此。像Parquet这样的列式格式允许仅从磁盘读取所需的列,这样可以显著提高性能。

为了确保性能和高效存储之间的良好平衡,答应Fayson,请尽量使用PARQUET格式创建表,并确保在向其写入数据时启用数据压缩(除非对Hive / Impala表使用的存储格式有特定要求)。

在Hive中,使用以下示例创建Parquet表,并确保在插入时使用Snappy压缩来压缩数据。

# Hive

# Create table
CREATE TABLE db_name.table_name (
...
)
STORED AS PARQUET
LOCATION '/path/to/table';

# Create table as select
SET parquet.compression=snappy; 

CREATE TABLE db_name.table_name
STORED AS PARQUET
AS SELECT ...;

# Insert into/overwrite table
SET parquet.compression=snappy; 

INSERT INTO TABLE db_name.table_name
SELECT ...;

(可左右滑动)

在Impala中,使用以下语法:

# Impala

# Create table
CREATE TABLE db_name.table_name (
...
)
STORED AS PARQUET
LOCATION '/path/to/table';

# Create table as select
SET compression_codec=snappy; 

CREATE TABLE db_name.table_name
STORED AS PARQUET
AS SELECT ...;

# Insert into/overwrite table
SET compression_codec=snappy; 

INSERT INTO TABLE db_name.table_name
SELECT ...;

(可左右滑动)

4

使用Hive最小化小文件生成

Hive查询会被转化为一串多个Map-Reduce(或Map-Only)作业执行。当查询处理大量数据时,这些作业会被分解为大量的map或者reduce来并行执行。

Hive查询执行的最后一个Map-Reduce作业的task数将决定查询生成的文件数。如果最后一个作业是Map-Only作业,则文件数将与该作业的map数相同;如果最后一个作业是Map-Reduce作业,则reduce的数量将决定生成的文件数。根据查询产生的数据量,单个生成的文件可能非常小。

有两种简单配置Hive作业的方法,可以最大限度地减少查询生成的文件数量:

4.1

动态文件合并

通过设置下面表格里的参数,Hive将在这一串多个Map-Reduce作业的末尾额外增加一个是否满足条件的比较步骤。此步骤计算作业生成的文件的平均大小,如果小于某个阈值,则会运行自动合并。

这个合并是有代价的,它会使用集群资源,也会消耗一些时间。总耗时和使用的资源取决于生成的数据量。尽管如此,你现在做这个合并也比以后专门去合并小文件要方便,性能也可能会更好。使用这个参数主要是针对查询结果有大量的小文件(数百个或更多)生成。

这些参数会动态评估判断是否需要压缩以及压缩文件的最佳数量:

# Enable conditional compaction for map-only jobs
SET hive.merge.mapfiles = true;

# Enable conditional compaction for map-reduce jobs
SET hive.merge.mapredfiles = true;

# Target size for the compacted files – this is a target,
# not a hard limit. Leave a buffer between this number
# and 256 MB (268435456 bytes)
SET hive.merge.size.per.task = 256000000;

# Average size threshold for file compaction – the compaction
# will only execute if the average file size is smaller
# than this value
SET hive.merge.smallfiles.avgsize = 134217728;

(可左右滑动)

4.2

强制文件合并

另外一个强制文件合并的方法是指定Hive作业的Reduce数量。由于每个reducer都会生成一个文件,所以reducer的数量也就代表了最后生成的文件数量。

这样做有优点也有缺点:

1.优点:

  • 对于那些会被转换为多个Map-Reduce作业(与Map-Only相反)的查询,不需要像上面章节提到的多一些额外的判断或合并的步骤。我们只需要调整最后一个Map-Reduce作业的reduce的数量即可。

2.缺点

  • 除非你能准确知道查询结果会产生多少数据,否则你无法决定生成大小合适的文件需要多少个reducer。
  • 如果设置的reducer数量很少,会导致作业性能下降,因为每个reduce需要处理大量数据。
  • 如果查询执行之间的数据量不同,则可能很难找到reduce的最佳数量。
  • 如果查询是Map-Only查询,则需要修改查询以强制执行reduce阶段(参见下文)。

由于上述因素,只有在你至少粗略地知道查询生成的数据量时才使用此方法。如果查询结果生成的文件会非常小(小于256MB),我们只使用1个reduce也还不错。

# ONLY ONE OF THE PARAMETERS BELOW SHOULD BE USED

# Limit the maximum number of reducers
SET hive.exec.reducers.max = <number>;

# Set a fixed number of reducers
SET mapreduce.job.reduces = <number>;

(可左右滑动)

如果Hive查询是Map-Only的,则上述参数将不起作用。在这种情况下,我们可以在SQL语句后添加SORT BY 1以实现查询语句必须执行reduce。

5

合并已有的小文件

有时候,我们其实无法阻止HDFS中小文件的产生。这种时候,我们需要定期运行合并作业以控制小文件的数量。你可以将合并作业独立于你日常数据采集或生成流程之外作为单独作业,也可以直接将合并作业合并到里日常的数据采集流程中去。

将运行合并作业作为数据采集管道(ingestion pipeline)的一部分,可以更容易协调数据采集和数据合并:这样你可以确保写数到表或分区时,这个表或分区不会同时正在做数据合并的事。如果合并作业是独立于数据采集管道(ingestion pipeline)运行的,则你需要保证数据采集没运行的时候才能调度数据合并的作业(基于同一个表或者同一个分区)。

以下方法可用于对表或分区的文件合并。

5.1

Hive合并

我们可以直接使用Hive的作业来合并已有的Hive表中的小文件。这个方法其实就是使用Hive作业从一个表或分区中读取数据然后重新覆盖写入到相同的路径下。必须为合并文件的Hive作业指定一些类似上面章节提到的一些参数,以控制写入HDFS的文件的数量和大小。

合并一个非分区表的小文件方法1:

SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 256000000;
SET hive.merge.smallfiles.avgsize = 134217728;

SET hive.exec.compress.output = true;
SET parquet.compression = snappy; 

INSERT OVERWRITE TABLE db_name.table_name
SELECT *
FROM db_name.table_name;

(可左右滑动)

合并一个非分区表的小文件方法:

SET mapreduce.job.reduces = <table_size_MB/256>;

SET hive.exec.compress.output = true;
SET parquet.compression = snappy;

INSERT OVERWRITE TABLE db_name.table_name
SELECT *
FROM db_name.table_name
SORT BY 1;

(可左右滑动)

合并一个表分区的小文件:

SET mapreduce.job.reduces = <table_size_MB/256>;

SET hive.exec.compress.output = true;
SET parquet.compression = snappy;

INSERT OVERWRITE TABLE db_name.table_name
PARTITION (part_col = '<part_value>')
SELECT col1, col2, ..., coln
FROM db_name.table_name
WHERE part_col = '<part_value>'
SORT BY 1;

(可左右滑动)

合并一个范围内的表分区的小文件:

SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 256000000;
SET hive.merge.smallfiles.avgsize = 134217728;

SET hive.exec.compress.output = true;
SET parquet.compression = snappy;

SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.exec.dynamic.partition = true;

INSERT OVERWRITE TABLE db_name.table_name
PARTITION (part_col)
SELECT col1, col2, ..., coln, part_col
FROM db_name.table_name
WHERE part_col BETWEEN '<part_value1>' AND '<part_value2>';

(可左右滑动)

5.2

FileCrusher

使用Hive来压缩表中小文件的一个缺点是,如果表中既包含小文件又包含大文件,则必须将这些大小文件一起处理然后重新写入磁盘。如上一节所述,也即没有办法只处理表中的小文件,而保持大文件不变。

FileCrusher使用MapReduce作业来合并一个或多个目录中的小文件,而不会动大文件。它支持以下文件格式的表:

  • TEXTFILE
  • SEQUENCEFILE
  • AVRO
  • PARQUET

它还可以压缩合并后的文件,不管这些文件以前是否被压缩,从而减少占用的存储空间。默认情况下FileCrusher使用Snappy压缩输出数据。

FileCrusher不依赖于Hive,而且处理数据时不会以Hive表为单位,它直接工作在HDFS数据之上。一般需要将需要合并的目录信息以及存储的文件格式作为输入参数传递给它。

为了简化使用FileCrusher压缩Hive表,我们创建了一个“包装脚本”(wrapper script)来将Hive表的相关参数正确解析后传递给FileCrusher。

crush_partition.sh脚本将表名(也可以是分区)作为参数,并执行以下任务:

  • 在合并之前收集有关表/分区的统计信息
  • 计算传递给FileCrusher所需的信息
  • 使用必要参数执行FileCrusher
  • 在Impala中刷新表元数据,以便Impala可以查看合并后的文件
  • 合并后搜集统计信息
  • 提供合并前和合并后的摘要信息,并列出原始文件备份的目录位置

脚本的方法如下所示:

Syntax: crush_partition.sh <db_name> <table_name> <partition_spec> [compression] [threshold] [max_reduces] 

(可左右滑动)

具体参数解释如下:

db_name - (必须)表所存储的数据库名

table_name -(必须)需要合并的表名

partition_spec -(必须)需要合并的分区参数,有效值为:

  • “all” – 合并非分区表,或者合并分区表的所有分区内的文件
  • 指定分区参数,参数必须用引号引起来,例如:
    • "year=2010,state='CA'"
    • "pt_date='2016-01-01'"

compression -(可选,默认Snappy)合并后的文件写入的压缩格式,有效值为:snappy, none (for no compression), gzip, bzip2 and deflate。

threshold -(可选,默认0.5)符合文件合并条件的相对于HDFS block size的百分比阈值,必须是 (0, 1] 范围内的值。默认的0.5的意思是小于或等于HDFS block size的文件会被合并,大于50%的则会保持不变。

max_reduces -(可选,默认200)FileCrusher会被分配的最大reduce数,这个限制是为了避免在合并非常大的表时分配太多任务而占用太多资源。所以我们可以使用这个参数来平衡合并文件的速度以及它在Hadoop集群上造成的开销。

当FileCrusher运行时,它会将符合压缩条件的文件合并压缩为更大的文件,然后使用合并后的文件替换原始的小文件。合并后的文件格式为:

“crushed_file-<timestamp>-<some_numbers>”

原始文件不会被删除,它们会被移动的备份目录,备份目录的路径会在作业执行完毕后打印到终端。原始文件的绝对路径在备份目录中保持不变,因此,如果需要回滚,则很容易找出你想要拷贝回去的目录地址。例如,如果原始小文件的目录为:

/user/hive/warehouse/prod.db/user_transactions/000000_1
/user/hive/warehouse/prod.db/user_transactions/000000_2

(可左右滑动)

合并后会成为一个文件:

/user/hive/warehouse/prod.db/user_transactions/crushed_file-20161118102300-0-0

(可左右滑动)

原始文件我们会移动到备份目录,而且它之前的原始路径我们依旧会保留:

/user/admin/filecrush_backup/user/hive/warehouse/prod.db/user_transactions/000000_1
/user/admin/filecrush_backup/user/hive/warehouse/prod.db/user_transactions/000000_2

(可左右滑动)

FileCrusher的github地址:

https://github.com/asdaraujo/filecrush

本文提到的crush_partition.sh全路径为:

https://github.com/asdaraujo/filecrush/tree/master/bin

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2018-11-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏喵了个咪的博客空间

[喵咪大数据]Hbase搭建和基本使用

[喵咪大数据]Hbase搭建和基本使用 ? 说完了Hive我们接着来看另外一个建立在Hadoop基础上的存储引擎HBase,HBase以内存作为缓存数据落地到H...

54590
来自专栏LhWorld哥陪你聊算法

Hive篇--相关概念和使用二

分桶表是对列值取哈希值的方式,将不同数据放到不同文件中存储。 对于hive中每一个表、分区都可以进一步进行分桶。(可以对列,也可以对表进行分桶) 由列的哈希值除...

28220
来自专栏杨建荣的学习笔记

关于修改分区表的准备和操作细则(r3笔记26天)

在之前的博文中,讨论过一个根据分区键值发现性能问题的案例。90%以上的数据都分布在了一个分区上,其它的分区要么没有数据要么数据很少,这是很明显的分区问题。当然这...

31760
来自专栏我和PYTHON有个约会

Django来敲门~第一部分【5.2.模型和数据库交互】

通常情况下,如果你只是做测试使用,可以使用Django内置的数据库SQLite就完全可以满足需要了,我们在本次教程中,通过使用MySQL这个数据库来完成后续的功...

7510
来自专栏数据和云

YH6:Oracle Sharding 知识库

简单来说,Oracle的Sharding技术就是通过分区(Partioning)技术的扩展来实现的。以前一个表的分区可以存在于不同的表空间,现在可以存在于不同的...

30570
来自专栏java系列博客

pl/sql导入excel到oracle表

24470
来自专栏Hadoop实操

Impala并发查询缓慢问题解决方案

62020
来自专栏Hadoop实操

如何在Redhat7.3的CDH5.14中启用Kerberos

在前面的文章中,Fayson介绍了《如何在Redhat7.3安装CDH5.14》,这里我们基于这个环境开始安装MIT Kerberos。前面Fayson也介绍过...

1.2K40
来自专栏大数据智能实战

太多的.hive-stagingxxx文件的处理

跑一段时间的HIVE程序之后,偶尔打开对应的HDFS文件夹,才发现在其目录下,产生了太多的.hive-staging_hive_date-time_ XXX文件...

31450
来自专栏乐沙弥的世界

Oracle 数据库实例启动关闭过程

Oracle数据库实例的启动,严格来说应该是实例的启动,数据库仅仅是在实例启动后进行装载。Oracle数据启动的过程被划分为

17940

扫码关注云+社区

领取腾讯云代金券