首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >HashJoin性能优化: RuntimeFilter

HashJoin性能优化: RuntimeFilter

原创
作者头像
awakeljw
修改2022-10-27 10:57:30
1.3K0
修改2022-10-27 10:57:30
举报

1.什么是RuntimeFilter

HashJoin是关联查询中最重要的算子,对于计算密集型应用,关联查询的性能瓶颈主要在于HashJoin算子Probe阶段的Hash查找和Data Shuffle上。RuntimeFilter是用于运行时优化HashJoin性能的一种常见方法,RuntimeFilter对于INNER JOIN, Right Join, Semi Join等都有显著的性能提升效果。目前RuntimeFilter技术已经在很多数据库中得以应用,比如SnowFlake(BloomJoins), Impala,EMR Spark,Apache doris,Starrocks,PolarDB-X等。

分析型数据库中星型模型是常见的建模方法。比较有代表性的测试集就是SSB(Star Schema Benchmark)。星型模型主要分为事实表和维度表,事实表一般是大表,比如SSB测试集中的lineorder表,维度表一般为小表,比如SSB测试集中的customer,date等。这里的大表小表准确来说是指表的Distinct记录数。RuntimeFilter对于这类星型的数据模型下的复杂查询有非常大的提升作用。

HashJoin实现上是通过内表(一般为较小的表)构建Hash Table,然后遍历外表(一般为大表)数据查找Hash Table,根据不同的Join类型输出匹配的结果。HashJoin中Hash table probe算子一般是最为耗时的过程,另外一个耗时的过程就是数据的Shuffle过程,一般它们也就是性能的瓶颈点。那么对HashJoin性能优化最朴素的思想是减少probe遍历的数据量或者减少数据移动的大小从而提升性能。RuntimeFilter的原理正是将probe操作push down到外表的Scan算子,使用更快的非精确查找算法(MINMAX,BloomFilter)或者更快的精确查找算法(HashSet)来提前过滤数据,从而提升整个查询的性能。RuntimeFilter与SemiJoin相比,不同在于RuntimeFilter需要下推到Scan,实现更少的数据移动。类似的优化思路比如mysql中的pickup join,通过内表过滤后的结果集,通过索引计算左表的值,如此便不需要计算hash,这对于内表非常小的场景效果最佳。

2. 如何生成RuntimeFilter

RuntimeFilter是在优化器的CBO阶段之后插入物理计划中的。具体来说,首先需要从根节点遍历整个查询计划树,找到HashJoinNode节点,然后找到该HashJoinNode的等值表达式,将Join右孩子节点的条件下推到左孩子节点是Scan的节点上。举例来说对于下面的Join查询,首先生成如下的物理计划,遍历第一个Join节点,将 t3.a加入到RuntimeFilters中,编号为RuntimeFilterId=0,然后继续遍历左子树,遇到Join节点,将t2.a加入到RuntimeFilters中,编号为RuntimeFilterId=1。然后遇到ScanNode节点时,将RuntimeFilters上的所有RuntimeFilter下推到Scan t1节点上。当然,在论文https://dl.acm.org/doi/pdf/10.1145/3318464.3389769也提到将RuntimeFilter纳入到CBO的代价估算中可能获得更优的执行计划。

图1:生成带有RuntimeFilter的物理计划
图1:生成带有RuntimeFilter的物理计划

2.1 RuntimeFilter不能下推的情况

并非所有的RuntimeFilter都可以下推,比如对于下面的查询

select count(*) from store t1 left outer join store t2 on t1.s_store_sk = t2.s_store_sk where coalesce(t2.s_store_sk + 100, 100) in (select ifnull(100, s_store_sk) from store);

按照上面的思路生成的查询计划是这样的

Query plan:
    |   4:HASH JOIN
    |   |  join op: LEFT SEMI JOIN (BROADCAST)
    |   |  equal join conjunct: coalesce(`t2`.`s_store_sk` + 100, 100) = ifnull(100, `s_store_sk`)
    |   |  runtime filters: RF000[in] <- ifnull(100, `s_store_sk`)
    |   |----7:EXCHANGE
    |   3:HASH JOIN
    |   |  join op: LEFT OUTER JOIN
    |   |  equal join conjunct: `t1`.`s_store_sk` = `t2`.`s_store_sk`
    |   |----1:OlapScanNode
    |   |       TABLE: store
    |   |       runtime filters: RF000[in] -> coalesce(`t2`.`s_store_sk` + 100, 100)
    |   0:OlapScanNode
    |      TABLE: store
  1. 假设store中有N条记录,并且为非零。
  2. 生成RF000由于ifnull函数的特点,所有的RF000中是一个100
  3. 将RF000下推到coalesce中,只有当s_store_sk为null时,coalesce的结果才是100,所以plannode1的结果是null,但由于LEFT OUTER JOIN的特点,如果内表不存在则需要补NULL,所以PlanNode3的结果为N个NULL
  4. 在LEFT SEMI JOIN中由于coalesce计算结果都是NULL,所以函数结果都是100,而ifnull的结果也是100,所以最后count(*)的结果是N。

