并行查询

最近更新时间:2026-06-30 16:45:32

我的收藏

功能简介

TDSQL Boundless 并行查询(Parallel Query,简称 PQ)是内建于计算引擎的并行查询框架。当查询数据量达到一定阈值时,优化器自动将查询计划分解为多个子任务,由多个 Worker 线程并行执行,使复杂查询的响应时间大幅下降。

总体流程

1. SQL 解析与优化:SQL 经过 MySQL 原生优化器生成串行执行计划
2. 并行计划生成:PQ 优化器对串行计划进行评估,判断是否满足并行条件
3. 计划分解:将符合条件的表扫描拆分为多个范围分片,生成 PartialPlan(部分计划)
4. 并行执行:Worker 线程(简称为 Worker)各自执行 PartialPlan,独立扫描分配到的数据分片
5. 结果收集:Leader 线程(简称为 Leader)通过 Gather 操作汇总各 Worker 的结果,必要时做归并排序或去重
6. 返回客户端:Leader 将最终结果返回客户端

适用场景

大表全表扫描:表数据量在万行以上
多表关联查询:JOIN 查询涉及大表扫描
聚合分析:GROUP BY + 聚合函数的大数据量查询
排序查询:需要对大量数据进行 ORDER BY 排序
分区表查询:按分区键并行扫描各分区
含子查询的复杂 SQL:子查询可并行预先执行

功能原理

并行计划生成

并行查询在 MySQL 原生优化器之后工作。MySQL 先产生串行执行计划,PQ 优化器再对该计划进行分析:
1. 参数限制检查max_parallel_workers 必须大于0、max_parallel_degree 必须有效或存在 PARALLEL Hint,否则直接退出
2. 代价评估:检查串行执行代价是否超过 parallel_plan_cost_thresholdparallel_query_switchforce=on 或存在 PARALLEL Hint 时跳过此检查
3. 并行安全检查:遍历查询中所有表达式,任一表达式不支持并行执行则拒绝并行
4. 分布策略选择:为每张表依次生成 final_singleton(最终单点分布,始终生成)、replicated(复制分布,表可下推时生成)、partial_scan (部分扫描,仅当表行数大于 parallel_scan_records_threshold 时生成)三种分布路径,分片系统中额外生成 hashed(哈希分布)
5. JOIN 路径组合:对 outer/inner 两侧路径做笛卡尔积,剪枝不兼容组合(如 replicated 与 final_singleton),必要时插入 Collector(数据汇总算子) 桥接
6. 最优路径选择:根据代价选择路径,优先选 Hint 匹配最多的路径。parallel_plan_compare_serial_cost 控制串行路径是否参与比较
7. Slice 划分:沿最优路径在 Collector 处切分,根 Slice 运行于 Leader,每个子 Slice 对应一个 PartialPlan

Slice 与 PartialPlan

并行计划将原始计划按 Slice 进行分片。每个 Slice 是一个独立的执行单元:
Slice 0(Leader 端):在 Leader 上执行,负责汇总结果。包括 Gather 操作和需要全局视角的操作
Slice 1 ~ N(Worker 端):在 Worker 上并行执行,每个包含一张表的并行扫描及相关算子
例如一个简单的 SELECT * FROM t1 WHERE a > 0 的并行计划:
Slice 1:4个 Worker 各自扫描 t1 的1/4数据范围
Slice 0:Leader 通过 Gather 收集4个 Worker 的结果
对于带 JOIN 的查询 SELECT * FROM t1 JOIN t2
Slice 1:4个 Worker 并行扫描 t1,各自与 t2 在本 Worker 内完成 Hash Join
Slice 0:Leader 通过 Gather 收集4个 Worker 的 JOIN 结果

并行扫描

PQ 支持两种并行扫描方式,优化器根据表结构和查询特征自动选择。

Dynamic Range Scan(动态范围扫描)

