
Data Lake (19): Reading Kafka Data with the SQL API and Writing It to an Iceberg Table in Real Time

Original · Lansonli · Published 2022-07-24 · From the column: Lansonli's Tech Blog

Reading Kafka Data with the SQL API and Writing It to an Iceberg Table in Real Time

To read data from Kafka in real time and write it into an Iceberg table, proceed as follows:

1. First, create the corresponding Iceberg table

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
// Iceberg commits data on checkpoints, so checkpointing must be enabled
env.enableCheckpointing(1000);
// 1. Create the Hadoop catalog, with its warehouse on HDFS
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");
// 2. Create the Iceberg table flink_iceberg_tbl3, partitioned by loc
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

2. Write the code that reads Kafka data and writes it to Iceberg in real time

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        // Iceberg commits data on checkpoints, so checkpointing must be enabled
        env.enableCheckpointing(1000);

        /**
         * 1. The catalog and the Iceberg table must be created in advance
         */
        // 1. Create the catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        // 2. Create the Iceberg table flink_iceberg_tbl3 (already created above, so it stays commented out)
//        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

        // 3. Create the Kafka connector table that consumes data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");

        // 4. Configure table.dynamic-table-options.enabled
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Allow the OPTIONS hint syntax used in the query below
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        // 5. Write data into the table flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");

        // 6. Query the table data as a continuous streaming read
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}

Launch the code above, then produce the following records into the Kafka topic:

1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
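The records can be sent with the Kafka console producer or with a small client program. Below is a minimal sketch of such a producer (the class name ProduceTestData is hypothetical, not part of the original article), assuming the kafka-clients dependency is on the classpath and using the brokers and topic from the connector definition above:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProduceTestData {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same brokers as in the kafka_input_table definition
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        String[] rows = {"1,zs,18,beijing", "2,ls,19,shanghai", "3,ww,20,beijing", "4,ml,21,shanghai"};
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String row : rows) {
                // Each CSV line maps to the (id, name, age, loc) columns declared on kafka_input_table
                producer.send(new ProducerRecord<>("flink-iceberg-topic", row));
            }
            producer.flush();
        }
    }
}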

The corresponding records appear on the console in real time, and inspecting the table's Iceberg directory on HDFS confirms that the data was written successfully.
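To inspect that directory programmatically rather than through the HDFS web UI, here is a minimal sketch using the Hadoop FileSystem API. It assumes the Hadoop catalog's usual warehouse/db/table directory layout; the class name ListIcebergDir is hypothetical:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListIcebergDir {
    public static void main(String[] args) throws Exception {
        // Connect to the same HDFS cluster the catalog warehouse points at
        FileSystem fs = FileSystem.get(new URI("hdfs://mycluster"), new Configuration());
        // A Hadoop catalog stores each table under <warehouse>/<db>/<table>,
        // with data/ and metadata/ subdirectories inside
        for (FileStatus status : fs.listStatus(new Path("/flink_iceberg/iceberg_db/flink_iceberg_tbl3"))) {
            System.out.println(status.getPath());
        }
    }
}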

Original statement: This article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

If there is any infringement, please contact cloudcommunity@tencent.com for removal.

