原生表(Iceberg)原理解析
DLC 原生表(Iceberg)采用 Iceberg 表格式作为底层存储,在兼容开源 Iceberg 能力的基础上,还做了存算分离性能、易用性增强。
Iceberg 表格式通过划分数据文件和元数据文件管理用户的数据。
数据层(data layer):由一系列 data file 组成,用于存放用户表的数据,data file 支持 parquet、avro、orc 格式,DLC 中默认为 parquet 格式。
由于 iceberg 的快照机制,用户在删除数据时,并不会立即将数据从存储上删除,而是写入新的 delete file,用于记录被删除的数据,根据使用的不同,delete file 分为 position delete file 和 equality delete file。
position delete 是位置删除文件,记录某个 data file 的某一行被删除的信息。
equality delete 为等值删除文件,记录某个 key 的值被删除,通常用在 upsert 写入场景。delete file 也属于 data file 的一种类型。
元数据层(metadata layer):由一系列的 manifest、manifest list、metadata 文件组成,manifest file 包含一系列的 data file 的元信息,如文件路径、写入时间、min-max 值、统计值等信息。
manifest list 由 manifest file组成,通常一个 manifest list 包含一个快照的 manifest file。
metadata file 为 json 格式,包含一些列的 manifest list 文件信息和表的元信息,如表 schema、分区、所有快照等。每当表状态发生变化时,都会产生一个新的 metadata file 覆盖原有 metadata 文件,且该过程有 Iceberg 内核的原子性保证。
原生表(Iceberg)表版本
DLC 原生表(iceberg),从使用场景上,可分为 Append 场景表和 Upsert 表,Append 场景表采用的 V1 格式,Upsert 表采用的 V2 格式。
Append 场景表:该场景表仅支持 Append,Overwrite,Merge into 方式写入。
Upsert 场景表:相比 Append,写入能力相比多一种 Upsert 写入模式。
原生表(Iceberg)建表属性
为更好的管理和使用 DLC 原生表(Iceberg),您创建该类型的表时需要携带一些属性,这些属性参考如下。用户在创建表时可以携带上这些属性值,也可以修改表的属性值,详细的操作请参见 DLC 原生表操作配置。
属性值 | 含义 | 配置指导 |
format-version | iceberg 表版本,取值范围1、2,默认取值为1 | 如果用户写入场景有 upsert,该值必须设置为2 |
write.upsert.enabled | 是否开启 upsert,取值为true;不设置则为不开启 | 如果用户写入场景有 upsert,必须设置为 true |
write.update.mode | 更新模式 | merge-on-read 指定为 MOR 表,缺省为 COW |
write.merge.mode | merge 模式 | merge-on-read 指定为 MOR 表,缺省为 COW |
write.parquet.bloom-filter-enabled.column.{col} | 开启 bloom,取值为 true 表示开启,缺省不开启 | upsert 场景必须开启,需要根据上游的主键进行配置;如上游有多个主键,最多取前两个;开启后可提升 MOR 查询和小文件合并性能 |
write.distribution-mode | 写实模式 | 建议取值为 hash,当取值为 hash 时,当数据写入时会自行进行 repartition,缺点是影响部分写入性能 |
write.metadata.delete-after-commit.enabled | 开始 metadata 文件自动清理 | 强烈建议设置为 true,开启后 iceberg 在产生快照时会自动清理历史的 metadata 文件,可避免大量的 metadata 文件堆积 |
write.metadata.previous-versions-max | 设置默认保留的 metadata 文件数量 | 默认值为100,在某些特殊的情况下,用户可适当调整该值,需要配合 write.metadata.delete-after-commit.enabled 一起使用 |
write.metadata.metrics.default | 设置列 metrices 模型 | 必须取值为 full |
原生表(Iceberg)核心能力
ACID 事务
Iceberg 写入支持在单个操作中删除和新增,且不会部分对用户可见,从而提供原子性写入操作。
Iceberg 使用乐观并发锁来确保写入数据不会导致数据不一致,用户只能看到读视图中已经提交成功的数据。
Iceberg 使用快照机制和可序列化隔离级确保读取和写入是隔离的。
Iceberg 确保事务是持久化的,一旦移交成功就是永久性的。
写入
写入过程遵循乐观并发控制,写入者首先假设当前表版本在提交更新之前不会发生变更,对表数据进行更新/删除/新增,并创建新版本的元数据文件,之后尝试替换当前版本的元数据文件到新版本上来,但是在替换过程中,Iceberg 会检查当前的更新是否是基于当前版本的快照进行的,
如果不是则表示发生了写冲突,有其他写入者先更新了当前 metadata,此时写入必须基于当前的 metadata 版本从新更新,之后再次提交更新,整个提交替换过程由元数据锁保证操作的原子性。
读取
Iceberg 读取和写入是独立的过程,读者始终只能看到已经提交成功的快照,通过获取版本的 metadata 文件,获取的快照信息,从而读取当前表的数据,由于在未完成写入时,并不会更新 metadata 文件,从而确保始终从已经完成的操作中读取数据,无法从正在写入的操作中获取数据。
冲突参数配置
DLC 托管表(Iceberg)在写入并发变高时将会触发写入冲突,为降低冲突频率,用户可从如下方面对业务进行合理的调整。
进入合并的表结构设置,如分区,合理规划作业写入范围,减少任务写入时间,在一定程度上降低并发冲突概率。
作业进行一定程度的合并,减小写入并发量。
DLC 还支持一些列冲突并发重试的参数设置,在一定程度可提供重试操作的成功率,减小对业务的影响,参数含义及配置指导如下。
属性值 | 系统默认值 | 含义 | 配置指导 |
commit.retry.num-retries | 4 | 提交失败后的重试次数 | 发生重试时,可尝试提大次数 |
commit.retry.min-wait-ms | 100 | 重试前的最小等待时间,单位为毫秒 | 当时冲突十分频繁,如等待一段时间后依然冲突,可尝试调整该值,加大重试之间的间隔 |
commit.retry.max-wait-ms | 60000(1 min) | 重试前的最大等待时间,单位为毫秒 | 结合commit.retry.min-wait-ms一起调整使用 |
commit.retry.total-timeout-ms | 1800000(30 min) | 整个重试提交的超时时间 | - |
隐藏式分区
DLC 原生表(Iceberg)隐藏分区是将分区信息隐藏起来,开发人员只需要在建表的时候指定分区策略,Iceberg 会根据分区策略维护表字段与数据文件之间的逻辑关系,在写入和查询时无需关注分区布局,Iceberg 在写入数据是根据分区策略找到分区信息,并将其记录在元数据中,查询时也会更具元数据记录过滤到不需要扫描的文件。DLC 原生表(Iceberg)提供的分区策略如下表所示。
转换策略 | 描述 | 原始字段类型 | 转换后类型 |
identity | 不转换 | 所有类型 | 与原类型一致 |
bucket[N, col] | hash分桶 | int, long, decimal, date, time, timestamp, timestamptz, string, uuid, fixed, binary | int |
truncate[col] | 截取固定长度 | int, long, decimal, string | 与原类型一致 |
year | 提取字段 year 信息 | date, timestamp, timestamptz | int |
month | 提取字段 mouth 信息 | date, timestamp, timestamptz | int |
day | 提取字段 day 信息 | date, timestamp, timestamptz | int |
hour | 提取字段 hour 信息 | timestamp, timestamptz | int |
元数据查询和存储过程
场景 | CALL 语句 | 执行引擎 |
查询 history | select * from `DataLakeCatalog`.`db`.`sample$history` | SuperSQL引擎 spark(sql)、SuperSQL presto 引擎 |
| select * from `DataLakeCatalog`.`db`.`sample`.`history` | SuperSQL引擎 spark(作业)、标准引擎 spark |
查询快照 | select * from `DataLakeCatalog`.`db`.`sample$snapshots` | SuperSQL引擎 spark(sql)、SuperSQL presto 引擎 |
| select * from `DataLakeCatalog`.`db`.`sample`.`snapshots` | SuperSQL引擎 spark(作业)、标准引擎 spark |
查询 data 文件 | select * from `DataLakeCatalog`.`db`.`sample$files` | SuperSQL引擎 spark(sql)、SuperSQL presto 引擎 |
| select * from `DataLakeCatalog`.`db`.`sample`.`files` | SuperSQL引擎 spark(作业)、标准引擎 spark |
查询manifests | select * from `DataLakeCatalog`.`db`.`sample$manifests` | SuperSQL引擎 spark(sql)、SuperSQL presto 引擎 |
| select * from `DataLakeCatalog`.`db`.`sample`.`manifests` | SuperSQL引擎 spark(作业)、标准引擎 spark |
查询分区 | select * from `DataLakeCatalog`.`db`.`sample$partitions` | SuperSQL引擎 spark(sql)、SuperSQL presto 引擎 |
| select * from `DataLakeCatalog`.`db`.`sample`.`partitions` | SuperSQL引擎 spark(作业)、标准引擎 spark |
回滚指定快照 | CALL DataLakeCatalog.`system`.rollback_to_snapshot('db.sample', 1) | SuperSQL引擎 spark、标准引擎 spark |
回滚到某个时间点 | CALL DataLakeCatalog.`system`.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000') | SuperSQL引擎 spark、标准引擎 spark |
设置当前快照 | CALL DataLakeCatalog.`system`.set_current_snapshot('db.sample', 1) | SuperSQL引擎 spark、标准引擎 spark |
合并文件 | CALL DataLakeCatalog.`system`.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST') | SuperSQL引擎 spark、标准引擎 spark |
快照过期 | CALL DataLakeCatalog.`system`.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100) | SuperSQL引擎 spark、标准引擎 spark |
移除孤立文件 | CALL DataLakeCatalog.`system`.remove_orphan_files(table => 'db.sample', dry_run => true) | SuperSQL引擎 spark、标准引擎 spark |
重新元数据 | CALL DataLakeCatalog.`system`.rewrite_manifests('db.sample') | SuperSQL引擎 spark、标准引擎 spark |