原生表(Iceberg)格式说明

最近更新时间:2025-03-07 15:07:52

我的收藏

原生表(Iceberg)原理解析

DLC 原生表(Iceberg)采用 Iceberg 表格式作为底层存储,在兼容开源 Iceberg 能力的基础上,还做了存算分离性能、易用性增强。
Iceberg 表格式通过划分数据文件和元数据文件管理用户的数据。
数据层(data layer):由一系列 data file 组成,用于存放用户表的数据,data file 支持 parquet、avro、orc 格式,DLC 中默认为 parquet 格式。
由于 iceberg 的快照机制,用户在删除数据时,并不会立即将数据从存储上删除,而是写入新的 delete file,用于记录被删除的数据,根据使用的不同,delete file 分为 position delete file 和 equality delete file。
position delete 是位置删除文件,记录某个 data file 的某一行被删除的信息。
equality delete 为等值删除文件,记录某个 key 的值被删除,通常用在 upsert 写入场景。delete file 也属于 data file 的一种类型。
元数据层(metadata layer):由一系列的 manifest、manifest list、metadata 文件组成,manifest file 包含一系列的 data file 的元信息,如文件路径、写入时间、min-max 值、统计值等信息。
manifest list 由 manifest file组成,通常一个 manifest list 包含一个快照的 manifest file。
metadata file 为 json 格式,包含一些列的 manifest list 文件信息和表的元信息,如表 schema、分区、所有快照等。每当表状态发生变化时,都会产生一个新的 metadata file 覆盖原有 metadata 文件,且该过程有 Iceberg 内核的原子性保证。

原生表(Iceberg)表版本

DLC 原生表(iceberg),从使用场景上,可分为 Append 场景表和 Upsert 表,Append 场景表采用的 V1 格式,Upsert 表采用的 V2 格式。
Append 场景表:该场景表仅支持 Append,Overwrite,Merge into 方式写入。
Upsert 场景表:相比 Append,写入能力相比多一种 Upsert 写入模式。

原生表(Iceberg)建表属性

为更好的管理和使用 DLC 原生表(Iceberg),您创建该类型的表时需要携带一些属性,这些属性参考如下。用户在创建表时可以携带上这些属性值,也可以修改表的属性值,详细的操作请参见 DLC 原生表操作配置
属性值
含义
配置指导
format-version
iceberg 表版本,取值范围1、2,默认取值为1
如果用户写入场景有 upsert,该值必须设置为2
write.upsert.enabled
是否开启 upsert,取值为true;不设置则为不开启
如果用户写入场景有 upsert,必须设置为 true
write.update.mode
更新模式
merge-on-read 指定为 MOR 表,缺省为 COW
write.merge.mode
merge 模式
merge-on-read 指定为 MOR 表,缺省为 COW
write.parquet.bloom-filter-enabled.column.{col}
开启 bloom,取值为 true 表示开启,缺省不开启
upsert 场景必须开启,需要根据上游的主键进行配置;如上游有多个主键,最多取前两个;开启后可提升 MOR 查询和小文件合并性能
write.distribution-mode
写实模式
建议取值为 hash,当取值为 hash 时,当数据写入时会自行进行 repartition,缺点是影响部分写入性能
write.metadata.delete-after-commit.enabled
开始 metadata 文件自动清理
强烈建议设置为 true,开启后 iceberg 在产生快照时会自动清理历史的 metadata 文件,可避免大量的 metadata 文件堆积
write.metadata.previous-versions-max
设置默认保留的 metadata 文件数量
默认值为100,在某些特殊的情况下,用户可适当调整该值,需要配合 write.metadata.delete-after-commit.enabled 一起使用
write.metadata.metrics.default
设置列 metrices 模型
必须取值为 full

原生表(Iceberg)核心能力

ACID 事务

Iceberg 写入支持在单个操作中删除和新增,且不会部分对用户可见,从而提供原子性写入操作。
Iceberg 使用乐观并发锁来确保写入数据不会导致数据不一致,用户只能看到读视图中已经提交成功的数据。
Iceberg 使用快照机制和可序列化隔离级确保读取和写入是隔离的。
Iceberg 确保事务是持久化的,一旦移交成功就是永久性的。

写入

写入过程遵循乐观并发控制,写入者首先假设当前表版本在提交更新之前不会发生变更,对表数据进行更新/删除/新增,并创建新版本的元数据文件,之后尝试替换当前版本的元数据文件到新版本上来,但是在替换过程中,Iceberg 会检查当前的更新是否是基于当前版本的快照进行的,
如果不是则表示发生了写冲突,有其他写入者先更新了当前 metadata,此时写入必须基于当前的 metadata 版本从新更新,之后再次提交更新,整个提交替换过程由元数据锁保证操作的原子性。

读取

Iceberg 读取和写入是独立的过程,读者始终只能看到已经提交成功的快照,通过获取版本的 metadata 文件,获取的快照信息,从而读取当前表的数据,由于在未完成写入时,并不会更新 metadata 文件,从而确保始终从已经完成的操作中读取数据,无法从正在写入的操作中获取数据。

