专栏首页大数据学习与分享Spark SQL中Not in Subquery为何低效以及如何规避

Spark SQL中Not in Subquery为何低效以及如何规避

首先看个Not in Subquery的SQL:

// test_partition1 和 test_partition2为Hive外部分区表
select * from test_partition1 t1 where t1.id not in (select id from test_partition2);

对应的完整的逻辑计划和物理计划为:

== Parsed Logical Plan ==
'Project [*]
+- 'Filter NOT 't1.id IN (list#3 [])
   :  +- 'Project ['id]
   :     +- 'UnresolvedRelation `test_partition2`
   +- 'SubqueryAlias `t1`
      +- 'UnresolvedRelation `test_partition1`

== Analyzed Logical Plan ==
id: string, name: string, dt: string
Project [id#4, name#5, dt#6]
+- Filter NOT id#4 IN (list#3 [])
   :  +- Project [id#7]
   :     +- SubqueryAlias `default`.`test_partition2`
   :        +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
   +- SubqueryAlias `t1`
      +- SubqueryAlias `default`.`test_partition1`
         +- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]

== Optimized Logical Plan ==
Join LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7)))
:- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
+- Project [id#7]
   +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7)))
:- Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
+- BroadcastExchange IdentityBroadcastMode
   +- Scan hive default.test_partition2 [id#7], HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]

通过上述逻辑计划和物理计划可以看出,Spark SQL在对not in subquery处理,从逻辑计划转换为物理计划时,会最终选择BroadcastNestedLoopJoin(对应到Spark源码中BroadcastNestedLoopJoinExec.scala)策略。

提起BroadcastNestedLoopJoin,不得不提Nested Loop Join,它在很多RDBMS中得到应用,比如mysql。它的工作方式是循环从一张表(outer table)中读取数据,然后访问另一张表(inner table,通常有索引),将outer表中的每一条数据与inner表中的数据进行join,类似一个嵌套的循环并且在循环的过程中进行数据的比对校验是否满足一定条件。

对于被连接的数据集较小的情况下,Nested Loop Join是个较好的选择。但是当数据集非常大时,从它的执行原理可知,效率会很低甚至可能影响整个服务的稳定性。

而Spark SQL中的BroadcastNestedLoopJoin就类似于Nested Loop Join,只不过加上了广播表(build table)而已。

BroadcastNestedLoopJoin是一个低效的物理执行计划,内部实现将子查询(select id from test_partition2)进行广播,然后test_partition1每一条记录通过loop遍历广播的数据去匹配是否满足一定条件。

private def leftExistenceJoin(
   // 广播的数据
    relation: Broadcast[Array[InternalRow]],
    exists: Boolean): RDD[InternalRow] = {
  assert(buildSide == BuildRight)
  
/* streamed对应物理计划中:
  Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6] 
 */
  streamed.execute().mapPartitionsInternal { streamedIter =>
    val buildRows = relation.value
    val joinedRow = new JoinedRow

   // 条件是否定义。此处为Some(((id#4 = id#7) || isnull((id#4 = id#7))))
    if (condition.isDefined) {
      streamedIter.filter(l => 
        // exists主要是为了根据joinType来进一步条件判断数据的返回与否,此处joinType为LeftAnti
        buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
      )
      
      // else
    } else if (buildRows.nonEmpty == exists) {
      streamedIter
    } else {
      Iterator.empty
    }
  }
}

由于BroadcastNestedLoopJoin的低效率执行,可能导致长时间占用executor资源,影响集群性能。同时,因为子查询的结果集要进行广播,如果数据量特别大,对driver端也是一个严峻的考验,极有可能带来OOM的风险。因此,在实际生产中,要尽可能利用其他效率相对高的SQL来避免使用Not in Subquery。

虽然通过改写Not in Subquery的SQL,进行低效率的SQL到高效率的SQL过渡,能够避免上面所说的问题。但是这往往建立在我们发现任务执行慢甚至失败,然后排查任务中的SQL,发现"问题"SQL的前提下。那么如何在任务执行前,就"检查"出这样的SQL,从而进行提前预警呢?

这里笔者给出一个思路,就是解析Spark SQL计划,根据Spark SQL的join策略匹配条件等,来判断任务中是否使用了低效的Not in Subquery进行预警,然后通知业务方进行修改。同时,我们在实际完成数据的ETL处理等分析时,也要事前避免类似的低性能SQL。

本文分享自微信公众号 - 大数据学习与分享(bigdatalearnshare),作者:fjs

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-06-04

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 经典的SparkSQL/Hive-SQL/MySQL面试-练习题

    32.查询每门课程的平均成绩,结果按平均成绩降序排列,平均成绩相同时,按课程编号升序排列

    大数据学习与分享
  • Spark SQL解析查询parquet格式Hive表获取分区字段和查询条件

    sparksql处理Hive表数据时,判断加载的是否是分区表,以及分区表的字段有哪些?再进一步限制查询分区表必须指定分区?

    大数据学习与分享
  • 如何为Kafka集群确定合适的分区数以及分区数过多带来的弊端

    通过之前的文章《Kafka分区分配策略》和《Kafka高性能揭秘》,我们了解到:Kafka高吞吐量的原因之一就是通过partition将topic中的消息保存到...

    大数据学习与分享
  • 【MyBatis-3】MyBatis xml映射文件详解

    查询语句是 MyBatis 中最常用的元素之一,光能把数据存到数据库中价值并不大,只有还能重新取出来才有用,多数应用也都是查询比修改要频繁。对每个插入、更新或删...

    云深i不知处
  • PHP的多维数组排序

    熟悉PHP的小伙伴都知道有很多内置函数可以对数组进行排序操作或者自定义一些排序方法(冒泡)等等。

    用户2475223
  • MySQL实现批量Insert和分页查询

    DROP PROCEDURE IF EXISTS test_insert;--如果存在此存储过程则删掉

    浩Coding
  • python list 参数传递方式

    上面方法的定义方式跟下面是一样的,相当于是参数引用的是一个全局变量,所以它的值一直在累加

    py3study
  • MySQL replace用法简介

    今天在工作的过程中碰到一个问题,要把数据库中某个列的所有值中含有"ceshi.test.com"的字符去掉,本来可以写个脚本,把所有的值都取出再导入进行处理,但...

    MySQL轻松学
  • id和instancetype的区别

    在开发当中,id和instancetype都是我们常见的类型,那么这两者有什么异同点呢?

    拉维
  • 一次夜维SQL的性能优化

    最近单位搬家,从国家会议中心,搬往空气清新的顺义后沙峪,搬迁之前的完结上线中,碰见了一些棘手的问题,有一些值得借鉴的地方。

    bisal

扫码关注云+社区

领取腾讯云代金券