专栏首页Spark学习技巧SparkSql的优化器-Catalyst

SparkSql的优化器-Catalyst

一,概述

为了实现Spark SQL,基于Scala中的函数编程结构设计了一个新的可扩展优化器Catalyst。Catalyst可扩展的设计有两个目的。

首先,希望能够轻松地向Spark SQL添加新的优化技术和功能,特别是为了解决大数据(例如,半结构化数据和高级分析)所遇到的各种问题。第二,我们希望使外部开发人员能够扩展优化器 - 例如,通过添加可将过滤或聚合推送到外部存储系统的数据源特定规则,或支持新的数据类型。Catalyst支持基于规则(rule-based)和基于成本(cost-based)的优化。

其核心是Catalyst包含一个用于表示树并应用规则来操纵它们的通用库。在框架的顶层,构建了特定针对关系查询处理(例如,表达式(expressions),逻辑查询计划(logical query plans))的库,也有一系列的处理不同层次查询执行的规则。总共有四个层次:analysis(语法分析),logical optimization(逻辑优化),physical planning(物理计划),和代码生成,即将请求查询编译成java字节码。对于后者,我们使用另一个scala特性,quasiquotes,使得在运行的过程中从组合表达式产生代码更简单。最后,Catalyst提供一些公共扩展点,包括外部数据源和用户自定义类型。

二,语法树

Catalyst 的主要数据类型就是有节点对象组成的树。每个node都有一个node类型和零个或者多个子节点。Scala中新定义的node类型是TreeNode类的子类。这些对象都是不可改变的,可以使用函数转换来操作。

举一个简单的例子,针对一个非常简单的expression我们总共有下面三种node类型:

A),Literal(value: Int): 一个常量

B),Attribute(name: String):输入行的一个列属性,例如:“x”

C),Add(left: TreeNode, right: TreeNode):两个expressions求加

这些类可以用来构建一棵树。举例,x+(1+2),这个表达式,在scala代码中就如下:

Add(Attribute(x), Add(Literal(1), Literal(2)))

三,规则

可以使用规则来操纵树,这些规则是从一颗树到另一棵树的转换函数。虽然一个规则可以在其输入树上运行任意代码(给定这个树只是一个Scala对象),但最常见的方法是使用一组模式匹配函数来查找和替换子树为特定结构。模式匹配是许多函数编程语言的特征,允许从代数数据类型的潜在嵌套结构中提取值。在Catalyst中,语法树提供了一种转换方法,可以在树的所有节点上递归地应用模式匹配函数,将匹配到的节点转换为特定结果。例如,我们可以实现一个在常量之间进行Add操作的规则,如下所示:

tree.transform {
 case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
}

将这个规则应用到x+(1+2)这棵语法树上,就会产生一棵新的树,x+3。Case关键词是scala的标准模式匹配的语法,可以用来匹配一个节点类型,同时将名字和抽取到的值对应。(就是c1和c2)。

模式匹配的表达式是部分函数,这也意味着只需要匹配到输入语法树的子集。Catalyst将测试给定规则适用的树的哪些部分,自动跳过不匹配的子树。这种能力意味着规则只需要对给定优化适用的树进行推理,而不是那些不匹配的树。结果就是,新的操作类型加入到系统时规则无需修改。

规则(和Scala模式匹配一般)可以匹配相同转换调用中的多个模式,使其非常简洁,可以一次实现多个转换:

tree.transform {
 case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
 case Add(left, Literal(0)) => left
 case Add(Literal(0), right) => right
}

为了完全转换一棵树规则往往需要执行多次。Catalyst会将规则分组,在达到稳定点之前会一直执行当前组的规则,fixed point的意思也就是在使用当前组的规则树不会再变化了。将规则运行到fixed point意味着每个规则可以简单的,但仍然最终对树有更大的全局影响。在上面的例子中,重复应用规则会使较大的树(例如(x + 0)+(3 + 3))达到一个稳定的状态。另一个例子,第一批可以分析表达式以将类型分配给所有属性,而第二批可能使用这些类型来执行常量折叠(合并)。每个批次后,开发人员还可以在新树上进行合理检查(例如,看看是否所有属性都是分配类型了),通常也通过递归匹配来编写。

