Apache Hudi 在 HDFS 的数据集上提供了插入更新和增量拉取的流原语。一般来说,我们会将大量数据存储到 HDFS,新数据增量写入,而旧数据鲜有改动,特别是在经过数据清洗,放入数据仓库的场景。而且在数据仓库如 Hive 中,对于 update 的支持非常有限,计算昂贵。另一方面,若是有仅对某段时间内新增数据进行分析的场景,则 Hive、Presto、Hbase 等也未提供原生方式,而是需要根据时间戳进行过滤分析。在此需求下,Hudi 可以提供这两种需求的实现。第一个是对 record 级别的更新,另一个是仅对增量数据的查询。且 Hudi 提供了对 Hive、Presto、Spark 的支持,可以直接使用这些组件对 Hudi 管理的数据进行查询。
Hudi 是一个通用的大数据存储系统,主要特性:
摄取和查询引擎之间的快照隔离,包括 Hive、Presto 和 Spark。
支持回滚和存储点,可以恢复数据集。
自动管理文件大小和布局,以优化查询性能准实时摄取,为查询提供最新数据。
实时数据和列数据的异步压缩。
时间轴
Hudi 维护一条包含在不同的即时时间所有对数据集操作的时间轴,从而提供了从不同时间点出发得到不同的视图下的数据集。
Hudi 即时包含以下组件:
操作类型:对数据集执行的操作类型。
即时时间:即时时间通常是一个时间戳(例如20190117010349),该时间戳按操作开始时间的顺序单调增加。
状态:即时的状态。
文件组织
Hudi 将 DFS 上的数据集组织到
基本路径
下的目录结构中。数据集分为多个分区,这些分区是包含该分区的数据文件的文件夹,这与 Hive 表非常相似。每个分区被相对于基本路径的特定
分区路径
区分开来。在每个分区内,文件被组织为文件组
,由文件id
唯一标识。每个文件组包含多个文件切片
,其中每个切片包含在某个提交/压缩即时时间生成的基本列文件*.parquet
以及一组日志文件*.log*
,该文件包含自生成基本文件以来对基本文件的插入/更新。Hudi 采用 MVCC 设计,其中压缩操作将日志和基本文件合并以产生新的文件片,而清理操作则将未使用的/较旧的文件片删除以回收 DFS 上的空间。Hudi 通过索引机制将给定的 hoodie 键(记录键+分区路径)映射到文件组,从而提供了高效的 Upsert。
一旦将记录的第一个版本写入文件,记录键和
文件组
/文件id
之间的映射就永远不会改变。简而言之,映射的文件组包含一组记录的所有版本。存储类型
Hudi 支持以下存储类型:
写时复制:仅使用列文件格式(例如 parquet)存储数据。通过在写入过程中执行同步合并以更新版本并重写文件。
读时合并:更新记录到增量文件中,然后进行同步或异步压缩以生成列文件的新版本。
下表总结了这两种存储类型之间的权衡:
权衡 | 写时复制 | 读时合并 |
数据延迟 | 更高 | 更低 |
查询延迟 | 更低 | 更高 |
更新代价(I/O) | 更高(重写整个 parquet 文件) | 更低(追加到增量日志) |
写放大 | 更高 | 更低(取决于压缩策略) |
Hudi 对 EMR 底层存储支持
HDFS
COS
安装 Hudi
注意:
Hudi 组件依赖 Hive 和 Spark 组件, 如果选择安装 Hudi 组件,需同时选择安装 Hive 和 Spark 组件。
使用示例
本示例将演示使用 SparkSQL 建表并写入数据,然后使用 Hive 和 Trino 进行查询分析。
使用 SparkSQL 读写 Hudi 表
登录 master 节点,切换为 hadoop 用户。
Hudi 支持使用 SparkSQL 的 HoodieSparkSessionExtension 扩展进行数据读写:
spark-sql --master yarn \\--num-executors 2 \\--executor-memory 1g \\--executor-cores 2 \\--jars /usr/local/service/hudi/hudi-bundle/hudi-spark3.3-bundle_2.12-0.13.0.jar \\--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \\--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \\--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
说明:
其中 --master 表示您的 master URL,--num-executors 表示 executor 数量,--executor-memory 表示 executor 的储存容量,以上参数也可以根据您的实际情况作出修改。 --jars 使用的依赖包版本在不同 EMR 版本中可能存在差异,请在 /usr/local/service/hudi/hudi-bundle 目录下查看并使用正确的依赖包。
建表
-- 创建COW非分区表spark-sql> create table hudi_cow_nonpcf_tbl (uuid int,name string,price double) using huditblproperties (primaryKey = 'uuid');-- 创建COW分区表spark-sql> create table hudi_cow_pt_tbl (id bigint,name string,ts bigint,dt string,hh string) using huditblproperties (type = 'cow',primaryKey = 'id',preCombineField = 'ts')partitioned by (dt, hh);-- 创建MOR分区表spark-sql> create table hudi_mor_tbl (id int,name string,price double,ts bigint,dt string) using huditblproperties (type = 'mor',primaryKey = 'id',preCombineField = 'ts')partitioned by (dt);
写入数据
-- insert into non-partitioned tablespark-sql> insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;-- insert dynamic partitionspark-sql> insert into hudi_cow_pt_tbl partition (dt, hh) select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;-- insert static partitionspark-sql> insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;spark-sql> insert into hudi_mor_tbl partition(dt = '2021-12-09') select 1, 'a1', 20, 1000;
查询数据
spark-sql> select * from hudi_cow_nonpcf_tbl;20230808165859974 20230808165859974_0_1 uuid:1 f564e6c7-631c-4f32-a01d-e042f37ad6e6-0_0-21-19_20230808165859974.parquet 1 a1 20.0Time taken: 6.255 seconds, Fetched 1 row(s)spark-sql> select count(*) from hudi_mor_tbl;1Time taken: 0.955 seconds, Fetched 1 row(s)spark-sql> select name, count(*) from hudi_cow_pt_tbl group by name;a2 1a1 1Time taken: 2.049 seconds, Fetched 2 row(s)
使用 Hive 查询 Hudi 表
以下演示在 Hive CLI 中如何查询上述创建的 Hudi 表:
说明:
不同 EMR 版本中加载的 Hudi 依赖包版本可能存在不同,请在/usr/local/service/hudi/hudi-bundle目录下选择正确版本依赖包进行加载。
hive> add jar /usr/local/service/hudi/hudi-bundle/hudi-hadoop-mr-bundle-0.13.0.jar;Added [/usr/local/service/hudi/hudi-bundle/hudi-hadoop-mr-bundle-0.13.0.jar] to class pathhive> show tables;OKhudi_cow_nonpcf_tblhudi_cow_pt_tblhudi_mor_tblhudi_mor_tbl_rohudi_mor_tbl_rtTime taken: 0.023 seconds, Fetched: 5 row(s)hive> set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;hive> select * from hudi_mor_tbl_ro;OK20230808174602565 20230808174602565_0_1 id:1 dt=2021-12-09 af40667d-1dca-4163-89ca-2c48250985b2-0_0-34-1617_20230808174602565.parquet 1 a1 20.0 1000 2021-12-09Time taken: 0.159 seconds, Fetched: 1 row(s)hive> set hive.vectorized.execution.enabled=false;hive> select name, count(*) from hudi_mor_tbl_rt group by name;a1 1Time taken: 17.618 seconds, Fetched: 1 row(s)
使用 Trino 查询 Hudi 表
以下演示如何在 Trino 中查询上述创建的 Hudi 表。
EMR 3.6.0之前版本:
Trino 查询 Hudi 使用 Hive Connector,无需额外配置。
将 /usr/local/service/hudi/hudi-bundle/hudi-hadoop-mr-bundle-$hudi_version.jar 拷贝到集群所有节点的/usr/local/service/trino/plugin/hive/路径下,并重启 Trino 服务。
EMR-3.6.0版本及以后版本:
需要在 Trino 组件配置管理中添加 hudi connector 配置文件。参考配置更新说明进行新增配置文件操作。其中 ${hivemetastore_uri} 需要改为集群的 Hive Metastore 服务地址,具体可以查看 Hive 组件配置文件 hive-site.xml 中的 hive.metastore.uris 配置项。
配置完成后重启Trino服务。
connector.name=hudihive.metastore.uri=${hivemetatore_uri}
完成以上步骤后,可以在 Trino 中查询 Hudi 表:
trino:default> show tables;Table---------------------hudi_cow_nonpcf_tblhudi_cow_pt_tblhudi_mor_tblhudi_mor_tbl_rohudi_mor_tbl_rt(5 rows)Query 20230808_113217_00005_r3tqk, FINISHED, 3 nodesSplits: 12 total, 12 done (100.00%)0.34 [11 rows, 359B] [32 rows/s, 1.03KB/s]trino:default> select uuid, name, price from hudi_cow_nonpcf_tbl;uuid | name | price------+------+-------1 | a1 | 20.0(1 row)Query 20230808_114808_00001_72vwk, FINISHED, 1 nodeSplits: 5 total, 5 done (100.00%)3.48 [1 rows, 440KB] [0 rows/s, 126KB/s]trino:default> select id, name, price from hudi_mor_tbl_ro;id | name | price----+------+-------1 | a1 | 20.0(1 row)Query 20230808_115532_00006_72vwk, FINISHED, 1 nodeSplits: 5 total, 5 done (100.00%)1.75 [1 rows, 440KB] [0 rows/s, 251KB/s]trino:default> select name, count(*) from hudi_mor_tbl_rt group by name;name | _col1------+-------a1 | 1(1 row)Query 20230808_115728_00009_72vwk, FINISHED, 2 nodesSplits: 21 total, 21 done (100.00%)0.63 [1 rows, 440KB] [1 rows/s, 698KB/s]