功能简介
TDSQL Boundless 并行查询(Parallel Query,简称 PQ)是内建于计算引擎的并行查询框架。当查询数据量达到一定阈值时,优化器自动将查询计划分解为多个子任务,由多个 Worker 线程并行执行,使复杂查询的响应时间大幅下降。
总体流程
1. SQL 解析与优化:SQL 经过 MySQL 原生优化器生成串行执行计划
2. 并行计划生成:PQ 优化器对串行计划进行评估,判断是否满足并行条件
3. 计划分解:将符合条件的表扫描拆分为多个范围分片,生成 PartialPlan(部分计划)
4. 并行执行:Worker 线程(简称为 Worker)各自执行 PartialPlan,独立扫描分配到的数据分片
5. 结果收集:Leader 线程(简称为 Leader)通过 Gather 操作汇总各 Worker 的结果,必要时做归并排序或去重
6. 返回客户端:Leader 将最终结果返回客户端
适用场景
大表全表扫描:表数据量在万行以上
多表关联查询:JOIN 查询涉及大表扫描
聚合分析:GROUP BY + 聚合函数的大数据量查询
排序查询:需要对大量数据进行 ORDER BY 排序
分区表查询:按分区键并行扫描各分区
含子查询的复杂 SQL:子查询可并行预先执行
功能原理
并行计划生成
并行查询在 MySQL 原生优化器之后工作。MySQL 先产生串行执行计划,PQ 优化器再对该计划进行分析:
1. 参数限制检查:
max_parallel_workers 必须大于0、max_parallel_degree 必须有效或存在 PARALLEL Hint,否则直接退出2. 代价评估:检查串行执行代价是否超过
parallel_plan_cost_threshold,parallel_query_switch 中 force=on 或存在 PARALLEL Hint 时跳过此检查3. 并行安全检查:遍历查询中所有表达式,任一表达式不支持并行执行则拒绝并行
4. 分布策略选择:为每张表依次生成 final_singleton(最终单点分布,始终生成)、replicated(复制分布,表可下推时生成)、partial_scan (部分扫描,仅当表行数大于
parallel_scan_records_threshold 时生成)三种分布路径,分片系统中额外生成 hashed(哈希分布)5. JOIN 路径组合:对 outer/inner 两侧路径做笛卡尔积,剪枝不兼容组合(如 replicated 与 final_singleton),必要时插入 Collector(数据汇总算子) 桥接
6. 最优路径选择:根据代价选择路径,优先选 Hint 匹配最多的路径。
parallel_plan_compare_serial_cost 控制串行路径是否参与比较7. Slice 划分:沿最优路径在 Collector 处切分,根 Slice 运行于 Leader,每个子 Slice 对应一个 PartialPlan
Slice 与 PartialPlan
并行计划将原始计划按 Slice 进行分片。每个 Slice 是一个独立的执行单元:
Slice 0(Leader 端):在 Leader 上执行,负责汇总结果。包括 Gather 操作和需要全局视角的操作
Slice 1 ~ N(Worker 端):在 Worker 上并行执行,每个包含一张表的并行扫描及相关算子
例如一个简单的
SELECT * FROM t1 WHERE a > 0 的并行计划:Slice 1:4个 Worker 各自扫描 t1 的1/4数据范围
Slice 0:Leader 通过 Gather 收集4个 Worker 的结果
对于带 JOIN 的查询
SELECT * FROM t1 JOIN t2:Slice 1:4个 Worker 并行扫描 t1,各自与 t2 在本 Worker 内完成 Hash Join
Slice 0:Leader 通过 Gather 收集4个 Worker 的 JOIN 结果
并行扫描
PQ 支持两种并行扫描方式,优化器根据表结构和查询特征自动选择。
Dynamic Range Scan(动态范围扫描)
将表数据按主键范围动态切分为 N 个连续分片。每个 Worker 负责一个分片的扫描。
适用场景:
未分区或无需按分区分片的普通表
带范围条件的查询(
WHERE id BETWEEN X AND Y),优化器可进一步剪枝无关分片工作方式:
1. 优化器估算表的总行数和数据分布
2. 将 Key 空间均匀切分为 N 个区间(N ≤
max_parallel_degree)3. 每个 Worker 拿到一个区间后独立执行扫描
Partition Scan(分区扫描)
按物理分区将数据分片,每个分区分配给一个 Worker。
适用场景:
使用 HASH 或 RANGE 分区的分区表
希望利用分区裁剪的查询(
WHERE partition_key = X)工作方式:
1. 优化器识别表中可用的分区列表
2. 将各分区均匀分配给 Worker
3. Worker 扫描分配给自己的分区
并行扫描方式选择建议:
分区表且分区数足够多(≥ 并行度)→ 优先使用 Partition Scan
非分区表或分区数少 → 使用 Dynamic Range Scan
可通过 Hint 显式指定:
SELECT /*+ PARALLEL(PARTITION) */ 或 SELECT /*+ PARALLEL(DYNAMIC_RANGE) */Gather(结果收集)
Gather 是 Leader 线程从多个 Worker 收集结果的算子。它位于 Leader 端计划的关键位置,负责将并行执行的结果汇总。
Gather 支持多种数据处理模式:
模式 | 适用场景 | 说明 |
普通 Gather | 无需保证顺序 | 简单将各 Worker 结果逐行收集并传递给上层算子 |
Merge Sort | 需要 ORDER BY 输出 | 收集时对各 Worker 已排序的结果做归并排序。Worker 端做局部排序,Leader 端做归并 |
Gather 也支持对结果进行流式或物化输出。
并行聚合
GROUP BY 聚合操作在并行执行中有多种策略,优化器根据查询特征自动选择最优方案。
一阶段聚合
Worker 只负责扫描数据,数据汇聚到 Leader 后,由 Leader 执行聚合。
适用场景:
聚合函数或 GROUP BY 表达式不满足下推条件(如含有不安全的聚合函数、子查询等),无法将聚合下推到 Worker 时的兜底策略。
两阶段聚合(默认策略)
Worker 先执行局部聚合 → Gather 汇总 → Leader 执行最终聚合。
流程:
1. 各 Worker 并行扫描数据,对各自范围内的数据执行局部 GROUP BY
2. Worker 将局部聚合结果发送给 Leader
3. Leader 对来自所有 Worker 的中间结果再次执行 GROUP BY,得到最终聚合结果
完全下推聚合
当优化器可以确定各 Worker 间的 GROUP BY 键值不会重复时,将聚合完全下推到 Worker。此时 Leader 无需再次 GROUP BY,只需做简单汇总。
开启条件:
parallel_query_switch 中 full_grouping_pushdown 选项已开启(默认开启)聚合函数和 GROUP BY 表达式满足下推安全性
数据的分布键与 GROUP BY 表达式兼容,即每个 Worker 上的分组数据完整
并行排序
下推排序(默认)
Worker 先对各自结果执行排序 → Leader 通过 Gather Merge Sort 归并:
1. 各 Worker 使用本地排序算法(filesort)对各自数据排序
2. Leader 启动 Merge Sort,同时从 N 个 Worker 的有序结果流中读取
3. 通过最小堆选出全局最小值,逐行返回
优势:充分利用多核并行排序能力,加速大数据量排序
非下推排序
当排序列无法下推(排序表达式包含非并行安全的函数、排序列是常量),数据先 Gather 到 Leader,再在 Leader 上串行排序。
子查询并行
PQ 对于查询提供三种执行策略:
预先执行(Pre-Evaluation)
子查询在父查询执行之前并行执行完毕。执行结果供所有 Worker 直接读取。
适用场景:
非关联子查询(子查询不依赖父查询的列)
子查询结果集相对较小,可放入内存
示例:
SELECT * FROM t1 WHERE t1.dept_id IN (SELECT id FROM departments WHERE region = 'ASIA');
子查询
SELECT id FROM departments WHERE region = 'ASIA' 先并行执行,结果共享给扫描 t1 的 Worker。下推到 Worker(Pushdown)
相关子查询整体推送到每个 Worker 上执行。每个 Worker 独立执行自己的子查询部分。
适用场景:
相关子查询(子查询引用了父查询的列)
需
subquery_pushdown 选项开启(默认开启)按需执行(Inline Evaluation)
保持 MySQL 原始逻辑,父查询驱动子查询按需执行。使用
PQ_INLINE_EVALUATION Hint 可强制此模式。EXPLAIN 查看并行计划
基本并行标识
tdsql> EXPLAIN SELECT DISTINCT (COUNT(b) + 1) AS c FROM t1 GROUP BY a;+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+--------------------------------------------+| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+--------------------------------------------+| 1 | SIMPLE | t1 | NULL | ALL | PRIMARY | NULL | NULL | NULL | 43 | 100.00 | Parallel scan (4 workers); Using temporary |+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+--------------------------------------------+tdsql> SHOW WARNINGS;+-------+------+--------------------------------------------------------------------------------------------------+| Level | Code | Message |+-------+------+--------------------------------------------------------------------------------------------------+| Note | 1003 | /* select#1 */ select distinct (count(`b`) + 1) AS `c` from `test`.`t1` group by `test`.`t1`.`a` || Note | 1003 | Query is executed in a parallel plan; explain with tree format to see the plan details. |+-------+------+--------------------------------------------------------------------------------------------------+
若 Extra 列显示
Parallel scan (N workers),说明并行生效。同时 WARNING 会提示 "Query is executed in a parallel plan; explain with tree format to see the plan details.",建议使用 tree 格式查看完整并行计划。树形格式(推荐)
tdsql> EXPLAIN FORMAT=TREE-> SELECT-> t2.a,-> (SELECT SUM(t1.a) FROM t1) as total_sum,-> (SELECT COUNT(t1.a) FROM t1) as total_count-> FROM t2-> ORDER BY t2.a;+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| EXPLAIN |+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| -> Gather (slice: 1, workers: 4) (cost=2310.16..2312.04 rows=5)Merge sort: t2.a-> Sort: t2.a (cost=5.54..5.54 rows=1)-> Table scan on t2, with range parallel scan (cost=2.78..3.48 rows=1)-> Select #2 (subquery in projection; run only once)-> Aggregate: sum(t1.a) (cost=2309.05..2309.05 rows=1)-> Gather (slice: 1, workers: 4) (cost=2308.65..2308.65 rows=1)-> Aggregate: sum(t1.a) (cost=4.14..4.14 rows=1)-> Table scan on t1, with range parallel scan (cost=2.60..3.25 rows=1)-> Select #3 (subquery in projection; run only once)-> Aggregate: count(t1.a) (cost=2306.05..2306.05 rows=1)-> Gather (slice: 1, workers: 4) (cost=2304.43..2305.27 rows=4)-> Count rows in t1 (cost=0.83..0.83 rows=1)Table scan on t1, with pushed projection, with range parallel scan|+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
关键节点含义:
节点 | 含义 |
Gather (slice: 1, workers: 4) | 4个 Worker 并行执行 Slice 1,Leader 收集结果 |
with range parallel scan 或者 with partition parallel scan | 使用 Dynamic Range Scan/Partition Scan 方式扫描表数据 |
Merge sort: col_name | Gather 收集时做归并排序(保证输出按 col_name 有序) |
Merge sort with duplicate removal: col | Gather 收集时做归并排序并去重 |
Pre-evaluated subqueries: select #N | 预先并行执行完毕的子查询(子查询结果已就绪) |
执行分析
tdsql> explain analyze verbose select * from t1, t2 where t1.a = t2.a;+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| EXPLAIN |+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| -> Inner hash join (t2.a = t1.a) (cost=6.28 rows=3) (actual time=0.511..0.533 rows=3 loops=1)Chunk pair files: 0, memory usage: 16kB-> Table scan on t2 (cost=0.88 rows=3) (actual time=0.167..0.185 rows=3 loops=1)-> Hash-> Table scan on t1 (cost=2.84 rows=3) (actual time=0.262..0.283 rows=3 loops=1)RPC statistics: leader-> LocalScanRecord=latency(ms): 2,0.266323,0.081208...0.185115, retry_count: 0, retry_interval_all(ms): 0.000000, failure_count: 0 |+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
上述结果表示:
查询内存使用量为16KB
查询包含2次 LocalScanRecord RPC
RPC 总耗时为0.266323ms
单次 RPC 最短耗时为0.081208ms,最长耗时为0.185115ms
RPC 执行过程中未发生重试和失败
EXPLAIN ANALYZE VERBOSE 适用于需要进一步分析 RPC 开销、内存使用和算子执行细节的场景。Optimizer Trace 查看并行决策
当需要了解为什么某条 SQL 走了或没走并行时:
SET optimizer_trace_features = 'parallel_plan_optimization=on';SET optimizer_trace = 'enabled=on';EXPLAIN SELECT * FROM t1 WHERE a > 4;SELECT * FROM information_schema.optimizer_trace \\G
Trace 中的关键信息:
considering.chosen: true/false — 是否选择了并行considering.optimization.enumerating_paths — 优化器评估了哪些并行路径cause(未选择并行查询时)— 具体原因,如 plan_cost_less_than_threshold、disabled_by_limit系统变量
并行度控制
变量名 | 级别 | 默认值 | 说明 |
max_parallel_degree | Session | 4 | 单个查询的最大 Worker 数。设为0禁用并行 |
max_parallel_workers | Global | 10000 | 整个节点的最大 Worker 总数。设为0全局禁用并行 |
阈值控制
变量名 | 级别 | 默认值 | 说明 |
parallel_plan_cost_threshold | Session | 50000 | 串行代价超过该值才考虑生成并行计划 |
parallel_scan_records_threshold | Session | 10000 | 表扫描行数超过该值才将该表纳入并行扫描候选 |
parallel_scan_ranges_threshold | Session | 2 | 可切分的扫描范围数超过该值才使用并行计划 |
功能开关
变量名 | 级别 | 默认值 | 说明 |
parallel_query_switch | Session | 见说明 | 并行查询功能开关,位掩码格式,支持多个选项同时设置 |
parallel_query_switch 选项说明:选项 | 默认 | 说明 |
force | off | 强制使用并行计划,跳过所有 cost 和行数阈值检查 |
subquery_pushdown | on | 支持相关子查询整体推送到 Worker 上并行执行 |
full_grouping_pushdown | on | 当 Worker 间 GROUP BY 键值不重复时,将聚合完全下推到 Worker |
其他还包括
group_min_max_pushdown、partition_group_min_max_pushdown、dynamic_range_parallel_scan、partition_parallel_scan 等扫描方式选项,默认均开启。警告:
parallel_query_switch 是一个开发者选项,用于调试。每个发布都可能调整开关的内容(包括添加或删除),请勿在管理工具或正规的维护脚本中使用。使用方式:
-- 查看当前开关值SHOW VARIABLES LIKE 'parallel_query_switch';-- 强制并行(跳过所有阈值检查)SET parallel_query_switch = 'force=on';-- 关闭子查询下推(含相关子查询的查询不走并行)SET parallel_query_switch = 'subquery_pushdown=off';
优化器 Hints
使用限制
限制项 | 说明 |
Dynamic Range Scan | 只能在 Leader 上执行 |
Index Merge 扫描 | 不支持并行扫描 |
Rollup | 不包含 |
SELECT 列表中相关子查询 | 不支持(如 SELECT 列表中引用了外层查询的列) |
非 Parallel Safe 表达式 | 如用户变量 @a:=1 等不支持并行 |
并行生效条件
并行计划生成需同时满足以下条件(或设置
parallel_query_switch='force=on' 跳过所有检查):1.
max_parallel_degree > 02. 查询的串行执行代价 >
parallel_plan_cost_threshold常见问题
EXPLAIN 显示没有走并行?
1. 检查
max_parallel_degree 是否为02. 使用 Optimizer Trace 查看具体原因:
plan_cost_less_than_threshold → 降低 parallel_plan_cost_threshold3. 快速验证:
SET parallel_query_switch='force=on' 强制并行并行查询比串行还慢?
小数据量:并行启动和线程间通信的开销超过并行收益。调高阈值避免小查询走并行
Worker 过多:线程上下文切换开销过大。适当降低
max_parallel_degree不必要的排序:检查是否因为 Gather 的 Merge Sort 引入了额外排序开销
如何为特定 SQL 固定使用并行?
无需修改应用代码,通过 Statement Outline 绑定 Hint:
CALL dbms_admin.statement_outline_add_rule('rule_name','SELECT /*+ PARALLEL(4) */ SUM(c), b FROM t1 GROUP BY b');
分区表应该选哪种扫描方式?
分区数 ≥ 期望并行度:使用
PARALLEL(PARTITION) 按分区扫描分区数少或非分区表:使用
PARALLEL(DYNAMIC_RANGE) 按范围扫描不确定时:不指定扫描方式,让优化器自动选择
参考文档
理解执行计划
SQL 调优概述