专栏首页SAMshareBigData | 一文带你搞清楚"数据倾斜"(上)

BigData | 一文带你搞清楚"数据倾斜"(上)

Index

  • 什么是数据倾斜
  • 数据倾斜的原因
  • Hadoop计算框架的特点
  • 优化的常用手段
  • 优化案例

什么是数据倾斜

我们在用hive取数的时候,有的时候只是跑一个简单的join语句,但是却跑了很长的时间,有的时候我们会觉得是集群资源不够导致的,但是很大情况下就是出现了"数据倾斜"的情况。

在了解数据倾斜之前,我们应该有一个常识,就是现实生活中的数据分布是不均匀的,俗话说"28定理",80%的财富集中在20%的人手中之类的故事相信大家都看得不少。所以,在我们日常处理的现实数据中,也是符合这种数据分布的,数据倾斜一般有两种情况:

  • 变量值很少: 单个变量值的占比极大,常见的字段如性别、学历、年龄等。
  • 变量值很多: 单个变量值的占比极小,常见的字段如收入、订单金额之类的。

数据倾斜,在MapReduce编程模型中十分常见,就是大量的相同key被partition分配到一个分区里,造成了"一个人累死,其他人闲死"的情况,这违背了并行计算的初衷,整体的效率是十分低下的。

数据倾斜的原因

当我们看任务进度长时间维持在99%(或100%),查看任务监控页面就会发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大,这就是数据倾斜的直接表现。

而导致这个的原因,大致可以分为下面几点:

  • key分布不均匀
  • 业务数据本身的特性
  • 建表时考虑不周
  • 某些SQL语句本身就有数据倾斜

具体可以体现在下面的常见操作:

备注:图片文字内容来自网络

Hadoop计算框架的特点

在了解如何避免数据倾斜之前,我们先来看看Hadoop框架的特性:

  • 大数据量不是大问题,数据倾斜才是大问题;
  • jobs数比较多的作业效率相对比较低,比如即使有几百万的表,如果多次关联多次汇总,产生十几个jobs,耗时很长。原因是map reduce作业初始化的时间是比较长的;
  • sum,count,max,min等UDAF(User Defined Aggregate Function:自定义函数),不怕数据倾斜问题,hadoop在map端的汇总并优化,使数据倾斜不成问题;
  • count(distinct),在数据量大的情况下,效率较低,如果是多count(distinct)效率更低,因为count(distinct)是按group by字段分组,按distinct字段排序,一般这种分布式是很倾斜的,比如男uv,女uv,淘宝一天30亿的pv,如果按性别分组,分配2个reduce,每个reduce处理15亿数据。

优化的常用手段

说的是优化手段,但更多的是"踩坑"的经验之谈。

优化之道:

  • 首先要了解数据分布,自己动手解决数据倾斜问题是个不错的选择;
  • 增加jvm(Java Virtual Machine:Java虚拟机)内存,这适用于变量值非常少的情况,这种情况下,往往只能通过硬件的手段来进行调优,增加jvm内存可以显著的提高运行效率;
  • 增加reduce的个数,这适用于变量值非常多的情况,这种情况下最容易造成的结果就是大量相同key被partition到一个分区,从而一个reduce执行了大量的工作;
  • 重新设计key,有一种方案是在map阶段时给key加上一个随机数,有了随机数的key就不会被大量的分配到同一节点(小几率),待到reduce后再把随机数去掉即可;
  • 使用combiner合并。combinner是在map阶段,reduce之前的一个中间阶段,在这个阶段可以选择性的把大量的相同key数据先进行一个合并,可以看做是local reduce,然后再交给reduce来处理,减轻了map端向reduce端发送的数据量(减轻了网络带宽),也减轻了map端和reduce端中间的shuffle阶段的数据拉取数量(本地化磁盘IO速率);(hive.map.aggr=true)
  • 设置合理的map reduce的task数,能有效提升性能。(比如,10w+级别的计算,用160个reduce,那是相当的浪费,1个足够);
  • 数据量较大的情况下,慎用count(distinct),count(distinct)容易产生倾斜问题;
  • hive.groupby.skewindata=true 有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

SQL语句调节:

  • 如何Join: 关于驱动表的选取,选用join key分布最均匀的表作为驱动表; 做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。
  • 大小表Join: 使用map join让小的维度表(1000条以下的记录条数) 先进内存。在map端完成reduce。
  • 大表Join大表: 把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
  • count distinct大量相同特殊值: count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
  • group by维度过小: 采用sum() group by的方式来替换count(distinct)完成计算。
  • 特殊情况特殊处理: 在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。

看完上面的经验总结还是有点懵逼?说实话我也是的,太多的信息量冲击,不过我们可以收藏起来以后继续看多几次,加深印象。

接下来,我们将从一些具体的案例来讲讲SQL语句优化的技巧,非常常用,对我们日常写SQL很有帮助。

? 优化案例

场景1:RAC常用(real application clusters的缩写,译为“实时应用集群”)

有一张user表,为卖家每天收入表,user_id,dt(日期)为key,属性有主营类目(cat),指标有交易金额,交易比数。每天要取每个user的主营类目在过去10天的总收入,总比数。

常规做法:取每个user_id最近一天的主营类目,存入临时表t1,汇总过去10天的总交易金额,交易比数,存入临时表t2,连接t1,t2,得到最终的结果。

优化做法:

SELECT user_id
  , substr(MAX(concat(dt, cat)), 9) AS main_cat
  , SUM(qty), SUM(amt)
FROM users
WHERE dt BETWEEN 20101201 AND 20101210
GROUP BY user_id;

场景2:空值产生的数据倾斜(最常见的现象)

日志中,常会有信息丢失的问题,比如全网日志中的 user_id,如果取其中的 user_id 和 bmw_users 关联,会碰到数据倾斜的问题。