但实际上正确的结果应该是0.所以在以上这种情况下,RuntimeFilter就不能下推。

除此之外,RuntimeFilter对于Left Outer Join,Anti Join,Full Outer Join等也不适用,这是因为外连接没有找到对应的数据时需要补NULL,而不能直接过滤掉。目前关于RuntimeFilter的限制可以参考https://doris.apache.org/docs/advanced/join-optimization/runtime-filter/

2.2 BloomFilterBits

BloomFilter 中衡量 BloomFilter 的误判率称为 false positives。 false positives 可以通过以下公式计算得到 \epsilon = (1-(1-1/m)^{kn})^{k} \approx(1-e^{-kn/m})^{k}

所以对于给定的误判率,我们可以估算 BloomFilter 位数组的大小,m 即 BloomFilter 的位数组大小。 m=\frac {-kn}{ln(1-\epsilon^{1/k})}

k表示BloomFilter中hash函数的个数。这里非常重要的一点就是n,需要通过估算 number distinct value 值得到。

所以精确的统计信息有助于更高效的实现BloomFilter。

2.3 Filter顺序对性能的影响

多个Filter在实现上有两种方式。

一种是短路计算,short-circuit 的优势在于逐步减少向下传递的数据量,所以过滤效果越好的过滤条件需要提前。所以依赖于优化器利用CBO提前准备好过滤顺序。这种方法在过滤效果较好时性能更好。

另一种是避免利用Filter的提前过滤,将多个Filter的计算移动到最后的AND或OR中,充分利用位运算加速最后的与或运算。这种方式稳定性较好,不受过滤效果的影响。

动态调整RuntimeFilter的顺序对提高查询性能会有一定的帮助。

3.Runtime是怎样执行的

3.1 RuntimeFilter的构建

RuntimeFilter一般是在HashJoin构建HashTable的时候构建出来的,主流的RuntimeFilterType有三种,In,BloomFilter,MinMax。在我们的实现中,可以根据HashTable读取的数据多少来决定使用哪种Type。一般来说In更适合数据量较少的场景。BloomFilter适合数据量较多的场景,MinMax通常可以与BloomFilter一起使用,在实现上统一抽象为一个RuntimeFilter。

分布式数据库中,HashJoin最常使用的有两种分布方式,BoardCastJoin和RedistirbuteJoin。如下图3中的Join(BC)代表的是BoardCastJoin,表示将小表广播到所有的节点。Join(Re)代表的是RedistributeJoin,表示根据JoinKey对大小表数据进行重分布。从实现上来看,BoardCastJoin更适合小表比较少的场景下,在每个Backend上,BoardCastJoin得到的RuntimeFilter都是完整的,可以直接下推到外表。而RedistributeJoin得到的RuntimeFilter需要先Merge后再Shuffle后,才能下推到外表。RedistributeJoin由于右表构建的RuntimeFilter是不完整的,如果将不完整的RuntimeFilter下推到Scan层,则有可能会漏掉部分数据,所以需要将所有RuntimeFilter合并后才可以下推到外表。多个RuntimeFilter可以通过And的逻辑运算同时过滤数据。

3.2 RuntimeFilter的关键数据结构

RuntimeFilterBuilder: 用于构建RuntimeFilter并更新RuntimeFilterMgr中对应的RuntimeFilterMerger中的RuntimeFilter。

RuntimeFilterMgr:Backend上全局唯一的数据结构,维护所有queryid到RuntimeFilterMerger的映射关系

RuntimeFilterMerger:维护当前执行query的所有RuntimeFilter结构,每个FilterId对应唯一的RuntimeFilterMergerEntity,而RuntimeFilterKey唯一对应FilterId, 多个RuntimeFilterKey可以对应同一个FilterId。FilterId是在优化器生成计划时生成的。

假设有两张表,每张表有两个col,t1(col1, col2), t2(col1, col2)。以查询select count(*) from t1 inner join t2 on t1.col1 = t2.col1 and t1.col1 = t2.col2为例,此时有两个RuntimeFilterKey , FilterId, RuntimeFilter的对应关系则如下所示。

图2:RuntimeFilter关键数据结构
图2:RuntimeFilter关键数据结构

3.3 RuntimeFilter的执行流程

  1. 在Pipeline执行框架中分为prepare和execute两个阶段,Prepare阶段会进行整个pipeline的构建,在Prepare阶段根据已经生成的执行计划构建RuntimeKey,FilterId,RuntimeFilterMerger所有的关键数据结构。
  2. 在HashJoin build内表时通过RuntimeFilterBuilder构建RuntimeFilter,在Build结束时将RuntimeFilter加入对应的RuntimeFilterMergerEntity中。如果RuntimeFilter需要Merge,则每个Backend将当前构建的RuntimeFilter发送到Merge节点,Merge节点接收到所有的RuntimeFilter合并后shuffle到指定的Backend节点.Backend节点接收到最终的RuntimeFilter后将自身标记为Ready。RuntimeFilter的merge以及dispatch过程都是异步执行的,不会阻塞整个Pipeline的执行。
  3. 在Pipeline执行时,Filter operator中的checkRuntimeFilter函数会反复检测是否当前key对应的RuntimeFilter是Ready的,如果是Ready的则获取后通过RuntimeFilter对每一个需要Filter的Column执行过滤操作,如果没有Ready则不过滤数据。
  4. Probe结束后将所有的RuntimeFilter回收资源,清理RuntimeFilterMgr中当前queryId对应的内存资源。

