数据倾斜是在数仓开发里最常见的,且最为头疼的问题。我们也或多或少的知道数据倾斜是因为数据分配不均匀,导致部分节点要花很长时间处理大量的数据,我们也知道不管是mr还是spark,大多是在shuffle阶段出现倾斜,当然我们也知道group by和join均可能出现数据倾斜现象,而网上大多数的解决方案都建议从2方面着手处理:1.从业务方面,能否直接过滤掉导致倾斜的数据;2、从技术层面上,a.调整运行参数,b.sql优化之改用mapjoin、关联或聚合key增加随机数、将join改成union all等
首先数据倾斜大多数场景是从业务层面无法过滤掉的,其次调整运行参数只是治标不能治本,只能起缓解作用,最后大部分情况下关联的表比较少符合mapjoin场景,因此大多数情况下还得从sql本身着手优化。
而在我们日常工作中最常遇到的就是group by倾斜和大表join倾斜,因此本文就对这两种情况进行具体优化过程进行分析,而在正式开始案例之前,有必要讲一下优化原则:尽量让shuffle阶段的数据保持量少且均匀。
group by倾斜相对简单,只需进行二次group by即可
下面这段sql在某一天开始之后就一直执行失败,就猜测会不会是device_id出现了数据倾斜
--统计每个device_id的当天首条日志和末条日志的账号等维度
select
device_id
,max(if(first_row=last_row,vuserid,'')) vuserid
,max(if(first_row=1,vuserid,'')) first_vuserid
from
( select
device_id
,vuserid
,row_number() over (partition by device_id order by first_act_date,first_ctime) first_row
,count(1) over (partition by device_id) last_row
from
dws_xxx_dau_df
where
imp_date = 20221204
)a2
group by
device_id
通过直接查device_id在表的数据情况,分析发现row_number()这里跑不过,原因就是分组的key(device_id)过大导致,最大的单个device_id的pv去到了253万,排第二的在51万,超过10000的不到100条,因此要将这100多个device_id的记录数都减少到10000条以下:
select
device_id
,count(1) pv
from
dws_xxx_dau_df
where
imp_date = 20221204
group by
device_id
order by
pv desc limit 1000
只需进行二次group by,举一个简单例子:如下图,第一次group by时aaa从4条变成2条,bbb从2条到1条,而对于ccc、ddd、eee增加一个随机数并不影响,接着再进行第二次group by,经过这样处理就能避免倾斜了。回到案例,这里需要增加一个随机数字段(0~200)先做一次group by,这样第一次group by限制最高key对应的记录数在10000条左右,第二次group by每个key对应的记录数就只会在200条以内,这样就能完美避开倾斜问题了,但是需要注意的是,这里随机数给多少需要根据实际数据情况进行分析:
select
device_id
,max(if(first_row=last_row,vuserid,'')) vuserid
,max(if(first_row=1,vuserid,'')) first_vuserid
from
( select
device_id
,vuserid
,first_vuserid
,row_number() over (partition by device_id order by first_act_date,first_ctime) first_row
,count(1) over (partition by device_id) last_row --再根据device_id group by第二次
from
( select
device_id
,rand_num
,max(if(first_row=last_row,vuserid,'')) vuserid
,max(if(first_row=1,vuserid,'')) first_vuserid
from
( select
device_id
,vuserid
,rand_num
,row_number() over (partition by device_id,rand_num order by first_act_date,first_ctime) first_row
,count(1) over (partition by device_id,rand_num) last_row --增加rand_num一起先group by一次
from
( select
device_id
,vuserid
,first_act_date
,first_ctime
,cast(rand()*200 as bigint) rand_num --增加一个200以内随机数
from
dws_xxx_dau_df
where
imp_date = 20221204
)a1
)a2
group by
device_id
,rand_num
)a3
)a4
group by
device_id
该任务从优化前的60~80分钟到优化后的10~20分钟,时效提升了4~6倍:
大表优化最好的办法就是能知道关联的key是啥,然后对出现倾斜的key进行打散,从而避免倾斜。
有一个实验指标表,包含了播放、页面访问、元素曝光点击等事件的表(30亿)需要关联一个全量维度表(500万),运行时间过长(在2~3小时),另外由于该表用在实验上是非常很重要,为避免影响SLA,因此需要进行优化。
select
a.qimei36 qimei36
,a.biz_id biz_id
,a.vid vid
,a.cid cid
,b.el_type_name el_type_name
,b.el_type_name_2 el_type_name_2
,b.el_type_name_3 el_type_name_3
from
( select --30亿记录
qimei36
,biz_id
,vid
,cid
,sum(play_cnt) play_cnt
,sum(page_cnt) page_cnt
from
dws_xxx_view_play_page_ele_imp_clck_aggr --播放、页面、元素曝光、元素点击聚合虚拟表
group by
qimei36
,biz_id
,vid
,cid
)a
left join
( select --500万记录
pg_cid cid
,el_type_name
,el_type_name_2
,el_type_name_3
from
dim_cid_content_df
)b
on
a.cid=b.cid
猜测很大可能出现数据倾斜了,该案例里面有group by又有join,因此需要进一步分析来确定倾斜出现的位置:
(1)去掉join 操作,仅仅执行group by操作落表,执行时间在20分钟内,也就说join操作用了80%的运行时间,因此可能出现倾斜的是在join操作上,需进一步分析
(2)分别分析left join的左表(下面用A表标志),右表(下面用B表标志)的数据情况,先看两个表总数据量级(A表30亿,B表500万),属于大表join大表操作,再分别用关联key进行group by 取top1000条数据进行分析:A表关联key出现很大的悬殊(最大的cid有接近3亿,前3条cid占比总记录数接近20%),B表关联key唯一,如下图:
(3)从执行日志分析,也能看出是join操作出现了数据倾斜,如下图所示,join操作用了1.8小时,占了80%+的运行时长:
由于B表是历史全量cid,而A表为当天日志,因此需要先分析A表的cid占据B表全量表的数量,经过直接关联发现只关联上维表10万cid,因此我解决思路如下:
(1)尽量减少join操作的记录量,因此可以先将10万关联得上的cid存储到临时表,并且增加一个统计每个cid的pv和row_num指标,用于判断是否倾斜的cid,这里设定单个cid超过100万行记录的cid为倾斜的cid。
create table tmp_table_C as
select --最后只剩10万左右用户
pg_cid
,pv
,el_type_name
,el_type_name_2
,el_type_name_3
,ceil(pv/1000000) row_num --控制最大数量在100万以下,又可以保证进行扩散的key不那么多
from
( select
pg_cid
,sum(pv) pv
,min(pv) min_pv
,max(el_type_name) el_type_name
,max(el_type_name_2) el_type_name_2
,max(el_type_name_3) el_type_name_3
from
( select
1 pv
,pg_cid
,null el_type_name
,null el_type_name_2
,null el_type_name_3
from
dws_xxx_view_play_page_ele_imp_clck_aggr --播放、页面、元素曝光、元素点击聚合虚拟表
union all
select
0 pv
,pg_cid
,el_type_name
,el_type_name_2
,el_type_name_3
from
dim_cid_content_df
)a
group by
pg_cid
)a
where
pv>0 -- pv>0 说明在日志表出现
and min_pv=0 -- min_pv=0 说明在维表中出现
(2)再将倾斜的cid进行随机打散关联,这里打散的的程度根据当前cid的数据倾斜程度进行动态分配,从而解决数据倾斜问题:
--先将需要扩散的cid,以及标志扩散程度的row_num提取出来
--这里row_num的值是:如果 pv<100w行,row_num=1,如果cid='asdfg'的pv=200万行,row_num=2,也就说在拼接成关联字段时,该cid会变成两个cid进行关联:'asdfg#0'和'asdfg#1'
sql = """
select
pg_cid
,row_num
from
%(table)s_%(imp_date)s
where
row_num>1
"""%{"table": self.table_list[0]
, "imp_date": self.datetime}
self.log(sql)
res = self.tdw.execute(sql)
if(len(res)>=1):
in_sql=""
key_sql=""
for index in range(len(res)):
res_a = res[index].split("\t")[0]
res_b = res[index].split("\t")[1]
if len(res_a)>0 :
in_sql = in_sql + res_a + ":" + res_b + ","
key_sql += "'%s',"%(res_a)
insql = "'" + in_sql + "'" --例子: 'mzc00200xh9313v:13,mzc002003q7sks5:2,mzc0038zhzf6h5t:2,mzc00200aakg0rw:4,'
keysql = key_sql[:-1] --例子:'mzc00200xh9313v','mzc002003q7sks5','mzc0038zhzf6h5t','mzc00200aakg0rw'
return insql,keysql
--插入表
insert overwrite table %(table)s partition(imp_date=%(imp_date)s)
select
a.qimei36 qimei36
,a.biz_id biz_id
,a.vid vid
,a.cid cid
,b.el_type_name el_type_name
,b.el_type_name_2 el_type_name_2
,b.el_type_name_3 el_type_name_3
from
( select --30亿记录
qimei36
,biz_id
,vid
,cid
,case
when coalesce(pg_cid, '') = '' then floor(rand()*10000) --如果是null,给随机数
when pg_cid in (%(keysql)s) then concat(pg_cid,'#',floor(rand()*cast(split(split(%(insql)s,concat(pg_cid,':'))[1],',')[0] as bigint))) --如果是极大值,将key给随机数进行打散处理
else pg_cid
end join_key
,sum(play_cnt) play_cnt
,sum(page_cnt) page_cnt
from
view_play_page_ele_imp_clck_aggr --播放、页面、元素曝光、元素点击聚合虚拟表
group by
qimei36
,biz_id
,vid
,cid
)a
left join
( select
concat(pg_cid,'#',id_num) cid
,pv
,el_type_name
,el_type_name_2
,el_type_name_3
from
( select
pg_cid
,pv
,el_type_name
,el_type_name_2
,el_type_name_3
,row_num
,1 id
from
tmp_table_C
where
row_num > 1
)a
join
(select * from dim_id_num_0_to_9999_f where id_num<1000) b --进行将维表的cid扩散成id_num倍(id_num=2就扩展成2倍,id_num=100就扩展成100倍)
on
a.id=b.id
where
b.id_num < a.row_num
union all
select
pg_cid cid
,pv
,el_type_name
,el_type_name_2
,el_type_name_3
from
tmp_table_C
where
row_num = 1 -- pv<100万行的数据不用扩散,只保留1行就行
)b
on
a.cid=b.cid
其中有几个点需要说明一下:
a、insql和key_sql的作用是将在A表出现倾斜的cid进行随机打散,通过传入常数参数的方式进行处理,下图为真正执行的情况
b、A表的cid后面随机加上0~row_num-1的数字,B表对应的cid扩散到row_num行,其中前面也是cid后面拼接上0~row_num-1的数字,因此关联的时候,不管A表随机变成啥,在B表都能找到关联,从而解决倾斜的同时也不会导致关联数据量变少,如下图,对aaa发散4倍,bbb发散2倍,ccc、ddd和eee不作发散关联,此时左边关联最大key记录数为4,右边关联最大key记录数为2,从而解决了数据倾斜问题:
c、dim_id_num_0_to_9999_f 是对维表cid发散处理的维表,本质上是做了笛卡尔积,所以只需拿需要发散的100多条cid进行row_num倍发散,其他不做处理,避免对大量数据进行无意义的笛卡尔积操作
该任务从优化前的2~3小时到优化后的40~50分钟,时效提升了2~4倍:
本文只是简单列举了两种比较常见的解决数据倾斜的sql优化案例,但是实际开发过程中还会有遇到其他情况,因此需要特殊情况进行特殊处理,而且有些时候需要多种解决方案同时作用才能比较好的解决数据倾斜问题,但是万变不离其宗,因此我认为自我思考并且进行分析的过程才是最重要的,本文列举的两个案例都有详细分析过程,而且解决的方向都是让一次shuffle的记录数尽量不要过大,尽量保持更均匀的原则,因此只要能让你的数据保持均匀,数据倾斜就会得到解决了。
如果想要更详细了解数据倾斜的原理可以自行搜索引擎里,希望本文对于想要解决自身数据倾斜业务的同学有一个实际参考作用。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。