将表数据按主键范围动态切分为 N 个连续分片。每个 Worker 负责一个分片的扫描。
适用场景
未分区或无需按分区分片的普通表
带范围条件的查询(WHERE id BETWEEN X AND Y),优化器可进一步剪枝无关分片
工作方式
1. 优化器估算表的总行数和数据分布
2. 将 Key 空间均匀切分为 N 个区间(N ≤ max_parallel_degree
3. 每个 Worker 拿到一个区间后独立执行扫描

Partition Scan(分区扫描)

按物理分区将数据分片,每个分区分配给一个 Worker。
适用场景
使用 HASH 或 RANGE 分区的分区表
希望利用分区裁剪的查询(WHERE partition_key = X
工作方式
1. 优化器识别表中可用的分区列表
2. 将各分区均匀分配给 Worker
3. Worker 扫描分配给自己的分区

并行扫描方式选择建议

分区表且分区数足够多(≥ 并行度)→ 优先使用 Partition Scan
非分区表或分区数少 → 使用 Dynamic Range Scan
可通过 Hint 显式指定:SELECT /*+ PARALLEL(PARTITION) */SELECT /*+ PARALLEL(DYNAMIC_RANGE) */

Gather(结果收集)

Gather 是 Leader 线程从多个 Worker 收集结果的算子。它位于 Leader 端计划的关键位置,负责将并行执行的结果汇总。
Gather 支持多种数据处理模式:
模式
适用场景
说明
普通 Gather
无需保证顺序
简单将各 Worker 结果逐行收集并传递给上层算子
Merge Sort
需要 ORDER BY 输出
收集时对各 Worker 已排序的结果做归并排序。Worker 端做局部排序,Leader 端做归并
Gather 也支持对结果进行流式或物化输出。

并行聚合

GROUP BY 聚合操作在并行执行中有多种策略,优化器根据查询特征自动选择最优方案。

一阶段聚合

Worker 只负责扫描数据,数据汇聚到 Leader 后,由 Leader 执行聚合。
适用场景:
聚合函数或 GROUP BY 表达式不满足下推条件(如含有不安全的聚合函数、子查询等),无法将聚合下推到 Worker 时的兜底策略。

两阶段聚合(默认策略)

Worker 先执行局部聚合 → Gather 汇总 → Leader 执行最终聚合。
流程
1. 各 Worker 并行扫描数据,对各自范围内的数据执行局部 GROUP BY
2. Worker 将局部聚合结果发送给 Leader
3. Leader 对来自所有 Worker 的中间结果再次执行 GROUP BY,得到最终聚合结果

完全下推聚合

当优化器可以确定各 Worker 间的 GROUP BY 键值不会重复时,将聚合完全下推到 Worker。此时 Leader 无需再次 GROUP BY,只需做简单汇总。
开启条件
parallel_query_switchfull_grouping_pushdown 选项已开启(默认开启)
聚合函数和 GROUP BY 表达式满足下推安全性
数据的分布键与 GROUP BY 表达式兼容,即每个 Worker 上的分组数据完整

并行排序

下推排序(默认)

Worker 先对各自结果执行排序 → Leader 通过 Gather Merge Sort 归并:
1. 各 Worker 使用本地排序算法(filesort)对各自数据排序
2. Leader 启动 Merge Sort,同时从 N 个 Worker 的有序结果流中读取
3. 通过最小堆选出全局最小值,逐行返回
优势:充分利用多核并行排序能力,加速大数据量排序

非下推排序

当排序列无法下推(排序表达式包含非并行安全的函数、排序列是常量),数据先 Gather 到 Leader,再在 Leader 上串行排序。

子查询并行

PQ 对于查询提供三种执行策略:

预先执行(Pre-Evaluation)

子查询在父查询执行之前并行执行完毕。执行结果供所有 Worker 直接读取。
适用场景
非关联子查询(子查询不依赖父查询的列)
子查询结果集相对较小,可放入内存
示例
SELECT * FROM t1 WHERE t1.dept_id IN (
SELECT id FROM departments WHERE region = 'ASIA'
);
子查询 SELECT id FROM departments WHERE region = 'ASIA' 先并行执行,结果共享给扫描 t1 的 Worker。

下推到 Worker(Pushdown)

相关子查询整体推送到每个 Worker 上执行。每个 Worker 独立执行自己的子查询部分。
适用场景
相关子查询(子查询引用了父查询的列)
subquery_pushdown 选项开启(默认开启)

按需执行(Inline Evaluation)

保持 MySQL 原始逻辑,父查询驱动子查询按需执行。使用 PQ_INLINE_EVALUATION Hint 可强制此模式。

EXPLAIN 查看并行计划

基本并行标识


tdsql> EXPLAIN SELECT DISTINCT (COUNT(b) + 1) AS c FROM t1 GROUP BY a;
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+--------------------------------------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+--------------------------------------------+
| 1 | SIMPLE | t1 | NULL | ALL | PRIMARY | NULL | NULL | NULL | 43 | 100.00 | Parallel scan (4 workers); Using temporary |
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+--------------------------------------------+
tdsql> SHOW WARNINGS;
+-------+------+--------------------------------------------------------------------------------------------------+
| Level | Code | Message |
+-------+------+--------------------------------------------------------------------------------------------------+
| Note | 1003 | /* select#1 */ select distinct (count(`b`) + 1) AS `c` from `test`.`t1` group by `test`.`t1`.`a` |
| Note | 1003 | Query is executed in a parallel plan; explain with tree format to see the plan details. |
+-------+------+--------------------------------------------------------------------------------------------------+

若 Extra 列显示 Parallel scan (N workers),说明并行生效。同时 WARNING 会提示 "Query is executed in a parallel plan; explain with tree format to see the plan details.",建议使用 tree 格式查看完整并行计划。

树形格式(推荐)


tdsql> EXPLAIN FORMAT=TREE
-> SELECT
-> t2.a,
-> (SELECT SUM(t1.a) FROM t1) as total_sum,
-> (SELECT COUNT(t1.a) FROM t1) as total_count
-> FROM t2
-> ORDER BY t2.a;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| -> Gather (slice: 1, workers: 4) (cost=2310.16..2312.04 rows=5)
Merge sort: t2.a
-> Sort: t2.a (cost=5.54..5.54 rows=1)
-> Table scan on t2, with range parallel scan (cost=2.78..3.48 rows=1)
-> Select #2 (subquery in projection; run only once)
-> Aggregate: sum(t1.a) (cost=2309.05..2309.05 rows=1)
-> Gather (slice: 1, workers: 4) (cost=2308.65..2308.65 rows=1)
-> Aggregate: sum(t1.a) (cost=4.14..4.14 rows=1)
-> Table scan on t1, with range parallel scan (cost=2.60..3.25 rows=1)
-> Select #3 (subquery in projection; run only once)
-> Aggregate: count(t1.a) (cost=2306.05..2306.05 rows=1)
-> Gather (slice: 1, workers: 4) (cost=2304.43..2305.27 rows=4)
-> Count rows in t1 (cost=0.83..0.83 rows=1)
Table scan on t1, with pushed projection, with range parallel scan
|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

关键节点含义
节点
含义
Gather (slice: 1, workers: 4)
4个 Worker 并行执行 Slice 1,Leader 收集结果
with range parallel scan 或者 with partition parallel scan
使用 Dynamic Range Scan/Partition Scan 方式扫描表数据
Merge sort: col_name
Gather 收集时做归并排序(保证输出按 col_name 有序)
Merge sort with duplicate removal: col
Gather 收集时做归并排序并去重
Pre-evaluated subqueries: select #N
预先并行执行完毕的子查询(子查询结果已就绪)

执行分析

tdsql> explain analyze verbose select * from t1, t2 where t1.a = t2.a;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| -> Inner hash join (t2.a = t1.a) (cost=6.28 rows=3) (actual time=0.511..0.533 rows=3 loops=1)
Chunk pair files: 0, memory usage: 16kB
-> Table scan on t2 (cost=0.88 rows=3) (actual time=0.167..0.185 rows=3 loops=1)
-> Hash
-> Table scan on t1 (cost=2.84 rows=3) (actual time=0.262..0.283 rows=3 loops=1)
RPC statistics: leader
-> LocalScanRecord=latency(ms): 2,0.266323,0.081208...0.185115, retry_count: 0, retry_interval_all(ms): 0.000000, failure_count: 0 |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
上述结果表示:
查询内存使用量为16KB
查询包含2次 LocalScanRecord RPC
RPC 总耗时为0.266323ms
单次 RPC 最短耗时为0.081208ms,最长耗时为0.185115ms
RPC 执行过程中未发生重试和失败
EXPLAIN ANALYZE VERBOSE 适用于需要进一步分析 RPC 开销、内存使用和算子执行细节的场景。

Optimizer Trace 查看并行决策

当需要了解为什么某条 SQL 走了或没走并行时:
SET optimizer_trace_features = 'parallel_plan_optimization=on';
SET optimizer_trace = 'enabled=on';
EXPLAIN SELECT * FROM t1 WHERE a > 4;
SELECT * FROM information_schema.optimizer_trace \\G
Trace 中的关键信息:
considering.chosen: true/false — 是否选择了并行
considering.optimization.enumerating_paths — 优化器评估了哪些并行路径
cause(未选择并行查询时)— 具体原因,如 plan_cost_less_than_thresholddisabled_by_limit

系统变量

并行度控制

变量名
级别
默认值
说明
max_parallel_degree
Session
4
单个查询的最大 Worker 数。设为0禁用并行
max_parallel_workers
Global
10000
整个节点的最大 Worker 总数。设为0全局禁用并行

阈值控制

变量名
级别
默认值
说明
parallel_plan_cost_threshold
Session
50000
串行代价超过该值才考虑生成并行计划
parallel_scan_records_threshold
Session
10000
表扫描行数超过该值才将该表纳入并行扫描候选
parallel_scan_ranges_threshold
Session
2
可切分的扫描范围数超过该值才使用并行计划

功能开关

变量名
级别
默认值
说明
parallel_query_switch
Session
见说明
并行查询功能开关,位掩码格式,支持多个选项同时设置
parallel_query_switch 选项说明
选项
默认
说明
force
off
强制使用并行计划,跳过所有 cost 和行数阈值检查
subquery_pushdown
on
支持相关子查询整体推送到 Worker 上并行执行
full_grouping_pushdown
on
当 Worker 间 GROUP BY 键值不重复时,将聚合完全下推到 Worker
其他还包括 group_min_max_pushdownpartition_group_min_max_pushdowndynamic_range_parallel_scanpartition_parallel_scan 等扫描方式选项,默认均开启。
警告:
parallel_query_switch 是一个开发者选项,用于调试。每个发布都可能调整开关的内容(包括添加或删除),请勿在管理工具或正规的维护脚本中使用。
使用方式
-- 查看当前开关值
SHOW VARIABLES LIKE 'parallel_query_switch';

-- 强制并行(跳过所有阈值检查)
SET parallel_query_switch = 'force=on';

-- 关闭子查询下推(含相关子查询的查询不走并行)
SET parallel_query_switch = 'subquery_pushdown=off';

优化器 Hints

详细请参见 优化器 Hints

使用限制

限制项
说明
Dynamic Range Scan
只能在 Leader 上执行
Index Merge 扫描
不支持并行扫描
Rollup
不包含
SELECT 列表中相关子查询
不支持(如 SELECT 列表中引用了外层查询的列)
非 Parallel Safe 表达式
如用户变量 @a:=1 等不支持并行

并行生效条件

并行计划生成需同时满足以下条件(或设置 parallel_query_switch='force=on' 跳过所有检查):
1. max_parallel_degree > 0
2. 查询的串行执行代价 > parallel_plan_cost_threshold

常见问题

EXPLAIN 显示没有走并行?

1. 检查 max_parallel_degree 是否为0
2. 使用 Optimizer Trace 查看具体原因:
plan_cost_less_than_threshold → 降低 parallel_plan_cost_threshold
3. 快速验证:SET parallel_query_switch='force=on' 强制并行

并行查询比串行还慢?

小数据量:并行启动和线程间通信的开销超过并行收益。调高阈值避免小查询走并行
Worker 过多:线程上下文切换开销过大。适当降低 max_parallel_degree
不必要的排序:检查是否因为 Gather 的 Merge Sort 引入了额外排序开销

如何为特定 SQL 固定使用并行?

无需修改应用代码,通过 Statement Outline 绑定 Hint:
CALL dbms_admin.statement_outline_add_rule(
'rule_name',
'SELECT /*+ PARALLEL(4) */ SUM(c), b FROM t1 GROUP BY b'
);

分区表应该选哪种扫描方式?

分区数 ≥ 期望并行度:使用 PARALLEL(PARTITION) 按分区扫描
分区数少或非分区表:使用 PARALLEL(DYNAMIC_RANGE) 按范围扫描
不确定时:不指定扫描方式,让优化器自动选择

参考文档