有奖捉虫:办公协同&微信生态&物联网文档专题 HOT
Apache Hudi 在 HDFS 的数据集上提供了插入更新和增量拉取的流原语。一般来说,我们会将大量数据存储到 HDFS,新数据增量写入,而旧数据鲜有改动,特别是在经过数据清洗,放入数据仓库的场景。而且在数据仓库如 Hive 中,对于 update 的支持非常有限,计算昂贵。另一方面,若是有仅对某段时间内新增数据进行分析的场景,则 Hive、Presto、Hbase 等也未提供原生方式,而是需要根据时间戳进行过滤分析。在此需求下,Hudi 可以提供这两种需求的实现。第一个是对 record 级别的更新,另一个是仅对增量数据的查询。且 Hudi 提供了对 Hive、Presto、Spark 的支持,可以直接使用这些组件对 Hudi 管理的数据进行查询。
Hudi 是一个通用的大数据存储系统,主要特性:
摄取和查询引擎之间的快照隔离,包括 Hive、Presto 和 Spark。
支持回滚和存储点,可以恢复数据集。
自动管理文件大小和布局,以优化查询性能准实时摄取,为查询提供最新数据。
实时数据和列数据的异步压缩。

时间轴

Hudi 维护一条包含在不同的即时时间所有对数据集操作的时间轴,从而提供了从不同时间点出发得到不同的视图下的数据集。
Hudi 即时包含以下组件:
操作类型:对数据集执行的操作类型。
即时时间:即时时间通常是一个时间戳(例如20190117010349),该时间戳按操作开始时间的顺序单调增加。
状态:即时的状态。

文件组织

Hudi 将 DFS 上的数据集组织到基本路径下的目录结构中。数据集分为多个分区,这些分区是包含该分区的数据文件的文件夹,这与 Hive 表非常相似。
每个分区被相对于基本路径的特定分区路径区分开来。在每个分区内,文件被组织为文件组,由文件id唯一标识。每个文件组包含多个文件切片,其中每个切片包含在某个提交/压缩即时时间生成的基本列文件*.parquet以及一组日志文件*.log*,该文件包含自生成基本文件以来对基本文件的插入/更新。
Hudi 采用 MVCC 设计,其中压缩操作将日志和基本文件合并以产生新的文件片,而清理操作则将未使用的/较旧的文件片删除以回收 DFS 上的空间。Hudi 通过索引机制将给定的 hoodie 键(记录键+分区路径)映射到文件组,从而提供了高效的 Upsert。
一旦将记录的第一个版本写入文件,记录键和文件组/文件id之间的映射就永远不会改变。简而言之,映射的文件组包含一组记录的所有版本。

存储类型

Hudi 支持以下存储类型:
写时复制:仅使用列文件格式(例如 parquet)存储数据。通过在写入过程中执行同步合并以更新版本并重写文件。
读时合并:更新记录到增量文件中,然后进行同步或异步压缩以生成列文件的新版本。
下表总结了这两种存储类型之间的权衡:
权衡
写时复制
读时合并
数据延迟
更高
更低
查询延迟
更低
更高
更新代价(I/O)
更高(重写整个 parquet 文件)
更低(追加到增量日志)
写放大
更高
更低(取决于压缩策略)

Hudi 对 EMR 底层存储支持

HDFS
COS

安装 Hudi

进入 EMR 购买页,选择产品版本为 EMR-V3.6.0,选择可选组件为 Hudi 0.13.0。各个产品版本对应的 Hudi 组件版本可以查看组件版本概览页面。
注意:
Hudi 组件依赖 Hive 和 Spark 组件, 如果选择安装 Hudi 组件,需同时选择安装 Hive 和 Spark 组件。

使用示例

本示例将演示使用 SparkSQL 建表并写入数据,然后使用 Hive 和 Trino 进行查询分析。

使用 SparkSQL 读写 Hudi 表

登录 master 节点,切换为 hadoop 用户。
Hudi 支持使用 SparkSQL 的 HoodieSparkSessionExtension 扩展进行数据读写:
spark-sql --master yarn \\
--num-executors 2 \\
--executor-memory 1g \\
--executor-cores 2 \\
--jars /usr/local/service/hudi/hudi-bundle/hudi-spark3.3-bundle_2.12-0.13.0.jar \\
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \\
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \\
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
说明:
其中 --master 表示您的 master URL,--num-executors 表示 executor 数量,--executor-memory 表示 executor 的储存容量,以上参数也可以根据您的实际情况作出修改。 --jars 使用的依赖包版本在不同 EMR 版本中可能存在差异,请在 /usr/local/service/hudi/hudi-bundle 目录下查看并使用正确的依赖包。
建表
-- 创建COW非分区表

spark-sql> create table hudi_cow_nonpcf_tbl (
uuid int,
name string,
price double
) using hudi
tblproperties (
primaryKey = 'uuid'
);

-- 创建COW分区表

spark-sql> create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh);

-- 创建MOR分区表

spark-sql> create table hudi_mor_tbl (
id int,
name string,
price double,
ts bigint,
dt string
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt);

