前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink SQL】Apache Calcite 架构剖析

【Flink SQL】Apache Calcite 架构剖析

作者头像
王知无-import_bigdata
发布2023-04-07 18:48:43
7530
发布2023-04-07 18:48:43
举报

1. 简介和架构

Apache Calcite 是一个动态的数据管理框架, 可以实现 SQL 的解析、验证、优化和执行。Calcite 是模块化和插件式的, 解析、验证、优化和执行的步骤都对应着一个相对独立的模块。用户可以选择使用其中的一个或多个模块,也可以对任意模型进行定制化扩展。

Calcite 的架构如下图所示:

  1. JDBC:构建了一个独立的 Avatica 框架,可以通过标准的JDBC 接口访问 Calcite 获取数据。
  2. SQL Parser 和 SQL Validator:可以进行 SQL 的解析和验证,,并将原始的 SQL 字符串解析并转化为内部的 SqlNode 树来表示。
  3. Query Optimizer:进行查询优化,,基于在关系代数在 Calcite 内部有一种关系代数表示方法,将关系代数表示为 RelNode 树。RelNode 树不只是由 SqlNode 树转化而来,也可以通过Calcite 提供的 Expressions Builder 接口构建。

说明:Calcite 包含许多组成典型数据库管理系统的部件,但是省略了一些关键的组成部分,例如数据的存储、处理数据的算法和存储元数据的存储库等。因为对不同的数据类型有不同的存储和计算引擎,是不可能将它们统一到一个框架的,所以 Calcite 是一个统一的 SQL 接口实现数据访问框架。

2. SQL 处理流程

如下图所示,Calcite 的处理流程实际上就是 SQL 的解析、校验、优化和执行。

  1. Parser:解析 SQL,将输入的 SQL 字符串转化为抽象语法树(AST),即 SqlNode 树表示
  2. Validator:根据元数据信息对 SqlNode 树进行验证, 其输出仍是 SqlNode
  3. Converter:将 SqlNode 树转化为关系代数,其中 RelNode 树表示关系代数
  4. Optimizer:对输入的关系代数进行优化,并输出优化后的 RelNode
  5. Execute:根据优化后的 RelNode 生成执行计划

demo:利用 Calcite 实现使用 SQL 访问 CSV 文件

3. 案例分析

users 表的内容:

代码语言:javascript
复制
id:string,name:string,age:int
1,Jack,28
2,John,21
3,Tom,32
4,Peter,24

orders 表内容:

代码语言:javascript
复制
id:string,user_id:string,goods:string,price:double
001,1,Cola,3.5
002,1,Hat,38.9
003,2,Shoes,199.9
004,3,Book,39.9
005,4,Phone,2499.9

查询的 SQL 语句:

代码语言:javascript
复制
SELECT u.id, name, age, sum(price)
FROM users AS u join orders AS o ON u.id = o.user_id
WHERE age >= 20 AND age <= 30
GROUP BY u.id, name, age
ORDER BY u.id
3.1 SQL 解析

通过词法分析和语法分析将 SQL 字符串转化为 AST。在Calcite中, 借助 JavaCC 实现了 SQL 的解析, 并转化为 SqlNode 表示。

SqlNode 是 AST 的抽象基类,不同类型的节点有对应的实现类。下面的 SQL 语句会生成 SqlSelectSqlOrderBy 两个主要的节点。

代码语言:javascript
复制
String sql = "SELECT u.id, name, age, sum(price) " +
    "FROM users AS u join orders AS o ON u.id = o.user_id " +
    "WHERE age >= 20 AND age <= 30 " +
    "GROUP BY u.id, name, age " +
    "ORDER BY u.id";
// 创建SqlParser, 用于解析SQL字符串
SqlParser parser = SqlParser.create(sql, SqlParser.Config.DEFAULT);
// 解析SQL字符串, 生成SqlNode树
SqlNode rootSqlNode = parser.parseStmt();

上述代码中的 rootSqlNode 是 AST 的根节点。如下图所示,可以看到 rootSqlNodeSqlOrderBy类型,其中 query 字段是一个 SqlSelect 类型,即代表原始的 SQL 语句去掉ORDER BY 部分。

3.2 SQL 校验

SQL 校验阶段一方面会借助元数据信息执行上述验证,另一方面会对 SqlNode 树进行一些改写,以转化为统一的格式。

代码语言:javascript
复制
// 创建Schema, 一个Schema中包含多个表
SimpleTable userTable = SimpleTable.newBuilder("users")
    .addField("id", SqlTypeName.VARCHAR)
    .addField("name", SqlTypeName.VARCHAR)
    .addField("age", SqlTypeName.INTEGER)
    .withFilePath("/path/to/user.csv")
    .withRowCount(10)
    .build();
SimpleTable orderTable = SimpleTable.newBuilder("orders")
    .addField("id", SqlTypeName.VARCHAR)
    .addField("user_id", SqlTypeName.VARCHAR)
    .addField("goods", SqlTypeName.VARCHAR)
    .addField("price", SqlTypeName.DECIMAL)
    .withFilePath("/path/to/order.csv")
    .withRowCount(10)
    .build();
SimpleSchema schema = SimpleSchema.newBuilder("s")
    .addTable(userTable)
    .addTable(orderTable)
    .build();    
CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false);
rootSchema.add(schema.getSchemaName(), schema);

RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();

// 创建CatalogReader, 用于指示如何读取Schema信息
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
    rootSchema,
    Collections.singletonList(schema.getSchemaName()),
    typeFactory,
    config);
// 创建SqlValidator, 用于执行SQL验证
SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
    .withLenientOperatorLookup(config.lenientOperatorLookup())
    .withSqlConformance(config.conformance())
    .withDefaultNullCollation(config.defaultNullCollation())
    .withIdentifierExpansion(true);
SqlValidator validator = SqlValidatorUtil.newValidator(
    SqlStdOperatorTable.instance(), catalogReader, typeFactory, validatorConfig);
// 执行SQL验证
SqlNode validateSqlNode = validator.validate(node);

如下图可知,SQL 验证后的输出结果仍是 SqlNode 树。不过其内部结构发生了改变,一个明显的变化是验证后的 SqlOrderBy 节点被改写为了 SqlSelect 节点,并在其 orderBy 变量中记录了排序字段。

如果把表名或者字段写错,validator.validate(node) 运行时在就会报错。如果把验证前后的SqlNode完全打印出来,可以发现在校验时会为每个字段加上表名限定。

代码语言:javascript
复制
-- 验证前的SqlNode树打印结果
SELECT `u`.`id`, `name`, `age`, SUM(`price`)
FROM `users` AS `u`
INNER JOIN `orders` AS `o` ON `u`.`id` = `o`.`user_id`
WHERE `age` >= 20 AND `age` <= 30
GROUP BY `u`.`id`, `name`, `age`
ORDER BY `u`.`id`

-- 验证后的SqlNode树打印结果
SELECT `u`.`id`, `u`.`name`, `u`.`age`, SUM(`o`.`price`)
FROM `s`.`users` AS `u`
INNER JOIN `s`.`orders` AS `o` ON `u`.`id` = `o`.`user_id`
WHERE `u`.`age` >= 20 AND `u`.`age` <= 30
GROUP BY `u`.`id`, `u`.`name`, `u`.`age`
ORDER BY `u`.`id`
3.3 转换为关系代数 RelNode

关系代数是 SQL 的理论基础,可以阅读 Introduction of Relational Algebra in DBMS简单了解,其中“数据库系统概念“中对关系代数有更深入的介绍。

在 Calcite 中, 关系代数由 RelNode 表示。如下代码所示,将校验后的 SqlNode 树转化为RelNode树。

代码语言:javascript
复制
// 创建VolcanoPlanner, VolcanoPlanner在后面的优化中还需要用到
VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.of(config));
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
// 创建SqlToRelConverter
RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
SqlToRelConverter.Config converterConfig = SqlToRelConverter.config()
    .withTrimUnusedFields(true)
    .withExpand(false);
SqlToRelConverter converter = new SqlToRelConverter(
    null,
    validator,
    catalogReader,
    cluster,
    StandardConvertletTable.INSTANCE,
    converterConfig);
// 将SqlNode树转化为RelNode树
RelNode relNode = converter.convertQuery(validateSqlNode, false, true);

RelNode树实质上是一个逻辑执行计划,上述 SQL 对应的逻辑执行计划如下,其中每一行都表示一个节点,即 RelNode 的实现类。

代码语言:javascript
复制
LogicalSort(sort0=[$0], dir0=[ASC])
  LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[SUM($3)])
    LogicalProject(id=[$0], name=[$1], age=[$2], price=[$6])
      LogicalFilter(condition=[AND(>=($2, 20), <=($2, 30))])
        LogicalJoin(condition=[=($0, $4)], joinType=[inner])
          LogicalTableScan(table=[[s, users]])
          LogicalTableScan(table=[[s, orders]])
