前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Hudi从零到一:深入研究读取流程和查询类型(二)

Apache Hudi从零到一:深入研究读取流程和查询类型(二)

作者头像
ApacheHudi
发布2024-01-10 14:03:19
3490
发布2024-01-10 14:03:19
举报
文章被收录于专栏:ApacheHudiApacheHudi

在上一篇文章中,我们讨论了 Hudi 表中的数据布局,并介绍了 CoW 和 MoR 两种表类型,以及它们各自的权衡。在此基础上我们现在将探讨 Hudi 中的读取操作是如何工作的。

有多种引擎(例如 Spark、Presto 和 Trino)与 Hudi 集成来执行分析查询。尽管集成 API 可能有所不同,但分布式查询引擎中的基本过程保持一致。此过程需要解释输入 SQL、创建在工作节点上执行的查询计划以及收集结果以返回给用户。

在这篇文章中,我选择 Spark 作为示例引擎来说明读取操作的流程,并提供代码片段来展示各种 Hudi 查询类型的用法。我将首先通过入门介绍 Spark 查询,然后深入研究 Hudi-Spark 集成点,最后解释不同的查询类型。

Spark 查询入门

Spark SQL是一个分布式SQL引擎,可以对大规模数据执行分析任务。典型的分析查询从用户提供的 SQL 开始,旨在从存储上的表中检索结果。Spark SQL 接受此输入并继续执行多个阶段,如下图所示。

在分析阶段,输入被解析、解析并转换为树结构,作为 SQL 语句的抽象。查询表目录以获取表名称和列类型等信息。

在逻辑优化步骤中,在逻辑层对树进行评估和优化。一些常见的优化包括谓词下推、模式裁剪和空传播。此步骤生成一个逻辑计划,概述查询所需的计算。由于它是逻辑表示,逻辑计划缺乏在实际节点上运行所需的细节。

物理规划充当逻辑层和物理层之间的桥梁。物理计划指定了执行计算的精确方式。例如,在逻辑计划中,可能有一个连接节点指示连接操作,而在物理计划中,连接操作可以指定为sort-merge连接或broadcast-hash连接,具体取决于相关表的大小估计。选择最佳物理计划用于代码生成和实际执行。

这三个阶段是 Catalyst Optimizer[1] 提供的功能。要进一步研究该主题可以探索此处[2]和此处[3]链接的精彩演讲。

在执行过程中,Spark 应用程序在称为 RDD(弹性分布式数据集)的基础数据结构上运行。RDD 是 JVM 对象的集合,这些对象是不可变的、跨节点分区的,并且由于跟踪数据沿袭信息而具有容错能力。当应用程序运行时,将执行计划的计算:RDD 被转换并执行操作以产生结果。这个过程通常也称为 RDD 的“物化”。

数据源API

当 Catalyst Optimizer 制定查询计划时,连接到数据源变得有利,可以将优化下推。Spark 的 DataSource API 旨在提供与各种数据源集成的可扩展性。有些源是开箱即用的,例如 JDBC、Hive 表和 Parquet 文件。Hudi 表由于特定的数据布局而代表了另一种类型的自定义数据源。

Spark-Hudi 读取流程

下图展示了Spark-Hudi读取流程中的一些关键接口和方法调用。

  1. 1. DefaultSource 作为集成的入口点,将数据源的格式定义为 org.apache.hudihudi。它提供了一个 BaseRelation 实现,我将其设想为建立一个“关系”来简化表中的数据访问。
  2. 2. buildScan() 是一个核心 API,用于将过滤器传递到数据源以进行优化。Hudi定义了collectFileSplits()来收集相关文件。
  3. 3. collectFileSplits() 将所有过滤器传递给 FileIndex 对象,该对象有助于识别要读取的必要文件。
  4. 4. FileIndex 定位所有相关的 FileSlice 以进行进一步处理。
  5. 5. 识别 FileSlices 后调用 composeRDD()。
  6. 6. FileSlice 作为 RDD 加载和读取。对于 Parquet 中的基本文件等列式文件,此读取操作通过仅读取必要的列来最大限度地减少传输的字节。
  7. 7. RDD 从 API 返回,用于进一步规划和代码生成。

请注意上述步骤仅提供读取流程的高级概述,省略了读取模式支持和高级索引技术(例如使用元数据表跳过数据)等细节。

