那天早上,数据团队收到告警,一个核心的每日增量ETL任务(负责生成DWD层交易事实宽表)已经超时2小时仍未完成。
该任务逻辑并不复杂:将ODS层流水事实表(ods_user_events
) 与一个大的维度表(dim_user_info
) 进行 LEFT JOIN
,关联键是 user_id
。之后还会关联其他几个小维度表。
异常表现如下:
Disk spill
和 OOM (Java Heap Space)
的错误日志。这些现象综合起来,几乎就是 “数据倾斜(Data Skewness)” 的典型教科书式症状。
面对这个问题,我展开了如下排查:
join
操作)的某个分区的输入数据量(Input Size)异常大,高达 几百GB,而其他分区大多只有几个GB。这确认了数据倾斜的存在。user_id
。由于生产环境数据敏感且庞大,直接查询效率低。我采用了采样分析的方法:-- 1. 检查事实表中user_id的分布
SELECT user_id, count(1) as cnt
FROM ods_user_events
WHERE dt = '20231025'
GROUP BY user_id
ORDER BY cnt DESC
LIMIT 100;
-- 2. 检查维度表中是否存在空值或默认值被大量关联
SELECT user_id, count(1) as cnt
FROM dim_user_info
WHERE dt = '20231025'
GROUP BY user_id
ORDER BY cnt DESC
LIMIT 100;
查询结果令人震惊:事实表中存在大量 user_id = 0
或 user_id = -999
(未知用户或测试数据的默认值)的记录,数量达数亿条。同时,维度表中 user_id = 0
也恰好有一条记录(是一条“未知用户”的兜底记录)。
user_id = 0/ -999
的数据。LEFT JOIN
时,所有这些数据都会去和维度表中的那一条 user_id = 0
的记录进行关联。找到根因后,解决方案就清晰了。我们的目标是打散倾斜Key,分散计算压力。
方案一:临时解决方案 - 分离倾斜数据再合并 (SQL思路)
-- 1. 创建临时视图,筛选出倾斜Key(例如0和-999)的数据单独处理
WITH skewed_fact AS (
SELECT /*+ BROADCAST(dim) */
f.*,
dim.* -- 选择需要的维度字段
FROM ods_user_events f
LEFT JOIN dim_user_info dim ON f.user_id = dim.user_id
WHERE f.user_id IN (0, -999) -- 处理倾斜Key
),
normal_fact AS (
SELECT /*+ SHUFFLE_HASH(dim) */
f.*,
dim.* -- 选择需要的维度字段
FROM ods_user_events f
LEFT JOIN dim_user_info dim ON f.user_id = dim.user_id
WHERE f.user_id NOT IN (0, -999) -- 处理正常Key
)
-- 2. 将两部分数据合并Union起来
SELECT * FROM skewed_fact
UNION ALL
SELECT * FROM normal_fact;
优点:简单直观,可以用纯SQL实现。
缺点:需要手动指定倾斜Key,不够自动化;代码冗余;如果倾斜Key很多或变化,维护成本高。
方案二:终极解决方案 - 使用Spark SALTLING(Spark高级特性)
我们最终采用了更为优雅和自动化的方式:给倾斜Key添加随机前缀(SALT),从而将其打散到多个Task中去计算。
由于纯SQL实现较为繁琐,我们选择了使用Spark DataFrame API(Scala)来实现。
// 1. 定义可能存在的数据倾斜Key(可根据前期分析配置化)
val skewedKeys = Seq(0, -999)
// 2. 对事实表(左表)添加随机Salt前缀
import org.apache.spark.sql.functions._
val saltedFactDF = odsUserEventsDF
.withColumn("salted_user_id",
when(col("user_id").isin(skewedKeys: _*),
concat(col("user_id"), lit("_"), (rand() * 5).cast("int")) // 假设我们打散成5份
).otherwise(col("user_id"))
)
// 3. 对维度表(右表)进行扩容,生成所有可能的Salt后缀对应的数据
val saltedDimDF = dimUserInfoDF
// 首先过滤出倾斜Key,并为其生成5条带不同后缀的数据
.filter(col("user_id").isin(skewedKeys: _*))
.withColumn("salted_user_id",
explode(array((0 until 5).map(lit(_)): _*)) // 生成0-4的数组并炸开
)
.withColumn("salted_user_id", concat(col("user_id"), lit("_"), col("salted_user_id")))
// 然后union上非倾斜Key的原始数据(salted_user_id就是原始的user_id)
.union(
dimUserInfoDF
.filter(!col("user_id").isin(skewedKeys: _*))
.withColumn("salted_user_id", col("user_id"))
)
// 4. 使用新的salted_user_id字段进行Join
val resultDF = saltedFactDF
.join(saltedDimDF,
saltedFactDF("salted_user_id") === saltedDimDF("salted_user_id"),
"left"
)
.drop("salted_user_id") // join完成后丢弃临时加的Salt字段
// 5. 将resultDF写入DWD层目标表
代码解释:
_0
, _1
, ..., _4
的后缀,将原本一个Key的数据随机分散到5个新Key上。0_0
, 0_1
, ..., 0_4
。0_2
的记录只会和右表里 0_2
的那条记录匹配。计算压力就从1个Task分散到了5个Task上。通过这种方式,彻底解决了单个Task负载过重的问题,任务从超时失败优化到20分钟内稳定完成。
spark.sql.shuffle.partitions
或 Executor 内存。这治标不治本,甚至可能拖垮整个集群。先分析,再优化。SALTING
、Broadcast Join
(对小表)等。本次的 SALTING
是解决重度数据倾斜的“银弹”。希望这次真实的“踩坑”和“填坑”经历能给大家带来一些启发。数仓建设之路,道阻且长,行则将至。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。