和 Join 相关的逻辑层的优化规则主要包含以下几种:
ReorderJoin
EliminateOuterJoin
以及
中和 Join 相关的 predicate pushDown
在 Spark SQL 中,参与 Join 操作的两张表分别被称为流式表(StreamTable)和构件表(BuildTable),不同表的角色在 Spark SQL 中会通过一定的策略进行设定。通常来讲,系统会将大表设置为 StreamTable,小表设置为 BuildTable。流式表的迭代器为 streamIter,构建表的迭代器为 buildIter。遍历 streamIter 的每一条记录,然后在 buildIter 中查找匹配的记录。这个查找过程称为 build 过程。每次 build 操作的结果为一条 JoinedRow(A, B)
,其中 A 来自 streamedIter,B 来自 buildIter。
再例如,在 BroadcastHashJoin 中需要决定广播哪个数据表。这里的 BuildSide 可以简单理解为 “构建的一边”。
在 Spark 中,BuildSide 作为一个抽象类,包含 BuildLeft 和 BuildRight 两个子类,一般在构造 Join 的执行算子时,都会传入一个 BuildSide 的构造参数。在 JoinSelection 中通过 canBuildRight
和 canBuildLeft
判断一个 Join 类型能否 “构建” 右表和左表。
Join 物理执行计划的选取在 JoinSelection 中进行,其主要逻辑如下:
如果是一个等值 join(equi-join)且包含 join hint,我们依次查看 join hint:
broadcast hint
:如果 join 类型支持,使用 broadcast hash join。如果 left 和 right 都有 broadcast hint,选择 size 较小的一侧(基于统计数据)进行 broadcastsort merge hint
:如果 join keys 是可排序的,使用 sort merge join。shuffle hash hint
:如果 join 类型支持,如果 left 和 right 都设置了 shuffle hash hints,选择 size 较小的一侧作为 build sideshuffle replicate NL hint
:如果 join type 为 inner like,使用 cartesian product join(笛卡尔积)JoinSelection 通过 ExtractEquiJoinKeys
来判断是否为等值 Join 并提取相关信息:
如果没有指定 hint 或 hint 不适用,Join 选择顺序如下:
spark.sql.join.preferSortMergeJoin
为 falseinner like join
注①:
createJoinWithoutHint 如下: