
全网第一个 flink sql 实战,本文主要介绍 flink sql 与 calcite 之间的关系。flink sql 的解析主要依赖 calcite。
而博主通过此文抛砖引玉帮助大家理解 flink sql 在解析中是怎样依赖 calcite 的,以及 flink sql 解析的流程,sql parser 相关内容。希望对大家有所帮助。
本文通过以下几节进行介绍,对某个章节感兴趣的可以直接划到对应章节。
本节先给大家大致描述一条 flink sql 的执行过程,不了解详细内容不要紧,主要先了解整个流程,有了全局视角之后,后续会详述细节。
在介绍一条 flink sql 的执行过程之前,先来看看 flink datastream 任务的执行过程,这对理解一条 flink sql 的执行过程有很大的帮助。
我们逆向思维进行考虑,如果想让一条 flink sql 按照我们的预期在 jvm 中执行,需要哪些过程。
如下图所示,描绘了上述逻辑:

12
那么这个和 flink 实际实现有啥异同呢?
flink 大致是这样做的,虽在 flink 本身的中间还有一些其他的流程,后来的版本也不是基于 datastream,但是整体的处理逻辑还是和上述一致的。
所以不了解整体流程的同学可以先按照上述流程进行理解。
按照 博主的脑洞 来总结一条 sql 的使命就是:sql -> AST -> codegen(java code) -> 让我们 run 起来好吗

26
上面手绘可能看不清,下面这张图更清楚。

28
标准的一条 flink sql 运行起来的流程如下:
Notes:刚开始对其中的 SqlNode,RelNode 概念可能比较模糊。先理解整个流程,后续会详细介绍这些概念。
可以发现 flink 的实现 比 博主的脑洞 整体主要框架上面是一致的。多出来的部分主要是 SqlNode 验证阶段,优化阶段。
大致了解了 一条 flink sql 的运行流程 之后,我们来看看 calcite 这玩意到底在 flink 里干了些啥。
根据上文总结来说 calcite 在 flink sql 中担当了 sql 解析、验证、优化功能。

30
看着 calcite 干了这么多事,那 calcite 是个啥东东,它的定位是啥?
calcite 是一个动态数据的管理框架,它可以用来构建数据库系统的不同的解析的模块,但是它不包含数据存储数据处理等功能。
calcite 的目标是一种方案,适应所有的需求场景,希望能为不同计算平台和数据源提供统一的 sql 解析引擎,但是它只是提供查询引擎,而没有真正的去存储这些数据。

61
下图是目前使用了 calcite 能力的其他组件,也可见官网 https://calcite.apache.org/docs/powered_by.html 。

4
简单来说的话,可以先理解为 calcite 具有这几个功能(当然还有其他很牛逼的功能,感兴趣可以自查官网)。
SqlSelect、SqlNode),就可以根据这些对象做具体逻辑处理了。举个例子,如下图,一条简单的 select c,d from source where a = '6' sql,经过 calcite 的解析之后,就可以得到 AST model(SqlNode)。可以看到有 SqlSelect、SqlIdentifier、SqlIdentifier、SqlCharStringLiteral。上面的这些能力整体组成如下图所示:

29
实际使用 calcite 解析一条 sql,跑起来看看。

2
重中之重,在了解原理之前,先跑起来是王道,也会帮助我们逐步理解。
官网已经有一个 csv 的案例了。感兴趣的可以直达 https://calcite.apache.org/docs/tutorial.html 。
跑完一个 csv demo,在详细了解 calcite 之前还需要了解下 sql,calcite 的支柱:关系代数。
sql 是基于关系代数的查询语言,是关系代数在工程上的一种很好的实现方案。在工程中,关系代数难表达,但是 sql 就易于理解。关系代数和 sql 的关系如下。
总结下,有哪些常用的关系代数:

50
关系代数等价变换是 calcite optimizer 的基础理论。
下面是一些等价变换的例子。
1.连接(⋈),笛卡尔积(×)的交换律