图3:RuntimeFilter执行流程图
图3:RuntimeFilter执行流程图

4. RuntimeFilter有哪些实现方式

在分布式数据库的并行执行框架中有两种方式,一种是算子间并行,一种是算子内并行。Pipeline是实现算子间并行的最好的方式,Pipeline在很多领域都有提及,比如CPU的pipeline流水线。Pipeline的核心在于调度,每个算子只做一件事,但不同算子可以并行执行,在一个Pipeline内部不必等待上一个算子完全执行完毕。算子内并行是将算子逻辑拆分为多个子算子,子算子执行同样的工作但处理不同的数据。在MPP执行引擎的并行执行框架中两者是同时存在的,从而实现最大的并行度,获得最大的性能收益。

在ClickHouse的Pipeline的实现中,Scan算子要处理的数据块Granule是在Pipelien生成阶段确定的。而RuntimeFilter是在PipeLine执行时才可以确定,所以在Pipeline上实现RuntimeFilter的简单的 方式是通过插入Filter operator(Function)算子的方式,具体实现的是通过Internal Function函数实现的。Internal Function在执行时会通过RuntimeFilterMgr获取对应的RuntimeFilter,如果可以拿到则使用RuntimeFilter来过滤数据,如果没有则直接返回当前数据,由于runtimefilter一般执行足够快,一般在10ms-100ms内,所以这里不会成为瓶颈。

实际上ClickHouse中可以通过主键过滤和PreWhere来提前过滤数据,从而减少IO,这就需要等待RuntimeFilter生成。当RuntimeFilter对应的key是主键索引或者二级索引时,等待RuntimeFilter可以获得更优的性能。

Pipeline中实现的RuntimeFilter具有以下优势:

  1. RuntimeFilter for pipeline 有更少的RPC,一方面这是因为Pipeline中内表只有一个Instance执行,与算子内并行方案相比不需要向每个Fragment Instance发送RuntimeFilter,对于一个RuntimeFilter只需要一次(Backend number -1)次RPC,而像其它大多数方案需要(Backend_number - 1)* Parallel_degree 次RPC。
  2. 不同的RuntimeFilter之间完全独立,通过FilterId 隔离,更少的加锁阻塞,多个RuntimeFilter Consumer可以消费同一个Runtime Filter Producer。
  3. 在算子内并行时RuntimeFilter在Scan时一般需要等待RuntimeFilter生成,在一些并发较大的场景下有可能阻塞查询执行,Pipeline中RuntimeFilter是不会阻塞整个Pipeline执行的,一旦Ready,则立刻生效。在具有主键索引或者二级索引的情况下,也需要等待RuntimeFilter的生成,以获得更好的性能。
图4: 算子内并行与算子间并行
图4: 算子内并行与算子间并行

5. RuntimeFilter为什么能提升性能

  1. RuntimeFilter将Join Porbe phase时的计算下推到Scan层,一方面更快的模糊过滤方法使得Probe处理的数据量显著减少,另一方面对于外表需要Shuffle时减少了执行的network overhead。
  2. RuntimeFilter下推的Key如果是主键时,在Scan时可以进一步裁减数据,减少IO

但是RuntimeFilter也是双刃剑,

  1. RuntimeFilter如果没有过滤效果或者过滤效果非常差时,显然对于整个计划来说是一种冗余,此时RuntimeFilter对性能来说并不会有提升的效果。

     目前在Pipeline上实现的效果如下,下图是 RuntimeFilter开启前后SSB Join 100GB测试集的性能对比结果。

图4: RuntimeFilter优化前后性能对比
图4: RuntimeFilter优化前后性能对比

6. RuntimeFilter的进一步优化

  1. ClickHouse的Prewhere优化和主键索引可以减少扫描的数据量从而提升scan性能,如何更好的利用prewhere的优化,可以进一步IO性能从而优化系统性能
  2. 更精确的统计数据帮助生成精确的RuntimeFilter信息,防止RuntimeFilter对性能的反向优化。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.什么是RuntimeFilter
  • 2. 如何生成RuntimeFilter
    • 2.1 RuntimeFilter不能下推的情况
      • 2.2 BloomFilterBits
        • 2.3 Filter顺序对性能的影响
        • 3.Runtime是怎样执行的
          • 3.1 RuntimeFilter的构建
            • 3.2 RuntimeFilter的关键数据结构
              • 3.3 RuntimeFilter的执行流程
              • 4. RuntimeFilter有哪些实现方式
              • 5. RuntimeFilter为什么能提升性能
              • 6. RuntimeFilter的进一步优化
              相关产品与服务
              数据库
              云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档