该流程对于 Spark 的所有 Hudi 查询类型都是通用的。在以下部分将解释各种查询类型的工作原理。除读取优化外,所有这些都适用于 CoW 和 MoR 表。

快照查询

这是读取 Hudi 表时的默认查询类型。它的目的是从表中检索最新记录,本质上捕获查询时表的“快照”。在 MoR 表上执行时,会发生日志文件与基本文件的合并,并导致一些性能影响。

启动带有 Hudi 依赖的 Spark SQL Shell 后可以运行这些 SQL 来设置一个 MoR 表,其中插入和更新了一条记录。

代码语言:javascript
复制
create table hudi_mor_example (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
) location '/tmp/hudi_mor_example';

set hoodie.spark.sql.insert.into.operation=UPSERT;
insert into hudi_mor_example select 1, 'foo', 10, 1000;
insert into hudi_mor_example select 1, 'foo', 20, 2000;
insert into hudi_mor_example select 1, 'foo', 30, 3000;

可以通过运行如下所示的 SELECT 语句来执行快照查询,它将检索记录的最新值。

代码语言:javascript
复制
spark-sql> select id, name, price, ts from hudi_mor_example;
1       foo     30.0    3000
Time taken: 0.161 seconds, Fetched 1 row(s)

读取优化 (RO) 查询

RO 查询类型被设计为较低的读取延迟与可能较旧的结果的权衡,因此,它专门适用于 MoR 表。进行此类查询时,collectFileSplits() 将仅获取 FileSlices 的基本文件(Parquet文件)。

上面提供的设置代码自动生成一个名为 hudi_mor_example_ro 的目录表,该表指定属性 hoodie.query.as.ro.table=true。此属性指示查询引擎始终执行 RO 查询。运行下面的 SELECT 语句将返回记录的原始值,因为后续更新尚未应用于基本文件。

代码语言:javascript
复制
spark-sql> select id, name, price, ts from hudi_mor_example_ro;
1       foo     10.0    1000
Time taken: 0.114 seconds, Fetched 1 row(s)

时间旅行查询

通过指定时间戳,用户可以请求Hudi表在给定时间的历史快照。正如前面第 1 篇文章中所讨论的,FileSlices 与特定的提交时间相关联,因此支持过滤。执行时间旅行查询时,如果没有完全匹配,FileIndex 仅查找与指定时间相对应或早于指定时间的 FileSlice。

代码语言:javascript
复制
spark-sql> select id, name, price, ts from hudi_mor_example timestamp as of '20230905221619987';
1       foo     30.0    3000
Time taken: 0.274 seconds, Fetched 1 row(s)

spark-sql> select id, name, price, ts from hudi_mor_example timestamp as of '20230905221619986';
1       foo     20.0    2000
Time taken: 0.241 seconds, Fetched 1 row(s)

第一个 SELECT 语句精确地在最新插入的 deltacommit 时间执行时间旅行查询,提供表的最新快照。第二个查询设置的时间戳早于最新插入的时间戳,从而生成倒数第二个插入的快照。

示例中的时间戳遵循 Hudi 时间线的格式"yyyyMMddHHmmssSSS"。也可以以"yyyy-MM-dd HH:mm:ss.SSS"或"yyyy-MM-dd"的形式设置。

增量查询

用户可以设置起始时间戳(带或不带结束时间戳)以检索指定时间窗口内更改的记录。如果没有设置结束时间,则时间窗口将包括最近的记录。Hudi 还通过在写入端启用附加日志并为增量读取器激活 CDC 模式来提供完整的更改数据捕获 (CDC) 功能。更多详细信息将在专门介绍增量处理的单独帖子中介绍。

回顾

在这篇文章中,我们概述了 Spark 的 Catalyst 优化器,探讨了 Hudi 如何实现 Spark DataSource API 来读取数据,并介绍了四种不同的 Hudi 查询类型。代码片段也可以在这里[4]找到。在接下来的文章中将演示写入流程以进一步加深我们对 Hudi 的理解。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-01-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 ApacheHudi 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark 查询入门
  • 数据源API
  • Spark-Hudi 读取流程
  • 快照查询
  • 读取优化 (RO) 查询
  • 时间旅行查询
  • 增量查询
  • 回顾
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档