首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >记一次重度数据倾斜的排查与解决:Spark SQL 中 Skewed Join 的致命陷阱

记一次重度数据倾斜的排查与解决:Spark SQL 中 Skewed Join 的致命陷阱

原创
作者头像
大王叫我来巡山、
发布2025-08-25 17:58:19
发布2025-08-25 17:58:19
4310
举报
技术环境
  • 计算引擎: Spark 3.1.2 (运行在 YARN 集群上)
  • 资源调度: YARN
  • 数据仓库层: ODS -> DWD (本次问题发生在 DWD 层的事实表与维度表关联环节)
  • 编程语言: SQL (通过 Spark Thrift Server 提交) & Scala (用于最终解决方案)
  • 数据量级: 事实表 ~ 2TB (当日增量约 10亿+ 条),维度表 ~ 200GB (约 5000万+ 条)

一、Bug 现象

那天早上,数据团队收到告警,一个核心的每日增量ETL任务(负责生成DWD层交易事实宽表)已经超时2小时仍未完成。

该任务逻辑并不复杂:将ODS层流水事实表(ods_user_events) 与一个大的维度表(dim_user_info) 进行 LEFT JOIN,关联键是 user_id。之后还会关联其他几个小维度表。

异常表现如下:

  1. 任务运行极慢:在Spark UI上观察,大部分Task已在30分钟内完成,但总有最后几个Task一直处于Running状态,耗时远超其他Task(>2小时)。
  2. 资源利用不均:观察Executor情况,大部分Executor很快空闲,但总有一两个Executor持续高负荷运行,GC频繁,网络流量巨大。
  3. 数据溢出:在失败的某次尝试中,甚至出现了 Disk spillOOM (Java Heap Space) 的错误日志。
  4. 最终结果不正确:任务成功后(在调整参数勉强跑通后),发现最终表的数据量远大于预期,出现了严重的重复数据。

这些现象综合起来,几乎就是 “数据倾斜(Data Skewness)” 的典型教科书式症状。


二、排查步骤

面对这个问题,我展开了如下排查:

  1. 定位慢Task:首先打开Spark UI的Stages页,找到那个迟迟无法完成的Stage。发现其最后一个环节(join操作)的某个分区的输入数据量(Input Size)异常大,高达 几百GB,而其他分区大多只有几个GB。这确认了数据倾斜的存在。
  2. 识别倾斜Key:问题的核心是找到导致倾斜的 user_id。由于生产环境数据敏感且庞大,直接查询效率低。我采用了采样分析的方法:
代码语言:sql
复制
-- 1. 检查事实表中user_id的分布
SELECT user_id, count(1) as cnt
FROM ods_user_events
WHERE dt = '20231025'
GROUP BY user_id
ORDER BY cnt DESC
LIMIT 100;

-- 2. 检查维度表中是否存在空值或默认值被大量关联
SELECT user_id, count(1) as cnt
FROM dim_user_info
WHERE dt = '20231025'
GROUP BY user_id
ORDER BY cnt DESC
LIMIT 100;

查询结果令人震惊:事实表中存在大量 user_id = 0user_id = -999(未知用户或测试数据的默认值)的记录,数量达数亿条。同时,维度表中 user_id = 0 也恰好有一条记录(是一条“未知用户”的兜底记录)。

  1. 根因分析
    • 事实表中数亿条的 user_id = 0/ -999 的数据。
    • LEFT JOIN 时,所有这些数据都会去和维度表中的那一条 user_id = 0 的记录进行关联。
    • 在Spark的Hash Join机制中,所有相同Key的数据会被分配到同一个Reduce Task(分区)上进行处理。
    • 这就导致一个Task要处理数亿条数据,而其他Task可能只处理几十万条。这个“倒霉”的Task不堪重负,成为整个任务的瓶颈,并可能导致OOM。同时,因为维度表的一条数据被重复关联了数亿次,导致最终结果数据量激增。

三、解决方案(附代码)

找到根因后,解决方案就清晰了。我们的目标是打散倾斜Key,分散计算压力

方案一:临时解决方案 - 分离倾斜数据再合并 (SQL思路)

代码语言:sql
复制
-- 1. 创建临时视图,筛选出倾斜Key(例如0和-999)的数据单独处理
WITH skewed_fact AS (
  SELECT /*+ BROADCAST(dim) */ 
    f.*, 
    dim.* -- 选择需要的维度字段
  FROM ods_user_events f
  LEFT JOIN dim_user_info dim ON f.user_id = dim.user_id
  WHERE f.user_id IN (0, -999) -- 处理倾斜Key
),
normal_fact AS (
  SELECT /*+ SHUFFLE_HASH(dim) */ 
    f.*, 
    dim.* -- 选择需要的维度字段
  FROM ods_user_events f
  LEFT JOIN dim_user_info dim ON f.user_id = dim.user_id
  WHERE f.user_id NOT IN (0, -999) -- 处理正常Key
)