最后,规则条件及其本身可以包含任意的Scala代码。这使得Catalyst比优化器的域特定语言更强大,同时保持简洁的简单规则。

在经验中,对不变树的功能转换使得整个优化器非常容易推理和调试。它们还可以在优化器中实现并行化,尽管目前还没有开发它。

四,在Sparksql中使用Catalyst

在四个层面,可以使用Catalyst通用树的转换框架,如下:

(1),分析一个逻辑计划,解析引用,也即unresolved logical plan转化为logical plan。

(2),逻辑计划优化。

(3),物理计划。

(4),代码生成。将query转化为java字节码。

在物理计划层,Catalyst也许会产生多个物理计划,然后根据cost进行选择。其它,层都是单纯的基于规则的优化。每个层使用不同的树节点类型。Catalyst 拥有的节点库包括表达式(expressions),数据类型(data types),逻辑和物理操作(logical and physical operators)。下面开始详细介绍每个层次。

1,语法解析-Analysis

SparkSql开始relation计算,既不是从一个SQL parser生成的抽象语法树,也不是从DataFrame对象。两种情况下,relation都有可能存在未被解析的属性引用或者relations:例如,在SQL查询SELECT col FROM sales,col的类型,甚至是否是有效的列名称,直到我们查找sales表前都是不知道的。如果我们不知道它的类型或者没有将它与输入表(或者别名)匹配,那么这个属性称为未解析。Spark SQL使用Catalyst规则和Catalog对象来跟踪所有数据源中的表以解析这些属性。它首先构建一个具有未绑定属性和数据类型的树(unresolved logical plan),然后应用执行以下操作的规则:

1),通过name从catalog中查找relations。

2),将命名的属性(如“col”)映射到给定操作符的子节点的输入中。(Mapping named attributes, such as col, to the input provided given operator’s children.)

3),确定哪些属性引用相同的值,以便给它们一个唯一的ID(稍后允许对表达式进行优化(如 col = col)

4),在expressions中传播和强制类型:例如,我们不能知道1 + col的返回类型,直到我们解析col并且可能将其子表达式转换为兼容类型。(Propagating and coercing types through expressions: for example, we cannot know the return type of 1 + col until we have resolved col and possibly casted its subexpressions to a compatible types.)

Analyzer的规则仅仅共1000行代码。

2,逻辑优化-Logical Optimizations

逻辑优化层为逻辑执行计划提供了标准的基于规则的优化。(基于cost的优化是产生多个逻辑计划,然后计算他们的cost。)这些基于规则的优化包括常量合并,谓词下推,列裁剪,null propagation,boolean表达式简化,和其它的规则。一般来说,我们发现为各种情况添加规则非常简单。比如,我们想为SparkSql增加一个固定精度的DECIMAL类型,我们想优化聚合规则,比如sum 和average均值。它只需要12行代码来编写一个在SUM和AVG表达式中找到这样的小数的规则,并将它们转换为未缩放的64位长整型,然后将聚合后的结果类型转换回来。求和的表达式实现如下:

object DecimalAggregates extends Rule[LogicalPlan] {
 /** Maximum number of decimal digits in a Long */
 val MAX_LONG_DIGITS = 18
 def apply(plan: LogicalPlan): LogicalPlan = {
    plan transformAllExpressions {
 case Sum(e @ DecimalType.Expression(prec, scale))
 if prec + 10 <= MAX_LONG_DIGITS =>
        MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale) }
  }

另一个简单点的例子,一个仅仅12行代码优化LIKE表达式的规则,使用简单的正则表达式,如String.startWith或者String.contains。在规则中使用任意Scala代码的自由使得这些优化,超越了模式匹配子树的结构,容易表达。

Logical优化总共使用了800行代码。

3,物理计划-Physical Planning

在物理计划层,SparkSql会获取一个逻辑计划,用物理操作算子产生一个或者多个物理计划。然后用cost模型选择一个物理计划。目前基于cost-based的优化仅仅用于选择join算法:对已知的很小的relations,sparksql会选择使用spark的提供的点对点的广播功能实现Broadcast join。该框架支持更广泛地使用cost-based的优化,然而,由于可以使用规则为整个树递归地估计成本。因此,我们打算在未来实现更加丰富的cost-based优化。

