前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Hudi 流转批 场景实践

Apache Hudi 流转批 场景实践

作者头像
ApacheHudi
发布2023-02-28 11:20:12
8060
发布2023-02-28 11:20:12
举报
文章被收录于专栏:ApacheHudi

背景

在某些业务场景下,我们需要一个标志来衡量hudi数据写入的进度,比如:Flink 实时向 Hudi 表写入数据,然后使用这个 Hudi 表来支持批量计算并通过一个 flag 来评估它的分区数据是否完整从而进一步写入分区数据进行分区级别的ETL,这也就是我们通常说的流转批

EventTime计算原理

图中Flink Sink包含了两个算子。第一个writer 算子,它负责把数据写入文件,writer在checkpoint触发时,会把自己写入的最大的一个时间传到commit算子中,然后commit算子从多个上游传过来的时间中选取一个最小值作为这一批提交数据的时间,并写入HUDI表的元数据中。

社区相关工作参考: https://issues.apache.org/jira/browse/HUDI-5095

案例使用

我们的方案是将这个进度值(EventTime)存储为 hudi 提交(版本)元数据的属性里,然后通过访问这个元数据属性获取这个进度值。在下游的批处理任务之前加一个监控任务去监控最新快照元数据。如果它的时间已经超过了当前的分区时间,就认为这个表的数据已经完备了,这个监控任务就会成功触发下游的批处理任务进行计算,这样可以防止在异常场景下数据管道或者批处理任务空跑的情况。

下图是一个flink 1分钟级别入库到HUDI ODS表, 然后通过流转批计算写入HUDI DWD表的一个执行过程。

US调度系统轮询逻辑

如何解决乱序到来问题, 我们可以通过设置spedGapTime来设置允许延迟到来的范围默认是0 不会延迟到来。

Maven pom 依赖

针对此功能特性的Hudi依赖版本如下

代码语言:javascript
复制

<dependencies>
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.13-bundle</artifactId>
    <version>0.12.1</version>
  </dependency>
</dependencies>

<dependencies>
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.15-bundle</artifactId>
    <version>0.12.1</version>
  </dependency>
</dependencies>

如何设置EventTime

能够解析的字段类型及格式如下:

类型

示例

TIMESTAMP(3)

2012-12-12T12:12:12

TIMESTAMP(3)

2012-12-12 12:12:12

DATE

2012-12-12

BIGINT

100L

INT

100

Flink API

用户只需要设置flink conf指定时间字段作为时间推进字段

代码语言:javascript
复制
Map<String, String> options = new HashMap<>();
// 这里省略其他表字段
options.put(FlinkOptions.EVENT_TIME_FIELD.key(), "ts");
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
     .column("id int not null")
     .column("ts string")
     .column("dt string")
     .pk("id")
     .partition("dt")
     .options(options);

Flink SQL

通过设置hoodie.payload.event.time.field指定需要计算的eventtime的字段

代码语言:javascript
复制
create table hudi_cow_01(\n" +
"  uuid varchar(20),\n" +
"  name varchar(10),\n" +
"  age int,\n" +
"  ts timestamp(3),\n" +
"  PRIMARY KEY(uuid) NOT ENFORCED\n" +
")\n" +
" with (\n" +
 // 这里省略其他参数
"  'hoodie.payload.event.time.field' = 'ts'\n"
")

如何读取EventTime

Spark SQL

代码语言:javascript
复制
call show_commit_extra_metadata(table => 'hudi_tauth_test.hudi_cow_01', metadata_key => 'hoodie.payload.event.time.field');

Java API

代码获取片段如下

代码语言:javascript
复制
Option<HoodieCommitMetadata> commitMetadataOption = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, currentInstant);
if (!commitMetadataOption.isPresent()) {
    throw new HoodieException(String.format("Commit %s not found commitMetadata in Commits %s.", currentInstant, timeline));
}
// 获取到当前版本的时间进度
String eventTime = commitMetadataOption.get().getExtraMetadata().get(FlinkOptions.EVENT_TIME_FIELD.key());
System.out.println("current eventTime: " + eventTime);

输出结果如下

代码语言:javascript
复制
current eventTime: 1667971364742
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-02-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 ApacheHudi 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • EventTime计算原理
  • 案例使用
  • US调度系统轮询逻辑
  • Maven pom 依赖
  • 如何设置EventTime
    • Flink API
      • Flink SQL
      • 如何读取EventTime
        • Spark SQL
          • Java API
          相关产品与服务
          大数据
          全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档