51
2.连接(⋈),笛卡尔积(×)的结合律

3.投影(Π)的串接定律

4.选择(σ)的串接定律

5.选择(σ)与投影(Π)的交换

6.选择(σ)与笛卡尔积(×)的交换

7.选择(σ)与并(∪)的交换

8.选择(σ)与差(-)的交换

9.投影(Π)与笛卡尔积(×)的交换

10.投影(Π)与并(∪)的交换

然后看一个基于关系代数优化的实际 sql 案例:
有三个关系 A(a1,a2,a3,…)、B(b1,b2,b3, … )、C(a1,b1,c1,c2, … )
有一个查询请求如下:
SELECT A.a1 FROM A,B,C WHERE A.a1 = C.a1 AND B.b1 = C.b1 AND f(c1)
1.首先将 sql 转为关系代数的语法树。

36
2.优化:选择(σ)的串接定律。

47

37
3.优化:选择(σ)与笛卡尔积(×)的交换。

48

38
4.优化:投影(π)与笛卡尔积(×)的交换。

49







关于关系代数我们就有了大致的了解。
除此之外,对于更深入了解 flink sql,calcite 而言,我们还需要了解一下在 calcite 代码体系中有哪些重要 model。
calcite 中有两个最最基础、重要的 model 在我们理解 flink sql 解析流程时需要知道的。
举个例子来说明下,下面这条 flink sql,经过解析之后的 SqlNode,RelNode 如下图:
SELECT
sum(part_pv) as pv,
window_start
FROM (
SELECT
count(1) as part_pv,
cast(tumble_start(rowtime, INTERVAL '60' SECOND) as bigint) * 1000 as window_start
FROM
source_db.source_table
GROUP BY
tumble(rowtime, INTERVAL '60' SECOND)
, mod(id, 1024)
)
GROUP BY
window_start

62
可以看到 SqlNode 包含的内容是 sql 的层次结构,包括 selectList,from,where,group by 等。
RelNode 包含的是关系代数的层次结构,每一层都有一个 input 来承接。结合上面优化案例的树状结构一样。

63

29
如上图所示,此处我们结合上节介绍的 calcite 的 model,以及 flink sql 的实现来走一遍其处理流程:
SELECT
sum(part_pv) as pv,
window_start
FROM (
SELECT
count(1) as part_pv,
cast(tumble_start(rowtime, INTERVAL '60' SECOND) as bigint) * 1000 as window_start
FROM
source_db.source_table
GROUP BY
tumble(rowtime, INTERVAL '60' SECOND)
, mod(id, 1024)
)
GROUP BY
window_start
其中前三步解析和转化,都在 在执行 TableEnvironment#sqlQuery 进行。
最后一步优化,在执行 sink 操作时进行,即在这个例子中是 tEnv.toRetractStream(result, Row.class)。
源码公众号后台回复flink sql 知其所以然(六)| flink sql 约会 calcite获取。
sql 解析阶段使用 Sql Parser 将 sql 解析为 SqlNode。这一步在执行 TableEnvironment#sqlQuery 进行。




可以从上图看到 flink sql 具体实现类是 FlinkSqlParserImpl。

68
具体 parse 得到 SqlNode 如上图。
上面的第一步生产的 SqlNode 对象是一个未经验证的,这一步就是语法检查阶段,语法检查前需要知道元数据信息,这个检查会包括表名、字段名、函数名、数据类型的检查。进行语法检查的实现如下:



可以从上图看到 flink sql 校验器的具体实现类是 FlinkCalciteSqlValidator,其中包含了元数据信息,从而可以进行元数据信息检查。
这一步就是将 SqlNode 转换成 RelNode,也就是生成相应的关系代数层面的逻辑(这里一般都叫做逻辑计划:Logical Plan)。



这一步就是优化阶段。详细内容可以自己 debug 代码查看,此处不赘述。