-- 2. 将两部分数据合并Union起来
SELECT * FROM skewed_fact
UNION ALL
SELECT * FROM normal_fact;

优点:简单直观,可以用纯SQL实现。

缺点:需要手动指定倾斜Key,不够自动化;代码冗余;如果倾斜Key很多或变化,维护成本高。

方案二:终极解决方案 - 使用Spark SALTLING(Spark高级特性)

我们最终采用了更为优雅和自动化的方式:给倾斜Key添加随机前缀(SALT),从而将其打散到多个Task中去计算。

由于纯SQL实现较为繁琐,我们选择了使用Spark DataFrame API(Scala)来实现。

代码语言:scala
复制
// 1. 定义可能存在的数据倾斜Key(可根据前期分析配置化)
val skewedKeys = Seq(0, -999)

// 2. 对事实表(左表)添加随机Salt前缀
import org.apache.spark.sql.functions._
val saltedFactDF = odsUserEventsDF
  .withColumn("salted_user_id",
    when(col("user_id").isin(skewedKeys: _*),
      concat(col("user_id"), lit("_"), (rand() * 5).cast("int")) // 假设我们打散成5份
    ).otherwise(col("user_id"))
  )

// 3. 对维度表(右表)进行扩容,生成所有可能的Salt后缀对应的数据
val saltedDimDF = dimUserInfoDF
  // 首先过滤出倾斜Key,并为其生成5条带不同后缀的数据
  .filter(col("user_id").isin(skewedKeys: _*))
  .withColumn("salted_user_id",
    explode(array((0 until 5).map(lit(_)): _*)) // 生成0-4的数组并炸开
  )
  .withColumn("salted_user_id", concat(col("user_id"), lit("_"), col("salted_user_id")))
  // 然后union上非倾斜Key的原始数据(salted_user_id就是原始的user_id)
  .union(
    dimUserInfoDF
      .filter(!col("user_id").isin(skewedKeys: _*))
      .withColumn("salted_user_id", col("user_id"))
  )

// 4. 使用新的salted_user_id字段进行Join
val resultDF = saltedFactDF
  .join(saltedDimDF,
    saltedFactDF("salted_user_id") === saltedDimDF("salted_user_id"),
    "left"
  )
  .drop("salted_user_id") // join完成后丢弃临时加的Salt字段

// 5. 将resultDF写入DWD层目标表

代码解释

  • 打散左表:为所有倾斜Key(如0)的记录随机添加 _0, _1, ..., _4 的后缀,将原本一个Key的数据随机分散到5个新Key上。
  • 扩容右表:为维度表中倾斜Key的那条记录,复制生成5条记录,其Key分别为 0_0, 0_1, ..., 0_4
  • 重新Join:现在,左表里一个 0_2 的记录只会和右表里 0_2 的那条记录匹配。计算压力就从1个Task分散到了5个Task上。
  • 非倾斜Key:正常处理,不受影响。

通过这种方式,彻底解决了单个Task负载过重的问题,任务从超时失败优化到20分钟内稳定完成


四、避坑总结
  1. 数据质量是根基:数据倾斜往往是数据质量问题的体现(如无效默认值、脏数据)。在数仓建设初期,必须建立严格的数据监控和稽核体系,及时发现并治理此类问题,从源头上减少倾斜的发生。
  2. 监控与预警:对ETL任务的关键指标(Task耗时分布、输入数据量分布、Shuffle读写量)进行监控。一旦发现显著差异,立即报警,做到问题早发现。
  3. 不要盲目调参:遇到运行慢,不要只会调大 spark.sql.shuffle.partitions 或 Executor 内存。这治标不治本,甚至可能拖垮整个集群。先分析,再优化
  4. 善用高级特性:了解并熟练运用 Spark 提供的高级优化手段,如 SALTINGBroadcast Join(对小表)等。本次的 SALTING 是解决重度数据倾斜的“银弹”。
  5. 配置化思维:可以将常见的倾斜Key(如0, -1, -999, ‘’空字符串等)放入配置文件中,使优化代码与业务逻辑解耦,便于维护和管理。

希望这次真实的“踩坑”和“填坑”经历能给大家带来一些启发。数仓建设之路,道阻且长,行则将至。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 技术环境
  • 一、Bug 现象
  • 二、排查步骤
  • 三、解决方案(附代码)
  • 四、避坑总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档