3.4 查询优化

查询优化是 Calcite 的核心模块,主要有三部分组成:

  1. Planner rules:优化规则,例如内置优化规则有谓词下推、投影下推等。用户也可定义自己的优化规则。
  2. Metadata providers:元数据,主要用于基于成本的优化(Cost-based Optimize 即 CBO),包括表的行数、表的大小、给定列的值是否唯一等信息。
  3. Planner engines:优化器实现,HepPlanner 用于实现基于规则的优化(Rule-based Optimize 即 RBO),VolcanoPlanner 用于实现基于成本的优化。
代码语言:javascript
复制
// 优化规则
RuleSet rules = RuleSets.ofList(
    CoreRules.FILTER_TO_CALC,
    CoreRules.PROJECT_TO_CALC,
    CoreRules.FILTER_CALC_MERGE,
    CoreRules.PROJECT_CALC_MERGE,
    CoreRules.FILTER_INTO_JOIN,     // 过滤谓词下推到Join之前
    EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
    EnumerableRules.ENUMERABLE_PROJECT_TO_CALC_RULE,
    EnumerableRules.ENUMERABLE_FILTER_TO_CALC_RULE,
    EnumerableRules.ENUMERABLE_JOIN_RULE,
    EnumerableRules.ENUMERABLE_SORT_RULE,
    EnumerableRules.ENUMERABLE_CALC_RULE,
    EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
Program program = Programs.of(RuleSets.ofList(rules));
RelNode optimizerRelTree = program.run(
    planner,
    relNode,
    relNode.getTraitSet().plus(EnumerableConvention.INSTANCE),
    Collections.emptyList(),
    Collections.emptyList());

经过优化后的输出如下,可知所有的节点都变成了 Enumerable 开头的物理节点,其基类是EnumerableRel

代码语言:javascript
复制
EnumerableSort(sort0=[$0], dir0=[ASC])
  EnumerableAggregate(group=[{0, 1, 2}], EXPR$3=[SUM($3)])
    EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}], price=[$t6])
      EnumerableHashJoin(condition=[=($0, $4)], joinType=[inner])
        EnumerableCalc(expr#0..2=[{inputs}], expr#3=[Sarg[[20..30]]], expr#4=[SEARCH($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
          EnumerableTableScan(table=[[s, users]])
        EnumerableTableScan(table=[[s, orders]])

优化前后的计划:users 表的过滤位置发生了变动,从先 Join 后过滤,变成了先过滤后 Join,如下图所示。

3.5 执行计划

将物理计划转化为执行计划通常需要自定义代码。Calcite 提供了一种执行计划生成方法,如下所示,可以生成执行计划并读取CSV文件中的数据。

代码语言:javascript
复制
EnumerableRel enumerable = (EnumerableRel) optimizerRelTree;
Map<String, Object> internalParameters = new LinkedHashMap<>();
EnumerableRel.Prefer prefer = EnumerableRel.Prefer.ARRAY;
Bindable bindable = EnumerableInterpretable.toBindable(internalParameters,
                                                       null, enumerable, prefer);
Enumerable bind = bindable.bind(new SimpleDataContext(rootSchema.plus()));
Enumerator enumerator = bind.enumerator();
while (enumerator.moveNext()) {
    Object current = enumerator.current();
    Object[] values = (Object[]) current;
    StringBuilder sb = new StringBuilder();
    for (Object v : values) {
        sb.append(v).append(",");
    }
    sb.setLength(sb.length() - 1);
    System.out.println(sb);
}

执行结果:

代码语言:javascript
复制
1,Jack,28,42.40
2,John,21,199.90
4,Peter,24,2499.90
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-09-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2. SQL 处理流程
  • 3. 案例分析
    • 3.1 SQL 解析
      • 3.2 SQL 校验
        • 3.3 转换为关系代数 RelNode
          • 3.4 查询优化
            • 3.5 执行计划
            相关产品与服务
            数据库智能管家 DBbrain
            数据库智能管家(TencentDB for DBbrain,DBbrain)是腾讯云推出的一款为用户提供数据库性能、安全、管理等功能的数据库自治云服务。DBbrain 利用机器学习、大数据手段、专家经验引擎快速复制资深数据库管理员的成熟经验,将大量传统人工的数据库运维工作智能化,服务于云上和云下企业,有效保障数据库服务的安全、稳定及高效运行。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档