此处以 calcite parser 举例说明,其模块为什么这通用?其他的模块都是类似的方式。
先说结论:因为 calcite parser 模块提供了接口,具体的 parse 逻辑、规则是可以根据用户自定义进行配置的。大家可以看下图,博主画出了一张图进行详述。

5
如上图,引擎 sql 解析器的生成是有一个输入的,就是 用户自定义语法分析规则变量,具体引擎的 sql 解析器其实也是根据用户自定义的 解析规则 去生成的 解析器。其 解析器 的动态生成依赖 javacc 这样的组件。calcite 提供的是统一的 sql AST 模型、优化模型接口等,而具体的解析实现交给了用户自己去决定。
javacc 会根据 calcite 中定义的 Parser.jj 文件,生成具体的 sql parser 代码(如上图),这个 sql parser 的能力就是将 sql 转换成 AST (SqlNode)。关于 calcite 能力的更详细内容见 https://matt33.com/2019/03/07/apache-calcite-process-flow/ 。
上图涉及到的文件大家可以下载 calcite 源码 https://github.com/apache/calcite.git 之后,切换到 core module 之后查看。

31
javacc 是一个用 java 开发的最受欢迎的语法分析生成器。这个分析生成器工具可以读取上下文无关且有着特殊意义的语法并把它转换成可以识别且匹配该语法的 java 程序。它是 100% 的纯 java 代码,可以在多种平台上运行。
简单解释 javacc 就是它是一个通用的语法分析生产器,用户可以使用 javacc 任意定义一套 DSL 及解析器。
举个例子,如果哪天你觉得 sql 也不够简洁通用,你可以使用 javacc 自己定义一套更简洁的 user-define-ql。然后使用 javacc 作为你的 user-define-ql 的解析器。是不是很流批,可以自己去搞编译器了。
这里不介绍具体的 javacc 语法,直接以官网的 Simple1.jj 为案例。详细语法和功能可以参考官网(https://javacc.github.io/javacc/) 或者一下博客。
Simple1.jj 是用于识别一系列的 {相同数量的花括号},之后跟着 0 个或多个行终结符。

7
下面是合法的字符串例子:
{},{{{{{}}}}},etc.
下面是不合法的字符串例子:
{{{{,{}{},{}},{{}{}},etc.
接下来让我们实际将 Simple1.jj 编译生成具体的规则代码。
在 pom 中加入 javacc build 插件:
<plugin>
<!-- This must be run AFTER the fmpp-maven-plugin -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
<includes>
<include>**/Simple1.jj</include>
</includes>
<!-- This must be kept synced with Apache Calcite. -->
<lookAhead>1</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
在 compile 之后,就会在 generated-sources 下生成代码:

8
然后把代码 copy 到 Sources 路径下:

33
执行下代码,可以看到 {},{{}} 都可以校验通过,一旦出现不符合规则的 {{ 输入,就会抛出异常。



关于 javacc 基本上就了解个大概了。
感兴趣的可以尝试自定义一个编译器。

5
fmpp 就是一个基于 freemarker 的模板生产器。用户可以统一管理自己的变量,然后用 ftl 模板 + 变量 生成对应的最终文件。在 calcite 中使用 fmpp 作为变量 + 模板的统一管理器。然后基于 fmpp 来生成对应的 Parser.jj 文件。
博主画了一张图,包含了其中重要组件之间的依赖关系。

3
你没猜错,还是上面那些流程,fmpp(Parser.jj 模板生成) -> javacc(Parser 生成) -> calcite。
在介绍 Parser 生成流程之前,先看看 flink 最终生成的 Parser:FlinkSqlParserImpl (此处使用 Blink Planner)。
以下面这个案例出发(代码基于 flink 1.13.1 版本):
public class ParserTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
DataStream<Tuple3<String, Long, Long>> tuple3DataStream =
env.fromCollection(Arrays.asList(
Tuple3.of("2", 1L, 1627254000000L),
Tuple3.of("2", 1L, 1627218000000L + 5000L),
Tuple3.of("2", 101L, 1627218000000L + 6000L),
Tuple3.of("2", 201L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 86400000 + 7000L)))
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(Time.seconds(0L)) {
@Override
public long extractTimestamp(Tuple3<String, Long, Long> element) {
return element.f2;
}
});
tEnv.registerFunction("mod", new Mod_UDF());
tEnv.registerFunction("status_mapper", new StatusMapper_UDF());
tEnv.createTemporaryView("source_db.source_table", tuple3DataStream,
"status, id, timestamp, rowtime.rowtime");
String sql = "SELECT\n"
+ " count(1),\n"
+ " cast(tumble_start(rowtime, INTERVAL '1' DAY) as string)\n"
+ "FROM\n"
+ " source_db.source_table\n"
+ "GROUP BY\n"
+ " tumble(rowtime, INTERVAL '1' DAY)";
Table result = tEnv.sqlQuery(sql);
tEnv.toAppendStream(result, Row.class).print();
env.execute();
}
}
debug 过程如之前分析 sql -> SqlNode 过程所示,如下图直接定位到 SqlParser:

21
如上图可以看到具体的 Parser 就是 FlinkSqlParserImpl。
定位到具体的代码如下图所示(flink-table-palnner-blink-2.11-1.13.1.jar)。

34
最终 parse 的结果 SqlNode 如下图。

22


再来看看 FlinkSqlParserImpl 是怎么使用 calcite 生成的。
具体到 flink 中的实现,位于源码中的 flink-table.flink-sql-parser 模块(源码基于 flink 1.13.1)。
flink 是依赖 maven 插件实现的上面的整体流程。

14
接下来看看整个 Parser 生成流程。
使用 maven-dependency-plugin 将 calcite 解压到 flink 项目 build 目录下。
<plugin>
<!-- Extract parser grammar template from calcite-core.jar and put
it under ${project.build.directory} where all freemarker templates are. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-parser-template</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/</outputDirectory>
<includes>**/Parser.jj</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>

15
使用 maven-resources-plugin 将 Parser.jj 代码生成。
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-fmpp-resources</id>
<phase>initialize</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/codegen</outputDirectory>
<resources>
<resource>
<directory>src/main/codegen</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.googlecode.fmpp-maven-plugin</groupId>
<artifactId>fmpp-maven-plugin</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.28</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>generate-fmpp-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
<outputDirectory>target/generated-sources</outputDirectory>
<templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
</configuration>
</execution>
</executions>
</plugin>

16
使用 javacc 将根据 Parser.jj 文件生成 Parser。
<plugin>
<!-- This must be run AFTER the fmpp-maven-plugin -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<!-- This must be kept synced with Apache Calcite. -->
<lookAhead>1</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>

17
最终生成的 Parser 就是 FlinkSqlParserImpl。

18
blink planner(flink-table-planner-blink) 在打包时将 flink-sql-parser、flink-sql-parser-hive 打包进去。

35
本文主要介绍了 flink sql 与 calcite 之间的依赖关系,以及 flink sql parser 的生成过程。
https://www.slideshare.net/JordanHalterman/introduction-to-apache-calcite
https://arxiv.org/pdf/1802.10233.pdf
https://changbo.tech/blog/7dec2e4.html
http://www.liaojiayi.com/calcite/
https://www.zhihu.com/column/c_1110245426124554240
https://blog.csdn.net/QuinnNorris/article/details/70739094
https://www.pianshen.com/article/72171186489/
https://matt33.com/2019/03/07/apache-calcite-process-flow/
https://www.jianshu.com/p/edf503a2a1e7
https://blog.csdn.net/u013007900/article/details/78978271
https://blog.csdn.net/u013007900/article/details/78993101
http://www.ptbird.cn/optimization-of-relational-algebraic-expression.html
https://book.51cto.com/art/201306/400084.htm
https://book.51cto.com/art/201306/400085.htm
https://miaowenting.site/2019/11/10/Flink-SQL-with-Calcite/