Spark Analyzer简介

1Analyzer 的运行机制

Analyzer 由 5 个 Batches 组成 ,一个 Batch 包含 name , Strategy 和一个或多个 Rules 。Strategy 定义了该 Batch 的最大运行次数,而 Rules 就组成了 Analyzer 的关键, Analyzer 对 LogicalPlan 分析的过程其实就是对 plan 逐个应用 Rule 的过程。

处理的流程为依次遍历 Batches ,使用每一个 Batch 处理 LocgicalPlan 。对于每一个 Batch ,依次使用该 Batch 中的 Rule 处理 plan ,处理完的 plan 作为该 Batch 中的下一条 Rule 处理的输入继续处理。经过该 Batch 所有 Rules 处理过的 plan 再重复上述的 Rule 处理过程,直至达到该 Batch 的最大运行次数或该 plan 不再变化为止。然后接着使用下一个 Batch 继续处理该 plan 。具体流程可参考以下流程图。

从上述流程可能得知, Analyzer 对 LocgicalPlan 分析的过程就是使用 Rule 对 LocgicalPlan 处理修改的过程,所以 HuaweiAnalyzer 对原生 Spark 的 Analyzer 进行了继承,并增加了一些 Batch 与 Rule ,从而形成了自己的 Analyzer 。如果要增加自己的 Rule 可以定义一个继承 Rule 的 Object ,并实现它的 apply 方法。在 apply 方法中实现对 LocgicalPlan 的修改,并将该 Rule 添加到 HuaweiAnalyzer 的某一条 Batch 中。

2 详细功能介绍

2.1 Substitution

Substitution 组共有 7 个规则组成。该组规则最大运行次数为 100 次。该 Batch 主要是做权限检查,对一些 Windows 表达式, Or 表达式, Not Exist/IN , Subquery 等进行转换,对 CTE 中的 tablename 进行分析等一些需要前期处理的 Rule 的实现。

2.1.1 SimpleAccessControlRules

该规则用作权限检查,会检查 SQL 语句的 SELECT, INSERT, UPDATE, DELETE, CREATE, OWNER, ADMIN 等权限。其中以下这些命令会在这里检查 DescribeCommand 、 CreateTableUsing 、 CreateTableUsingAsSelect 、 CreateTableAsSelect 、 CacheTableCommand 、 UncacheTableCommand 、 ClearCacheCommand 、 RefreshTable 、 DropTable 、 AnalyzeTable 、 AnalyzeTableCommand 、 AnalyzeTableColumnCommand 、 InsertIntoTable 、 ControlCommand 、 AddJar 、 AddFile 。

2.1.2 WindowsSubstitution

对 LogicalPlan 中的 Windows 表达式进行转换。对于 WithWindowDefinition 如果其 children 中包含 UnresolvedWindowExpression ,则转换为 WindowExpression 。

2.1.3 SubstituteOr

如果 Filter 中包含 Or 表达式,并且 Or 表达式中包含子查询,则进行转换,用 And 和 Union 来表达 Or 。如 Filter( condition=A or B) 转换为 Filter(condition=A) Union Filter(condition=B&!A) 。

2.1.4 NotExistsExpression

将 Not(ExistsExpression) 转换为 ExistsExpression(isExists = is) ,即将 Not 移除, exist 表达式中的 isExists 取反。将 Not(InExpression) 转换为 InExpression(notIn = newNotIn) ,即将 Not 移除, in 表达式中的 notIn 取反。

2.1.5 SubqueryExistsInSubstitution

如果 Filter 的 condition 中包含 Exist,In 或者 Subquery, 则将 Exist/In/Subquery 条件从 Filter 中移除,然后在 Filter 之上添加相应的 Exists/InSubquery/Filter LogicalPlan 。

2.1.6 SubqueryExpressionSubstitution

如果 Plan 中有 UnresolvedSubqueryExpression ,则将其替换为 RoughSubqueryReference 。

2.1.7 HWCTESubstitution

对于包含 With 这样子句的 Plan ,如果 table name 在 database 和 CTE definition 都存在,在分析 UnresolvedRelation 时,优先使用 CTE definition ,而不是 database 中的 table 。

2.2 Resolution

Resolution 组共有 36 个规则组成。该组规则最大运行次数为 100 次。该 Batch 主要用来解析 relation ,列名, function ,别名, condition 或 aggregation 等表达式中的 reference 等。还用于一些类型检查,如加、减、乘、除或其他表达式中的类型进行检查并做简单的类型转换。

2.2.1 RewriteWhereClause

对于 Where 子句中的 Exists , In , Subquery ,将其转换为 LeftSemiJoin 或者 LeftSemiNotExist 。