冲突参数配置

DLC 托管表(Iceberg)在写入并发变高时将会触发写入冲突,为降低冲突频率,用户可从如下方面对业务进行合理的调整。
进入合并的表结构设置,如分区,合理规划作业写入范围,减少任务写入时间,在一定程度上降低并发冲突概率。
作业进行一定程度的合并,减小写入并发量。
DLC 还支持一些列冲突并发重试的参数设置,在一定程度可提供重试操作的成功率,减小对业务的影响,参数含义及配置指导如下。
属性值
系统默认值
含义
配置指导
commit.retry.num-retries
4
提交失败后的重试次数
发生重试时,可尝试提大次数
commit.retry.min-wait-ms
100
重试前的最小等待时间,单位为毫秒
当时冲突十分频繁,如等待一段时间后依然冲突,可尝试调整该值,加大重试之间的间隔
commit.retry.max-wait-ms
60000(1 min)
重试前的最大等待时间,单位为毫秒
结合commit.retry.min-wait-ms一起调整使用
commit.retry.total-timeout-ms
1800000(30 min)
整个重试提交的超时时间
-

隐藏式分区

DLC 原生表(Iceberg)隐藏分区是将分区信息隐藏起来,开发人员只需要在建表的时候指定分区策略,Iceberg 会根据分区策略维护表字段与数据文件之间的逻辑关系,在写入和查询时无需关注分区布局,Iceberg 在写入数据是根据分区策略找到分区信息,并将其记录在元数据中,查询时也会更具元数据记录过滤到不需要扫描的文件。DLC 原生表(Iceberg)提供的分区策略如下表所示。
转换策略
描述
原始字段类型
转换后类型
identity
不转换
所有类型
与原类型一致
bucket[N, col]
hash分桶
int, long, decimal, date, time, timestamp, timestamptz, string, uuid, fixed, binary
int
truncate[col]
截取固定长度
int, long, decimal, string
与原类型一致
year
提取字段 year 信息
date, timestamp, timestamptz
int
month
提取字段 mouth 信息
date, timestamp, timestamptz
int
day
提取字段 day 信息
date, timestamp, timestamptz
int
hour
提取字段 hour 信息
timestamp, timestamptz
int

元数据查询和存储过程

DLC 原生表(Iceberg)可调用存储过程语句查询各类型表信息,如文件合并、快照过期等,如下表格提供部分常用的查询方法,具体语法请参见 Iceberg 表语法
场景
CALL 语句
执行引擎
查询 history
select * from `DataLakeCatalog`.`db`.`sample$history`
SuperSQL引擎 spark(sql)、SuperSQL presto 引擎
select * from `DataLakeCatalog`.`db`.`sample`.`history`
SuperSQL引擎 spark(作业)、标准引擎 spark
查询快照
select * from `DataLakeCatalog`.`db`.`sample$snapshots`
SuperSQL引擎 spark(sql)、SuperSQL presto 引擎
select * from `DataLakeCatalog`.`db`.`sample`.`snapshots`
SuperSQL引擎 spark(作业)、标准引擎 spark
查询 data 文件
select * from `DataLakeCatalog`.`db`.`sample$files`
SuperSQL引擎 spark(sql)、SuperSQL presto 引擎
select * from `DataLakeCatalog`.`db`.`sample`.`files`
SuperSQL引擎 spark(作业)、标准引擎 spark
查询manifests
select * from `DataLakeCatalog`.`db`.`sample$manifests`
SuperSQL引擎 spark(sql)、SuperSQL presto 引擎
select * from `DataLakeCatalog`.`db`.`sample`.`manifests`
SuperSQL引擎 spark(作业)、标准引擎 spark
查询分区
select * from `DataLakeCatalog`.`db`.`sample$partitions`
SuperSQL引擎 spark(sql)、SuperSQL presto 引擎
select * from `DataLakeCatalog`.`db`.`sample`.`partitions`
SuperSQL引擎 spark(作业)、标准引擎 spark
回滚指定快照
CALL DataLakeCatalog.`system`.rollback_to_snapshot('db.sample', 1)
SuperSQL引擎 spark、标准引擎 spark
回滚到某个时间点
CALL DataLakeCatalog.`system`.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000')
SuperSQL引擎 spark、标准引擎 spark
设置当前快照
CALL DataLakeCatalog.`system`.set_current_snapshot('db.sample', 1)
SuperSQL引擎 spark、标准引擎 spark
合并文件
CALL DataLakeCatalog.`system`.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST')
SuperSQL引擎 spark、标准引擎 spark
快照过期
CALL DataLakeCatalog.`system`.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100)
SuperSQL引擎 spark、标准引擎 spark
移除孤立文件
CALL DataLakeCatalog.`system`.remove_orphan_files(table => 'db.sample', dry_run => true)
SuperSQL引擎 spark、标准引擎 spark
重新元数据
CALL DataLakeCatalog.`system`.rewrite_manifests('db.sample')
SuperSQL引擎 spark、标准引擎 spark