物理计划还可以执行基于规则的物理优化,比如将列裁剪和过滤操在一个Spark的Map算子中以pipeline方式执行。此外,它可以将逻辑计划的操作下推到支持谓词或projection 下推的数据源。

物理计划层总共实现代码仅500行。

4,代码生成-Code Generation

查询优化的最后阶段是生成Java字节码以在每台机器上运行。因为Spark SQL通常操作的是内存数据集,意味着处理是CPU-bound型的,因此我们希望支持代码生成以加快执行速度。尽管如此,代码生成引擎通常很难构建,实际上与编译器相当。Catalyst依靠Scala语言,名为quasiquotes,的特殊功能,使代码生成更简单。Quasiquotes允许用Scala语言编程构建抽象语法树(AST),然后可以在运行时将其提供给Scala编译器以生成字节码。我们使用Catalyst将表示SQL中的表达式的树转换为Scala代码的AST,以评估该表达式,然后编译并运行生成的代码。

一个简单的例子,结合Add,Attribute,Literal树节点,如上面提到的,就像(x+y)+1的表达式。没有代码生成,这些表达式必须通过走一个Add,Attribute和Literal节点的树来解释每行数据。这引入了大量的分支和虚拟函数调用,从而减慢了执行速度。使用代码生成,我们可以编写一个函数来将特定表达式树转换为Scala AST,如下所示:

def compile(node: Node): AST = node match {
 case Literal(value) => q"$value"
 case Attribute(name) => q"row.get($name)"
 case Add(left, right) => q"${compile(left)} + ${compile(right)}"
}

以q开头的字符串是quasiquote,这意味着尽管它们看起来像字符串,但它们在编译时由Scala编译器解析,代表了代码的AST。Quasiquotes可以将变量或其他AST引用到它们中,使用$符号开头。例如,Literal(1)将成为1的Scala AST,而Attribute(“x”)变为row.get(“x”)。最后,像Add(Literal(1),Attribute(“x”))这样的树成为像1 + row.get(“x”)这样的Scala表达式的AST。

Quasiquotes在编译时进行类型检查,以确保仅替换适当的AST或literals ,使其比字符串连接更可用,并且它们直接生成Scala AST,而不是在运行时运行Scala解析器。此外,它们是高度可组合的,因为每个节点的代码生成规则不需要知道如何构建其子节点返回的树。最后,Scala编译器进一步优化了最终的代码,以防止Catalyst错过了表达式优化。下图显示,quasiquotes让我们生成与手动调优程序相似的代码。

我们发现使用quasiquotes进行代码生成是非常简单直接的,我们观察到,即使SparkSql的新贡献者也可以快速添加新类型的表达式的规则。Quasiquotes也适用于我们在原生Java对象上运行的目标:当访问这些对象的字段时,我们可以对所需字段进行代码生成直接访问,而不必将对象复制到Spark SQL Row中,并使用Row 存取方法。最后,将代码生成评估与对我们还没有生成代码的表达式的解释性评估结合起来是很明智的,因为我们编译的Scala代码可以直接调用到我们的表达式解释器中。

Catalyst的代码生成器总共700行代码。

四,总结

Catalyst新颖的,简单的设计使Spark社区能够快速实现和扩展引擎。

为了让大家更彻底的了解spark的Catalyst,后面会陆续出文章结合源码及结构图进行讲解,也会讲解涉及到的规则,模式匹配,scala的语法树,Quasiquotes深入和使用。

Quasiquotes可以帮助我们实现很多功能,比如规则引擎。后面也会举例讲解,如何在我们的应用中使用。

本文由浪尖部分翻译:<Deep Dive into Spark SQL’s Catalyst Optimizer >by Michael Armbrust, Yin Huai, Cheng Liang, Reynold Xin and Matei Zaharia 。

