前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >hudi时间旅行查询完整版

hudi时间旅行查询完整版

作者头像
从大数据到人工智能
发布2022-09-09 11:21:56
2K0
发布2022-09-09 11:21:56
举报
文章被收录于专栏:大数据-BigData

本文从头开始讲述使用Flink引擎实现hudi数据湖基于commit_time的查询语义。基本使用可参考前面文章hudi时间旅行查询

基本要求:

  • 有一台机器部署docker用于安装数据生成工具datafaker
  • MySQL数据库开启binlog
  • Flink 1.13.6
  • Flink CDC 2.2.0
  • Hudi 0.11.0

基本流程:

  • 使用datafaker生成测试数据写入MySQL表中
  • 使用Flink CDC工具将MySQL中的数据写到Hudi表
  • 查询Hudi表中的数据
  • 手动修改MySQL中的数据
  • 区间查询hudi中的数据
  • 流式写入数据到hudi表中
  • 再次查询数据

使用datafaker生成测试数据

假定MySQL连接信息为:

  • hostname:192.168.1.2
  • port:3306
  • username:root
  • password:Pass-123-root

MySQL中创建hudi数据库和student表。

代码语言:javascript
复制
create database hudi;
CREATE TABLE `hudi`.`student` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` varchar(20) NOT NULL COMMENT '学生名字',
  `school` varchar(20) NOT NULL COMMENT '学校名字',
  `nickname` varchar(20) NOT NULL COMMENT '学生小名',
  `age` int(11) NOT NULL COMMENT '学生年龄',
  `class_num` int(11) NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` bigint(20) NOT NULL COMMENT '电话号码',
  `email` varchar(64) DEFAULT NULL COMMENT '家庭网络邮箱',
  `ip` varchar(32) DEFAULT NULL COMMENT 'IP地址',
  `address` text COMMENT '家庭地址',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8;COPY

新建测试数据生成工具元数据文件,保存如下数据为meta.txt

代码语言:javascript
复制
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(zhongda, beida, xidian, huagong)]
nickname||varchar(20)||学生小名[:enum(鬼泣, 高小王子, 歌神, 逗比)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
address||text||家庭地址[:address]COPY

生成10条测试数据并写入MySQL表

(在装有docker的服务器执行,由于自增ID从1开始,第二次执行需要修改自增ID起始值)

代码语言:javascript
复制
docker run -v /root/xzc/datafaker/meta.txt:/opt/meta.txt xiaozhch5/datafaker:v1.0 rdb mysql+mysqldb://root:Pass-123-root@192.168.1.2:3306/hudi?charset=utf8 student 10 --meta /opt/meta.txtCOPY

查询student表中的数据

代码语言:javascript
复制
select * from hudi.student limit 10;
COPY
hudi时间旅行查询完整版
hudi时间旅行查询完整版

删除id为1的数据以及更新id为2的数据

代码语言:javascript
复制
delete from hudi.student where id = 1;

update hudi.student set name = "BQmBbNb" where id = 2;
COPY

将测试数据同步到hudi表

yarn上启动flink session

代码语言:javascript
复制
export HADOOP_CLASSPATH
bin/yarn-session.sh -s 8 -jm 2048 -tm 2048 -nm flink-hudi-test -dCOPY

启动flink sql

代码语言:javascript
复制
bin/sql-client.shCOPY

以hdfs上的/user/hudi/warehouse为目录创建catalog

代码语言:javascript
复制
create catalog hudi with('type'='hudi','catalog.path'='hdfs://bigdata:8020/user/hudi/warehouse');COPY

创建test数据库和student表:

代码语言:javascript
复制
create database hudi.test;

drop table if exists hudi.test.student;

create table hudi.test.student (
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ'
);COPY

Flink SQL中创建stu表读取数据

代码语言:javascript
复制
create table student_mysql (
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
  'connector'='mysql-cdc',
  'hostname'='192.168.1.2',
  'port'='3306',
  'username'='root',
  'password'='Pass-123-root',
  'database-name'='hudi',
  'table-name'='student'
);COPY

将mysql数据同步到hudi表

代码语言:javascript
复制
insert into hudi.test.student select * from student_mysql;COPY

查看hudi的commit_time

hudi每次数据写入时都会生成一个时间戳,用于表示数据写入的时间,基于该特性,在进行数据查询时可使用该时间对hudi中数据进行查询。

查看所有时间戳数据:

代码语言:javascript
复制
create table student_commit_time_view(
  `_hoodie_commit_time` string,
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student'
  )
COPY
代码语言:javascript
复制
select distinct _hoodie_commit_time from student_commit_time_view
COPY

查到有两个时间戳,分别为:

hudi时间旅行查询完整版
hudi时间旅行查询完整版
  • 20220622152536101 这个时间点为数据写入操作
  • 20220622152707516 这个时间点为数据删除和更新操作

现在查询时间戳小于20220622152536101的数据,再次创建hudi表

代码语言:javascript
复制
create table student_view_1(
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student',
  'read.start-commit' = 'earliest',
  'read.end-commit' = '20220622152536101'
  );COPY
代码语言:javascript
复制
SET 'sql-client.execution.result-mode' = 'tableau';

select * from student_view_1;
COPY

得到如下10条数据

hudi时间旅行查询完整版
hudi时间旅行查询完整版

现在查询时间戳小于20220622152707516的数据,再次创建hudi表

代码语言:javascript
复制
create table student_view_11(
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student',
  'read.start-commit' = 'earliest',
  'read.end-commit' = '20220622152707516'
  );COPY

查询数据,得到:

代码语言:javascript
复制
select * from student_view_11;COPY
hudi时间旅行查询完整版
hudi时间旅行查询完整版

重点

但是,这个_hoodie_commit_time是有保存时间限制的,保存时间由clean.retain_commits配置项和checkpoint interval确定。

clean.retain_commits表示数据写入到hudi的次数

checkpoint interval表示每次checkpoint的时间间隔

当_hoodie_commit_time距离现在的时间超过clean.retain_commits * checkpoint interval,那么数据会被清除。

比如说,现在设置checkpoint的时间间隔为3分钟,设置clean.retain_commits的次数为10次,那么通过_hoodie_commit_time最多只能查询到30分钟以前的数据,再之前的数据查不到。

在本次实验中,设置的checkpoint时间间隔为20秒,clean.retain_commits的次数为30次,也就是说,如果有数据持续写入的话,通过时间戳的方式,最多查询到600秒以内的数据。

changelog 模式下,这个参数可以控制 changelog 的保留时间.

再次往hudi表中持续插入100万条数据。我们发现在数据持续插入600秒之后,之前数据查不到了:

代码语言:javascript
复制
select * from student_view_11;COPY
hudi时间旅行查询完整版
hudi时间旅行查询完整版
代码语言:javascript
复制
select * from student_view_1;COPY
hudi时间旅行查询完整版
hudi时间旅行查询完整版

现在的时间戳是2022062216460000,我们查询这个时间之前600秒的数据

代码语言:javascript
复制
create table student_view_33(
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student',
  'read.start-commit' = '20220622163600000',
  'read.end-commit' = '20220622164600000'
  );COPY

查询数据总量

代码语言:javascript
复制
select count(*) from student_view_33;COPY

得到:

hudi时间旅行查询完整版
hudi时间旅行查询完整版

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://lrting.top/backend/bigdata/hudi/hudi-basic/6828/

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用datafaker生成测试数据
  • 将测试数据同步到hudi表
  • 查看hudi的commit_time
  • 重点
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档