解决方式1:user_id为空的不参与关联

SELECT *
FROM log a
  JOIN bmw_users b
  ON a.user_id IS NOT NULL
      AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;

解决方式2:赋与空值分新的key值

SELECT *
FROM log a
  LEFT JOIN bmw_users b ON CASE
          WHEN a.user_id IS NULL THEN concat(‘dp_hive’, rand())
          ELSE a.user_id
      END = b.user_id;

结论:方法2比方法1效率更好,不但io少了,而且作业数也少了。解决方法1 log表被读取了两次,jobs是2。这个优化适合无效 id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。

场景3:不同数据类型关联产生数据倾斜

一张表 s8_log,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。s8_log 中有字符串商品 id,也有数字的商品 id。字符串商品 id 类型是 string 的,但商品中的数字 id 是 bigint 的。问题的原因是把 s8_log 的商品 id 转成数字 id 做 Hash(数字的 Hash 值为其本身,相同的字符串的 Hash 也不同)来分配 Reducer,所以相同字符串 id 的 s8_log,都到一个 Reducer 上了。

解决方式:把数字类型转换成字符串类型

SELECT *
FROM s8_log a
  LEFT JOIN r_auction_auctions b ON a.auction_id = CAST(b.auction_id AS string);

场景4:多表 union all 会优化成一个 job

推广效果表要和商品表关联,效果表中的 auction id 列既有商品 id,也有数字 id,和商品表关联得到商品的信息。

SELECT *
FROM effect a
  JOIN (
      SELECT auction_id AS auction_id
      FROM auctions
      UNION ALL
      SELECT auction_string_id AS auction_id
      FROM auctions
  ) b
  ON a.auction_id = b.auction_id;

结论:这样子比分别过滤数字 id,字符串 id ,然后分别和商品表关联性能要好。这样写的好处:1个 MR 作业,商品表只读取一次,推广效果表只读取一次。把这个 sql 换成 MR 代码的话,map 的时候,把 a 表的记录打上标签 a ,商品表记录每读取一条,打上标签 t,变成两个<key,value> 对,<t,数字id,value>,<t,字符串id,value>。所以商品表的 HDFS(Hadoop Distributed File System) 读只会是一次。

场景5:消灭子查询内的 group by

原写法:

SELECT *
FROM (
  SELECT *
  FROM t1
  GROUP BY c1, c2, c3
  UNION ALL
  SELECT *
  FROM t2
  GROUP BY c1, c2, c3
) t3
GROUP BY c1, c2, c3;

优化写法:

SELECT *
FROM (
  SELECT *
  FROM t1
  UNION ALL
  SELECT *
  FROM t2
) t3
GROUP BY c1, c2, c3;

结论:从业务逻辑上说,子查询内的 group by 功能与外层的 group by 重复,除非子查询内有 count(distinct)。经过测试,并未出现 union all 的 hive bug,数据是一致的。MR 的作业数由3减少到1。t1 相当于一个目录,t2 相当于一个目录,对map reduce程序来说,t1,t2 可以做为 map reduce 作业的 mutli inputs。这可以通过一个 map reduce 来解决这个问题。Hadoop的计算框架,不怕数据多,怕作业数多。

References

  • 百度百科
  • Hive优化案例(很好):https://blog.csdn.net/u011500419/article/details/90266428
  • 数据倾斜是什么以及造成的原因?:https://blog.csdn.net/wyz0516071128/article/details/80997158

本文分享自微信公众号 - SAMshare(gh_8528ce7b7e80)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-05-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 通俗易懂的生产环境Web应用架构介绍

    看见一篇非常通俗易懂且适合新手阅读的Web应用架构文章,我将其手工翻译了出来,分享给大家。

    Rude3Knife的公众号
  • 如何连接别人的mysql数据库

    今天有个做数据库的前同事,在群里发了自己的数据库,并把主机,端口,用户名,密码一并发了出来,然而,我尝试着去连接访问。

    阮键
  • Airflow使用指南一 安装与启动

    数据库用户名与密码均为root,airflow使用的数据库为airflow.使用如下命令创建对应的数据库:

    smartsi
  • Flink1.7发布中的新功能

    Apache Flink 社区正式宣布 Apache Flink 1.7.0 发布。最新版本包括解决了420多个问题以及令人兴奋的新增功能,我们将在本文进行描述...

    smartsi
  • 案例-ClickHouse在头条的技术演进

    ClickHouse 是由号称“俄罗斯 Google”的 Yandex 开发而来,在 2016 年开源,在计算引擎里算是一个后起之秀,在内存数据库领域号称是最快...

    smartsi
  • MySQL 配置远程登录

    修改/etc/mysql/mysql.conf.d目录下的mysqld.cnf配置文件:

    smartsi
  • 如何把转入成功的XXX.sql导入到自己的数据库里

    1.新建自己的mysql连接,mysql连接名随便起,如cxf 密码尽量写123456或者root,防止忘记。按照图示右键(如果想在已有的mysql连接基础上...

    阮键
  • JAVA框架中XML文件

    我们应该在开头写返回结果 resultMap id="自己起的名字" type="返回的结果类型,此处为Department实体类"

    阮键
  • Flink1.5发布中的新功能

    Flink 1.5.0 是 1.x.y 系列的第六个主要版本。与往常一样,它兼容之前 1.x.y 版本中使用 @Public 注解标注过的 API。

    smartsi
  • 完整Demo:springboot实现多数据源配置

    公司有一套人脸识别动态布控系统,该系统有两个子系统组成,识别算法采用C++编写,后台管理系统采用Java编写,C程序提供HTTP接口供Java程序调用,两个程序...

    架构师小跟班

扫码关注云+社区

领取腾讯云代金券