本文分享自微信公众号 - Spark学习技巧(bigdatatip),作者:浪尖

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-08-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 一文了解函数式查询优化器Spark SQL Catalyst

    记录一下个人对sparkSql的catalyst这个函数式的可扩展的查询优化器的理解,目录如下:

    大数据真好玩
  • 简单回答:SparkSQL数据抽象和SparkSQL底层执行过程

    就易用性而言,对比传统的MapReduce API,Spark的RDD API有了数量级的飞跃并不为过。然而,对于没有MapReduce和函数式编程经验的新手来...

    大数据真好玩
  • 在所有Spark模块中,我愿称SparkSQL为最强!

    我们之前已经学习过了《我们在学习Spark的时候,到底在学习什么?》,这其中有一个关于SQL的重要模块:SparkSQL。

    王知无-import_bigdata
  • SparkSql的Catalyst之图解简易版

    一,基本介绍 一言不合就上图。 ? 由上图可以看出Catalyst的作用尤为重要。MLPipelines Structured Streaming,GraphF...

    Spark学习技巧
  • [Spark SQL] 源码解析之Parser

    Parser就是将SQL字符串切分成一个个Token,再根据一定语义规则解析为一棵语法树。我们写的sql语句只是一个字符串而已,首先需要将其通过词法解析和语法解...

    UFO
  • SparkSQL的解析详解

      SparkSQL继承自Hive的接口,由于hive是基于MapReduce进行计算的,在计算过程中大量的中间数据要落地于磁盘,从而消耗了大量的I/O,降低了...

    用户3003813
  • [Spark SQL] 主要执行流程

    SparkSql的第一件事就是把SQLText解析成语法树,这棵树包含了很多节点对象,节点可以有特定的数据类型,同时可以有0个或者多个子节点,节点在SparkS...

    UFO
  • Flink SQL vs Spark SQL

    Spark已经在大数据分析领域确立了事实得霸主地位,而Flink则得到了阿里系的亲赖前途一片光明。我们今天会SparkSQL和FlinkSQL的执行流程进行一个...

    麒思妙想
  • 《从0到1学习Spark》-- 初识Spark SQL

    今天小强给大家介绍Spark SQL,小强的平时的开发中会经常使用Spark SQL进行数据分析查询操作,Spark SQL是整个Spark生态系统中最常用的组...

    程序员小强
  • 基于Spark的大规模推荐系统特征工程

    导读:特征工程在推荐系统中有着举足轻重的作用,大规模特征工程处理的效率极大的影响了推荐系统线上的性能。第四范式作为国际领先的机器学习和人工智能技术与平台服务提供...

    Spark学习技巧
  • 基于Spark的大规模推荐系统特征工程

    导读:特征工程在推荐系统中有着举足轻重的作用,大规模特征工程处理的效率极大的影响了推荐系统线上的性能。第四范式作为国际领先的机器学习和人工智能技术与平台服务提供...

    石晓文
  • Spark DataFrame简介(一)

    本片将介绍Spark RDD的限制以及DataFrame(DF)如何克服这些限制,从如何创建DataFrame,到DF的各种特性,以及如何优化执行计划。最后还会...

    用户1217611
  • SparkSQL内核解析-执行全过程概述

    用来表示一行数据的类,根据下标来访问和操作元素,其中每一列都是Catalyst内部定义的数据类型;物理算子树产生和转换的RDD类型为RDD[InternalRo...

    王知无-import_bigdata
  • Spark配置参数调优

           在项目中,由于数据量为几百万甚至千万级别,如果一个executor装载的对象过多,会导致GC很慢。项目中,我们使一个worker节点执行app时启...

    用户3003813
  • 总要到最后关头才肯重构代码,强如spark也不例外

    用过Python做过机器学习的同学对Python当中pandas当中的DataFrame应该不陌生,如果没做过也没有关系,我们简单来介绍一下。DataFrame...

    TechFlow-承志
  • Spark1.6 DataSets简介

        Apache Spark提供了强大的API,以便使开发者为使用复杂的分析成为了可能。通过引入SparkSQL,让开发者可以使用这些高级API接口来从事结...

    用户3003813
  • SparkSQL(源码阅读三)

      额,没忍住,想完全了解sparksql,毕竟一直在用嘛,想一次性搞清楚它,所以今天再多看点好了~

    用户3003813
  • 《从0到1学习Spark》--DataFrame和Dataset探秘

    昨天小强带着大家了解了Spark SQL的由来、Spark SQL的架构和SparkSQL四大组件:Spark SQL、DataSource Api、DataF...

    程序员小强
  • 工作中遇到的Spark错误(持续更新)

    1.java.io.IOException: No spa ce left on device 原因及解决办法:磁盘空间不足

    shengjk1

扫码关注云+社区

领取腾讯云代金券