2.2.2 ResolveRelations

调用 catalog.lookupRelation 获取 SQL 中 table 和其别名的 relation 。生成 DataSourceRelation 或 MetastoreRelation 。

2.2.3 HWResolveReferences

对列名进行解析。对 project , aggregation 中出现 * 进行扩展,展开为其 child 的输出。

对 Join , Intersect , Sort , Generate , Rollup , Cube , GroupingSets 中的 Expression 进行解析,生成相应解析过的 Expression 。

2.2.4 HWResolveGroupingAnalytics

对 Cube , Rollup 转换为 GroupingSets ,而 GroupingSets 转换为 Aggregate 。

2.2.5 ResolveSortReferencesForFunction

如果 OrderBy 中有 Project 中没有的列,那么需要在原来的 Project 中添加这些列以完成 Sort 操作,同时在 Sort 之后增加一个 Project 移除这些列。

2.2.6 ResolveGenerate

对 Generate plan ,将 Generate 中的 OutputList 进行转换,转换为 AttributeReference 。对于 Project, 如果 projectList 中包含 AliasedGenerator ,则增加 Generate 作为 Project 的 child ,并替换 projectList 中相应的列为该 Generate 的 generatorOutput 。

2.2.7 ResolveFunctions

调用 registry.lookupFunction 将 UnresolvedFunction 转换为相对应的函数。

2.2.8 ResolveAliases

如果 Aggregate 、 GroupingAnalytics 、 Project 存在未分析过的别名,则进行转换,增加 Alias 表达式。

2.2.9 ExtractWindowExpressions

如果 Project 、 Aggregate 、 Filter->Aggregate 中包含 WindowExpression ,则对其进行转换。将表达式分为 WindowExpression 和非 WindowExpression ,根据 WindowExpression 中的 WindowSpecDefinition ,创建 Winddow 操作,插入到执行计划树中。

2.2.10 GlobalAggregates

如 Project 中包含 AggregateExpression ,则将其转换为 Aggregate 。

2.2.11 ResolveHaving

对于包含 Having 语句的 Filter ,使用 Having 语句的 NamedExpression 替换掉 Filter 中对应的 condition 。

2.2.12 ResolveAggregateFunctions

如果 Filter 或 Sort 的 child 为 Aggregate ,而且包含 Aggregate 中不存在的 NamedExpression ,则将这些 NamedExpression 下压到 Aggregate 中。

2.2.13 UnrelatedSubqueryConversion

将非相关子查询 RoughSubqueryReference 替换为 UnrelatedSubqueryLiteral 。

2.2.14 PropagateTypes

该 Rule 属于 HiveTypeCoercion 的一部分。转换节点表达式中的 AttributeReference ,如果该 AttributeReference 在其 inputSet 中,则使用该 inputSet 中的表达式。

2.2.15 InConversion

该 Rule 属于 HiveTypeCoercion 的一部分。如果 In 表达式 ( a in b) 中 a,b 中有数据类型不一致,将 b 的数据类型强制 cast 为 a 的数据类型。

2.2.16 WidenSetOperationTypes

该 Rule 属于 HiveTypeCoercion 的一部分。当 Plan 为 SetOperation 时,如果 left,right 两个子 Plan 输出出现数据类型不一致时,则尝试将类型转换为再者都兼容的类型。

2.2.17 PromoteStrings

该 Rule 属于 HiveTypeCoercion 的一部分。对包含 String 类型的二元表达式,如 BinaryArithmetic , Equality , BinaryComparison , In , Sum , SumDistinct , Average 对其中一侧进行转换。

2.2.18 DecimalPrecision

该 Rule 属于 HiveTypeCoercion 的一部分。对 Decimal 的加减乘除、 Pmod 操作进行溢出检查。对 BinaryComparison 、 BinaryOperator 二元操作的 left,right 进行类型检查,如果 Decimal 的精度不一样则进行 cast 操作对对齐精度。

2.2.19 DecimalLimiter

该 Rule 属于 HiveTypeCoercion 的一部分。如果表达式是 cast to Decimal ,则检查转换后 Decimal 的 PRECISION 与 SCALE 是否超过最大支持的 PRECISION 与 SCALE ,如果超过,则使用最大支持的 PRECISION 与 SCALE 。

2.2.20 BooleanEquality

该 Rule 属于 HiveTypeCoercion 的一部分。如果表达式为 bool 与 NumericType 类型的 Literal 进行相等判断,则根据 Literal 将表达式直接转换为判断 bool 为真或为假。如果表达式为 bool 与 NumericType 比较,则转换为 CaseKeyWhen 。

