我们在用hive取数的时候,有的时候只是跑一个简单的join语句,但是却跑了很长的时间,有的时候我们会觉得是集群资源不够导致的,但是很大情况下就是出现了"数据倾斜"的情况。
在了解数据倾斜之前,我们应该有一个常识,就是现实生活中的数据分布是不均匀的,俗话说"28定理",80%的财富集中在20%的人手中之类的故事相信大家都看得不少。所以,在我们日常处理的现实数据中,也是符合这种数据分布的,数据倾斜一般有两种情况:
数据倾斜,在MapReduce编程模型中十分常见,就是大量的相同key被partition分配到一个分区里,造成了"一个人累死,其他人闲死"的情况,这违背了并行计算的初衷,整体的效率是十分低下的。
当我们看任务进度长时间维持在99%(或100%),查看任务监控页面就会发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大,这就是数据倾斜的直接表现。
而导致这个的原因,大致可以分为下面几点:
具体可以体现在下面的常见操作:
备注:图片文字内容来自网络
在了解如何避免数据倾斜之前,我们先来看看Hadoop框架的特性:
说的是优化手段,但更多的是"踩坑"的经验之谈。
优化之道:
SQL语句调节:
看完上面的经验总结还是有点懵逼?说实话我也是的,太多的信息量冲击,不过我们可以收藏起来以后继续看多几次,加深印象。
接下来,我们将从一些具体的案例来讲讲SQL语句优化的技巧,非常常用,对我们日常写SQL很有帮助。
场景1:RAC常用(real application clusters的缩写,译为“实时应用集群”)
有一张user表,为卖家每天收入表,user_id,dt(日期)为key,属性有主营类目(cat),指标有交易金额,交易比数。每天要取每个user的主营类目在过去10天的总收入,总比数。
常规做法:取每个user_id最近一天的主营类目,存入临时表t1,汇总过去10天的总交易金额,交易比数,存入临时表t2,连接t1,t2,得到最终的结果。
优化做法:
SELECT user_id
, substr(MAX(concat(dt, cat)), 9) AS main_cat
, SUM(qty), SUM(amt)
FROM users
WHERE dt BETWEEN 20101201 AND 20101210
GROUP BY user_id;
场景2:空值产生的数据倾斜(最常见的现象)
日志中,常会有信息丢失的问题,比如全网日志中的 user_id,如果取其中的 user_id 和 bmw_users 关联,会碰到数据倾斜的问题。
解决方式1:user_id为空的不参与关联
SELECT *
FROM log a
JOIN bmw_users b
ON a.user_id IS NOT NULL
AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;
解决方式2:赋与空值分新的key值
SELECT *
FROM log a
LEFT JOIN bmw_users b ON CASE
WHEN a.user_id IS NULL THEN concat(‘dp_hive’, rand())
ELSE a.user_id
END = b.user_id;
结论:方法2比方法1效率更好,不但io少了,而且作业数也少了。解决方法1 log表被读取了两次,jobs是2。这个优化适合无效 id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。
场景3:不同数据类型关联产生数据倾斜
一张表 s8_log,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。s8_log 中有字符串商品 id,也有数字的商品 id。字符串商品 id 类型是 string 的,但商品中的数字 id 是 bigint 的。问题的原因是把 s8_log 的商品 id 转成数字 id 做 Hash(数字的 Hash 值为其本身,相同的字符串的 Hash 也不同)来分配 Reducer,所以相同字符串 id 的 s8_log,都到一个 Reducer 上了。
解决方式:把数字类型转换成字符串类型
SELECT *
FROM s8_log a
LEFT JOIN r_auction_auctions b ON a.auction_id = CAST(b.auction_id AS string);
场景4:多表 union all 会优化成一个 job
推广效果表要和商品表关联,效果表中的 auction id 列既有商品 id,也有数字 id,和商品表关联得到商品的信息。
SELECT *
FROM effect a
JOIN (
SELECT auction_id AS auction_id
FROM auctions
UNION ALL
SELECT auction_string_id AS auction_id
FROM auctions
) b
ON a.auction_id = b.auction_id;
结论:这样子比分别过滤数字 id,字符串 id ,然后分别和商品表关联性能要好。这样写的好处:1个 MR 作业,商品表只读取一次,推广效果表只读取一次。把这个 sql 换成 MR 代码的话,map 的时候,把 a 表的记录打上标签 a ,商品表记录每读取一条,打上标签 t,变成两个<key,value> 对,<t,数字id,value>,<t,字符串id,value>。所以商品表的 HDFS(Hadoop Distributed File System) 读只会是一次。
场景5:消灭子查询内的 group by
原写法:
SELECT *
FROM (
SELECT *
FROM t1
GROUP BY c1, c2, c3
UNION ALL
SELECT *
FROM t2
GROUP BY c1, c2, c3
) t3
GROUP BY c1, c2, c3;
优化写法:
SELECT *
FROM (
SELECT *
FROM t1
UNION ALL
SELECT *
FROM t2
) t3
GROUP BY c1, c2, c3;
结论:从业务逻辑上说,子查询内的 group by 功能与外层的 group by 重复,除非子查询内有 count(distinct)。经过测试,并未出现 union all 的 hive bug,数据是一致的。MR 的作业数由3减少到1。t1 相当于一个目录,t2 相当于一个目录,对map reduce程序来说,t1,t2 可以做为 map reduce 作业的 mutli inputs。这可以通过一个 map reduce 来解决这个问题。Hadoop的计算框架,不怕数据多,怕作业数多。