前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink写入数据到Hudi数据湖的各种方式

Flink写入数据到Hudi数据湖的各种方式

作者头像
大数据真好玩
发布2022-12-05 09:10:07
1.9K0
发布2022-12-05 09:10:07
举报
文章被收录于专栏:暴走大数据暴走大数据

1. 写入方式

1.1 CDC Ingestion

有两种方式同步数据到Hudi

  1. 使用Flink CDC直接将Mysql的binlog日志同步到Hudi
  2. 数据先同步到Kafka/Pulsar等消息系统,然后再使用Flink cdc-format将数据同步到Hudi

注意:

  1. 如果upstream不能保证数据的order,则需要显式指定write.precombine.field
  2. MOR类型的表,还不能处理delete,所以会导致数据不一致。可以通过changelog.enabled转换到change log模式
1.2 Bulk Insert

主要用于数据初始化导入。Bulk Insert不会进行数据去重,需要用户在数据插入前进行数据去重

Bulk Insert在batch execution mode下更高效

使用参数如下:

1.3 Index Bootstrap

用于snapshot data + incremental data数据导入。snapshot data部分使用Bulk insert方式完成。incremental data进行实时导入

使用参数如下:

但是incremental data如何不丢失数据,又不重复导入数据:

  1. incremental data导入部分刚开始可以多导入一部分数据,确保数据不丢失。同时开启index bootstrap function避免数据重复。
  2. 等Flink第一次checkpoint成功,关闭index bootstrap function,从Flink的State恢复状态进行incremental data导入

详细使用步骤如下:

  1. 在flink-conf.yaml中设置一个application允许checkpoint失败的次数:execution.checkpointing.tolerable-failed-checkpoints = n
  2. 在Flink的Catalog创建Hudi表,创建Hudi表的SQL中添加参数index.bootstrap.enabled = true
  3. 启动Application将incremental data导入到Hudi表
  4. 等第一次checkpoint成功,表明index bootstrap完成
  5. 停止Flink的Application,并进行Savepoint
  6. 重新在Flink的Catalog创建Hudi表,创建Hudi表的SQL中添加参数index.bootstrap.enabled = false
  7. 重启Application,从Savepoint或checkpoint恢复状态执行

注意:

  1. index bootstrap是一个阻塞过程,因此在index bootstrap期间无法完成checkpoint
  2. index bootstrap由输入input data触发。用户需要确保每个分区中至少有一条数据
  3. index bootstrap是并发执行的。用户可以在日志文件中通过finish loading the index under partition和Load record form file观察index bootstrap的进度

2. 写入模式

2.1 Changelog Mode

使用参数如下:

保留消息的all changes(I / -U / U / D),Hudi MOR类型的表将all changes append到file log中,但是compaction会对all changes进行merge。如果想消费all changes,需要调整compaction参数:compaction.delta_commitscompaction.delta_seconds

Snapshot读取,永远读取merge后的结果数据

2.2 Append Mode

使用参数如下:

3. write写入速率限制

场景:使用Flink消费历史数据 + 实时增量数据,然后写入到Hudi。会造成写入吞吐量巨大 + 写入分区乱序严重,影响集群和application的稳定性。所以需要限制速率

使用参数如下:

4. 读取方式

4.1 Streaming Query

默认是Batch query,查询最新的Snapshot

Streaming Query需要设置read.streaming.enabled = true。再设置read.start-commit,如果想消费所以数据,设置值为earliest

使用参数如下:

注意:如果开启read.streaming.skip_compaction,但stream reader的速度比clean.retain_commits慢,可能会造成数据丢失

4.2 Incremental Query

有3种使用场景

  • Streaming query: 设置read.start-commit
  • Batch query: 同时设置read.start-commitread.end-commit,start commit和end commit都包含
  • TimeTravel: 设置read.end-commit为大于当前的一个instant time,read.start-commit默认为latest

使用参数如下:

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-09-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 写入方式
    • 1.1 CDC Ingestion
      • 1.2 Bulk Insert
        • 1.3 Index Bootstrap
        • 2. 写入模式
          • 2.1 Changelog Mode
            • 2.2 Append Mode
            • 3. write写入速率限制
            • 4. 读取方式
              • 4.1 Streaming Query
                • 4.2 Incremental Query
                相关产品与服务
                大数据
                全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档