2.2.21 StringToIntegralCasts

该 Rule 属于 HiveTypeCoercion 的一部分。如果表达式为 Cast(e @ StringType(), t: IntegralType), 则将其他转换为 Cast(Cast(e, DecimalType.forType(LongType)), t) 。

2.2.22 FunctionArgumentConversion

该 Rule 属于 HiveTypeCoercion 的一部分。对一些函数的参数进行检查,如 CreateArray 、 Sum 、 SumDistinct 、 Average 、 Coalesce 、 NaNvl 。

2.2.23 CaseWhenCoercion

该 Rule 属于 HiveTypeCoercion 的一部分。对 CaseWhenLike 、 CaseKeyWhen 表达式中的 valueTypes 、 key 、 whenlist 中的数据类型进行转换。DecimalType , IntegralType 对齐为相同精度的 DecimalType 类型, FractionalType 与 DecimalType 对齐为 DoubleType 类型。

2.2.24 IfCoercion

该 Rule 属于 HiveTypeCoercion 的一部分。对 If 表达式中,如果 left 、 right 的数据类型不一致,则进行转换。如果其中一边是 StringType ,另一边为非 BinaryType 、 BooleanType 的 AtomicType ,则将 AtomicType 转换为 StringType 。

2.2.25 Division

该 Rule 属于 HiveTypeCoercion 的一部分。如果 Divide 的分子分母类型不一致,但都为 NumericType ,则将它们转换为 DoubleType 。

2.2.26 PropagateTypes

该 Rule 属于 HiveTypeCoercion 的一部分。对于 AttributeReference 表达式,如果其数据类型与 inputSet 中相同 exprId 的表达式数据类型不一致时,使用 inputSet 中的表达式。

2.2.27 ImplicitTypeCasts

该 Rule 属于 HiveTypeCoercion 的一部分。对于 BinaryOperator ,如果 left 、 right 表达式的数据类型不一致,则尝试转换为二者兼容的类型;对于 ImplicitCastInputTypes ,如果 children 与自身的 inputTypes 不一致时,尝试将 children 的输出转换为 inputTypes ;对于 ExpectsInputTypes ,如果 children 与自身的 inputTypes 不一致且 children 的数据类型为 NullType 时,生成含 Null 的 Literal 。

2.2.28 DateTimeOperations

该 Rule 属于 HiveTypeCoercion 的一部分。如果出现对日期进行 Add 、 Subtract 操作时,将操作后的结果进行类型转换,转换为非 CalendarIntervalType 的另外一个参数的类型。

2.2.29 ParquetConversions

在对 Parquet 格式的表进行操作时,如 InsertIntoTable 、 InsertIntoHiveTable 、 MetastoreRelation ,如果表为非分区表,格式为 parquet ,则 relation 转换为 parquetRelation 。

2.2.30 CreateTables

对于 CreateTableAsSelect 的 LogicalPlan ,根据该 table 的 schema 或者从 subquery 中生成 schema ,生成 CreateTableUsingAsSelect 或者 execution.CreateTableAsSelect 。

2.2.31 PreInsertionCasts

对于 InsertIntoTable 中,如果 project 列与要 insert 的 table 列类型一致,则直接插入。如果不一致,则 project 生成的列进行类型转换,转换为要 insert 的 table 列类型。

2.2.32 WriteToDirs

对于 WriteToDirectory ,如果 tableDesc 包含 "columns.types" ,则对 "columns" 和 "columns.types" 进行修改。对于 WriteToDirectory(path, child, isLocal, tableDesc) 则转换为 execution.WriteToDirectory(path, hive.executePlan(child).executedPlan, isLocal, tableDesc) 。

2.2.33 ExtractPythonUDFs

对于 plan 中 resolved 的 PythonUDF ,如果该 udf 的 reference 是某一个 child’s output 的子集,则在该 child 中增加一层 EvaluatePython 做为该 plan 的 child ,最后在该 plan 上增加一层 Project 。

2.2.34 ResolveHiveWindowFunction

对于 WindowExpression ,将其中的 UnresolvedWindowFunction 进行解析,生成对应的 HiveWindowFunction ,从而生成解析后的 WindowExpression 。

2.2.35 PreInsertCastAndRename

对于 InsertIntoTable ,如果 child 的输出列的类型与名字与实际列类型与名字不一致时,增加 cast 和 alias 进行转换。

2.3 Resolution related subquery

Resolution related subquery 组共有 29 个规则组成。该组规则最大运行次数为 100 次。该 Batch 主要进行相关子查询的解析。大部分与 2.2 中的 Rule 一样。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20200429A06ZSX00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券