写入数据
-- insert into non-partitioned table
spark-sql> insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;

-- insert dynamic partition
spark-sql> insert into hudi_cow_pt_tbl partition (dt, hh) select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;

-- insert static partition
spark-sql> insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;
spark-sql> insert into hudi_mor_tbl partition(dt = '2021-12-09') select 1, 'a1', 20, 1000;
查询数据
spark-sql> select * from hudi_cow_nonpcf_tbl;
20230808165859974 20230808165859974_0_1 uuid:1 f564e6c7-631c-4f32-a01d-e042f37ad6e6-0_0-21-19_20230808165859974.parquet 1 a1 20.0
Time taken: 6.255 seconds, Fetched 1 row(s)

spark-sql> select count(*) from hudi_mor_tbl;
1
Time taken: 0.955 seconds, Fetched 1 row(s)

spark-sql> select name, count(*) from hudi_cow_pt_tbl group by name;
a2 1
a1 1
Time taken: 2.049 seconds, Fetched 2 row(s)

使用 Hive 查询 Hudi 表

以下演示在 Hive CLI 中如何查询上述创建的 Hudi 表:
说明:
不同 EMR 版本中加载的 Hudi 依赖包版本可能存在不同,请在/usr/local/service/hudi/hudi-bundle目录下选择正确版本依赖包进行加载。
hive> add jar /usr/local/service/hudi/hudi-bundle/hudi-hadoop-mr-bundle-0.13.0.jar;
Added [/usr/local/service/hudi/hudi-bundle/hudi-hadoop-mr-bundle-0.13.0.jar] to class path

hive> show tables;
OK
hudi_cow_nonpcf_tbl
hudi_cow_pt_tbl
hudi_mor_tbl
hudi_mor_tbl_ro
hudi_mor_tbl_rt
Time taken: 0.023 seconds, Fetched: 5 row(s)

hive> set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
hive> select * from hudi_mor_tbl_ro;
OK
20230808174602565 20230808174602565_0_1 id:1 dt=2021-12-09 af40667d-1dca-4163-89ca-2c48250985b2-0_0-34-1617_20230808174602565.parquet 1 a1 20.0 1000 2021-12-09
Time taken: 0.159 seconds, Fetched: 1 row(s)

hive> set hive.vectorized.execution.enabled=false;
hive> select name, count(*) from hudi_mor_tbl_rt group by name;
a1 1
Time taken: 17.618 seconds, Fetched: 1 row(s)

使用 Trino 查询 Hudi 表

以下演示如何在 Trino 中查询上述创建的 Hudi 表。
EMR 3.6.0之前版本:
Trino 查询 Hudi 使用 Hive Connector,无需额外配置。
/usr/local/service/hudi/hudi-bundle/hudi-hadoop-mr-bundle-$hudi_version.jar 拷贝到集群所有节点的/usr/local/service/trino/plugin/hive/路径下,并重启 Trino 服务。
EMR-3.6.0版本及以后版本:
需要在 Trino 组件配置管理中添加 hudi connector 配置文件。参考配置更新说明进行新增配置文件操作。其中 ${hivemetastore_uri} 需要改为集群的 Hive Metastore 服务地址,具体可以查看 Hive 组件配置文件 hive-site.xml 中的 hive.metastore.uris 配置项。
配置完成后重启Trino服务。
connector.name=hudi
hive.metastore.uri=${hivemetatore_uri}
完成以上步骤后,可以在 Trino 中查询 Hudi 表:
trino:default> show tables;
Table
---------------------
hudi_cow_nonpcf_tbl
hudi_cow_pt_tbl
hudi_mor_tbl
hudi_mor_tbl_ro
hudi_mor_tbl_rt
(5 rows)

Query 20230808_113217_00005_r3tqk, FINISHED, 3 nodes
Splits: 12 total, 12 done (100.00%)
0.34 [11 rows, 359B] [32 rows/s, 1.03KB/s]

trino:default> select uuid, name, price from hudi_cow_nonpcf_tbl;
uuid | name | price
------+------+-------
1 | a1 | 20.0
(1 row)

Query 20230808_114808_00001_72vwk, FINISHED, 1 node
Splits: 5 total, 5 done (100.00%)
3.48 [1 rows, 440KB] [0 rows/s, 126KB/s]

trino:default> select id, name, price from hudi_mor_tbl_ro;
id | name | price
----+------+-------
1 | a1 | 20.0
(1 row)

Query 20230808_115532_00006_72vwk, FINISHED, 1 node
Splits: 5 total, 5 done (100.00%)
1.75 [1 rows, 440KB] [0 rows/s, 251KB/s]

trino:default> select name, count(*) from hudi_mor_tbl_rt group by name;
name | _col1
------+-------
a1 | 1
(1 row)

Query 20230808_115728_00009_72vwk, FINISHED, 2 nodes
Splits: 21 total, 21 done (100.00%)
0.63 [1 rows, 440KB] [1 rows/s, 698KB/s]