首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >湖仓一体电商项目(十八):业务实现之编写写入DWD层业务代码

湖仓一体电商项目(十八):业务实现之编写写入DWD层业务代码

原创
作者头像
Lansonli
发布2022-10-23 06:47:22
发布2022-10-23 06:47:22
39900
代码可运行
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客
运行总次数:0
代码可运行

​业务实现之编写写入DWD层业务代码

一、代码编写

Flink读取Kafka topic “KAFKA-ODS-TOPIC” 数据写入Iceberg-DWD层也是复用第一个业务代码,这里只需要在代码中加入写入Iceberg-DWD层代码即可,代码如下:

代码语言:javascript
代码运行次数:0
运行
复制
//插入 iceberg - dwd 层 会员浏览商品日志信息 :DWD_BROWSELOG
tblEnv.executeSql(
  s"""
    |insert into hadoop_iceberg.icebergdb.DWD_BROWSELOG
    |select
    | log_time,
    | user_id2,
    | user_ip,
    | front_product_url,
    | browse_product_url,
    | browse_product_tpcode,
    | browse_product_code,
    | obtain_points
    | from ${table} where iceberg_ods_tbl_name = 'ODS_BROWSELOG'
  """.stripMargin)

另外,在Flink处理此topic中每条数据时都有获取对应写入后续Kafka topic信息,本业务对应的每条用户日志数据写入的kafka topic为“KAFKA-DWD-BROWSE-LOG-TOPIC”,所以代码可以复用。

二、​​​​​​​​​​​​​​创建Iceberg-DWD层表

代码在执行之前需要在Hive中预先创建对应的Iceberg表,创建Icebreg表方式如下:

1、在Hive中添加Iceberg表格式需要的包

启动HDFS集群,node1启动Hive metastore服务,在Hive客户端启动Hive添加Iceberg依赖包:

代码语言:javascript
代码运行次数:0
运行
复制
#node1节点启动Hive metastore服务
[root@node1 ~]# hive --service metastore &

#在hive客户端node3节点加载两个jar包
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

2、创建Iceberg表

这里创建Iceberg-DWD表有“DWD_BROWSELOG”,创建语句如下:

代码语言:javascript
代码运行次数:0
运行
复制
CREATE TABLE DWD_BROWSELOG  (
 log_time string,
 user_id string,
 user_ip string,
 front_product_url string,
 browse_product_url string,
 browse_product_tpcode string,
 browse_product_code string,
 obtain_points string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWD_BROWSELOG/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

三、代码测试

以上代码编写完成后,代码执行测试步骤如下:

1、在Kafka中创建对应的topic

代码语言:javascript
代码运行次数:0
运行
复制
#在Kafka 中创建 KAFKA-DWD-BROWSE-LOG-TOPIC topic
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWD-BROWSE-LOG-TOPIC --partitions 3 --replication-factor 3

#监控以上topic数据
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DWD-BROWSE-LOG-TOPIC

2、将代码中消费Kafka数据改成从头开始消费

代码中Kafka Connector中属性“scan.startup.mode”设置为“earliest-offset”,从头开始消费数据。

这里也可以不设置从头开始消费Kafka数据,而是直接启动向日志采集接口模拟生产日志代码“RTMockUserLogData.java”,需要启动日志采集接口及Flume。

3、执行代码,查看对应结果

以上代码执行后在,在对应的Kafka “KAFKA-DWD-BROWSE-LOG-TOPIC” topic中都有对应的数据。在Iceberg-DWD层中对应的表中也有数据。

Kafka中结果如下:

Iceberg-DWD层表”DWD_BROWSELOG”中的数据如下:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ​业务实现之编写写入DWD层业务代码
    • 一、代码编写
    • 二、​​​​​​​​​​​​​​创建Iceberg-DWD层表
      • 1、在Hive中添加Iceberg表格式需要的包
      • 2、创建Iceberg表
    • 三、代码测试
      • 1、在Kafka中创建对应的topic
      • 2、将代码中消费Kafka数据改成从头开始消费
      • 3、执行代码,查看对应结果
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档