
Data Lake (18): Integrating Flink and Iceberg with the SQL API

Author: Lansonli | Published 2022-07-22 | Column: Lansonli Tech Blog

Integrating Flink and Iceberg with the SQL API

Flink SQL support for Iceberg here targets Flink 1.11.x with Iceberg 0.11.1. At the moment, Flink 1.14.2 and Iceberg 0.12.1 have compatibility problems at the SQL API level, so Flink 1.11.6 and Iceberg 0.11.1 are used below to demonstrate operating Iceberg through the Flink SQL API.

I. Creating an Iceberg Table and Writing Data with the SQL API

1. Create a new project and add the following Maven dependencies

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <!-- Flink 1.11.x is compatible with Iceberg 0.11.1 -->
  <flink.version>1.11.6</flink.version>
  <hadoop.version>3.2.2</hadoop.version>
</properties>

<dependencies>
  <!-- Iceberg dependency needed for Flink to work with Iceberg -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime</artifactId>
    <version>0.11.1</version>
  </dependency>

  <!-- Dependencies needed for developing Flink in Java -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Flink Kafka connector dependency -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Jars needed for reading files from HDFS -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>

  <!-- Flink SQL & Table-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime-blink_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>


  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>

  <!-- log4j and slf4j packages; comment these out if you do not want log output in the console -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.25</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.5</version>
  </dependency>
</dependencies>

2. Write Flink SQL to create the Iceberg table and write data

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

//1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Use the catalog just created
tblEnv.useCatalog("hadoop_iceberg");

//3. Create a database
tblEnv.executeSql("create database iceberg_db");

//4. Use the database
tblEnv.useDatabase("iceberg_db");

//5. Create the Iceberg table flink_iceberg_tbl2
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");

//6. Write data into the table flink_iceberg_tbl2
tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");

3. Map the Iceberg table in Hive and query it

Execute the following statement in Hive to create the corresponding Iceberg table:

-- Create the Iceberg table in Hive
CREATE TABLE flink_iceberg_tbl2  (
  id int, 
  name string,
  age int,
  loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table');

# Query the data in the Iceberg table from Hive
hive> select * from flink_iceberg_tbl2;
OK
3	ww	20	guangzhou
1	zs	18	beijing
2	ls	19	shanghai

II. Batch-Querying Iceberg Table Data with the SQL API

Batch-querying an Iceberg table with the Flink SQL API only requires running the query and printing the result. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

//1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
    "'type'='iceberg'," +
    "'catalog-type'='hadoop'," +
    "'warehouse'='hdfs://mycluster/flink_iceberg')");
//2. Read the table data in batch mode
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 ");

tableResult.print();

Running the code prints the rows previously written to the table.
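If you want to process the rows programmatically rather than just print them, TableResult also provides collect(). A minimal sketch continuing from the tableResult above (imports listed for completeness):

import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

// Iterate over the query result instead of printing it; the iterator should be
// closed when finished, hence the try-with-resources block.
try (CloseableIterator<Row> rows = tableResult.collect()) {
    while (rows.hasNext()) {
        Row row = rows.next();
        System.out.println("id=" + row.getField(0) + ", name=" + row.getField(1));
    }
}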

III. Querying Iceberg Table Data in Real Time with the SQL API

When querying Iceberg table data in real time with the Flink SQL API, the parameter "table.dynamic-table-options.enabled" must be set to true so that the "OPTIONS" hint syntax is supported in SQL. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint syntax in SQL
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Read all data from the table's current snapshot, then keep reading newly committed data incrementally
// 'streaming'='true' enables continuous reads; 'monitor-interval' sets how often new data is checked for (1s here)
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");

tableResult.print();

Once the code above is running, it first reads out the data currently in the Iceberg table; if you then insert rows into the corresponding Iceberg table from Hive, the new rows appear in the console in real time.

# Before inserting into the Iceberg table from Hive, add the following two jars:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

# Insert two rows into the Iceberg table from Hive
hive> insert into flink_iceberg_tbl2 values (4,'ml',30,'shenzhen'),(5,'tq',31,'beijing');

The newly inserted rows appear in the console in real time.

IV. Real-Time Incremental Reads from a Specified Snapshot with the SQL API

The Flink SQL API also supports continuing a real-time read from a specific snapshot-id. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint syntax in SQL
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
      "'type'='iceberg'," +
      "'catalog-type'='hadoop'," +
      "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Keep reading data in real time from the specified Iceberg snapshot onward; the snapshot ID is obtained from the table metadata
//   start-snapshot-id: the snapshot ID to start reading from
TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/");
tableResult2.print();
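The snapshot ID passed to start-snapshot-id comes from the table's metadata. One way to look it up is with Iceberg's Java API against the same Hadoop catalog; the sketch below is a hypothetical helper (the class name is an assumption, and it requires the Iceberg and Hadoop classes on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

// Hypothetical helper: print all snapshots of the table so that a snapshot id
// can be chosen for the 'start-snapshot-id' option above.
public class ListIcebergSnapshots {
    public static void main(String[] args) {
        // Same warehouse path as the Flink catalog created above
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://mycluster/flink_iceberg");
        Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "flink_iceberg_tbl2"));
        for (Snapshot snapshot : table.snapshots()) {
            System.out.println("snapshot-id=" + snapshot.snapshotId()
                    + ", committed-at=" + snapshot.timestampMillis());
        }
    }
}

Alternatively, the snapshot list can be read directly from the table's metadata JSON file under the metadata directory on HDFS.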

Original statement: This article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For removal in case of infringement, please contact cloudcommunity@tencent.com.
