前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >hive 插入parquet二级分区表数据倾斜优化

hive 插入parquet二级分区表数据倾斜优化

作者头像
YG
发布2018-10-22 15:25:27
2.3K0
发布2018-10-22 15:25:27
举报
文章被收录于专栏:YG小书屋YG小书屋

单个表每天数据有50亿左右。需用二级分区优化该表。

1、最初查询

代码语言:javascript
复制
insert into table xx_parquet_v2 PARTITION(dt, uiappid) select %s from xxx where dt= %s;

错误: Java Heap Space。或者GC overhead limit exceeded。 原因: Parquet和ORC是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行INSERT语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致mappers或reducers的OOM,具体取决于打开的文件写入器(file writer)的数量。

通过INSERT语句插入数据到动态分区表中,也可能会超过HDFS同时打开文件数的限制。

如果没有join或聚合,INSERT ... SELECT语句会被转换为只有map任务的作业。mapper任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个mapper必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper在运行时所需的内存量随着它遇到的分区数量的增加而增加。

详细原因:https://blog.csdn.net/frank_jyp/article/details/81780821

2、第一次修改

set hive.optimize.sort.dynamic.partition = true,从新跑上述语句。

通过这个优化,这个只有map任务的mapreduce会引入reduce过程,这样动态分区的那个字段比如日期在传到reducer时会被排序。由于分区字段是排序的,因此每个reducer只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写parquet文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。

但reduce阶段一直卡在99%,判断是uiappid数据倾斜导致。验证数据倾斜:

代码语言:javascript
复制
# 找出uiappid条数大于1亿条的uiappid
select uiappid, count(*) as t from xxx where dt=%s group by uiappid having t>100000000;  

然后你会发现跑得特别慢。开启map group优化(Map端部分聚合,相当于Combiner):

代码语言:javascript
复制
hive.map.aggr=true

设置上述参数即可。若是其他情况的group优化,可参考hive.groupby.skewindata参数。

代码语言:javascript
复制
hive.groupby.skewindata=true

有数据倾斜的时候进行负载均衡,当hive.groupby.skewindata设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

3、第二次修改

分两步: 1、第一步:找出条数大于1亿的uiappid后,select时过滤调这些大的uiappid。通过这个优化过,reduce阶段单个key的数据都不超过1亿条,可以快速得到结果。

代码语言:javascript
复制
set hive.optimize.sort.dynamic.partition = true;
insert into table xx_parquet_v2 PARTITION(dt='%s', uiappid) select %s from xxx where dt= %s and uiappid not in ('a','b');

2、第二步:再次将uiappid条数大于1亿的数据插入表中。因为大于1亿条的uiappid比较少,可以为每个mapper遇到的分区创建一个文件写入器(file writer)。

代码语言:javascript
复制
insert into table xx_parquet_v2 PARTITION(dt='%s', uiappid) select %s from xxx where dt= %s and uiappid in ('a','b');

4、其他的配置

代码语言:javascript
复制
set mapreduce.map.memory.mb=6144;
set mapreduce.map.java.opts=-Xmx4096m;  # map 内存配置
set mapreduce.input.fileinputformat.split.maxsize=1024000000;
set mapreduce.input.fileinputformat.split.minsize=1024000000;
set mapred.max.split.size=1024000000;
set mapred.min.split.size.per.node=1024000000;
set mapred.min.split.size.per.rack=1024000000;  # map文件大小配置
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set parquet.memory.min.chunk.size=100000; # parquet文件格式配置
set hive.exec.dynamic.partition.mode=nonstrict; #配置动态分区
set mapreduce.reduce.memory.mb=8192;
set mapreduce.reduce.java.opts=-Xmx6144m; # 配置reduce内存限制
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.10.16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、最初查询
  • 2、第一次修改
  • 3、第二次修改
  • 4、其他的配置
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档