前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据湖(六):Hudi与Flink整合

数据湖(六):Hudi与Flink整合

原创
作者头像
Lansonli
发布2022-06-03 09:04:07
9300
发布2022-06-03 09:04:07
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客

​Hudi与Flink整合

Hudi0.8.0版本与Flink1.12.x之上版本兼容,目前经过测试,Hudi0.8.0版本开始支持Flink,通过Flink写数据到Hudi时,必须开启checkpoint,至少有5次checkpoint后才能看到对应hudi中的数据。

但是应该是有一些问题,目前问题如下:

  • 在本地执行Flink代码向Flink写数据时,存在“java.lang.AbstractMethodError: Method org/apache/hudi/sink/StreamWriteOperatorCoordinator.notifyCheckpointComplete(J)V is abstract”错误信息,预计是hudi版本支持问题。
  • 写入到Flink中的数据,如果使用Flink读取出来,会有对应的错误:“Exception in thread "main" org.apache.hudi.exception.HoodieException: Get table avro schema error”,这个错误主要是由于上一个错误导致Hudi中没有commit信息,在内部读取时,读取不到Commit信息导致。

一、maven pom.xml导入如下包

代码语言:javascript
复制
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.1</flink.version>
</properties>

<dependencies>
    <!-- Flink操作Hudi需要的包-->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-bundle_2.11</artifactId>
        <version>0.8.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- java 开发Flink所需依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink 开发Scala需要导入以下依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- 读取hdfs文件需要jar包-->
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.9.2</version>
    </dependency>
    <!-- Flink 状态管理 RocksDB 依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink Kafka连接器的依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>1.12.1</version>
    </dependency>

    <!-- Flink SQL & Table-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink SQL中使用Blink 需要导入的包-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

二、Flink 写入数据到Hudi代码

代码语言:javascript
复制
//1.创建对象
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,EnvironmentSettings.newInstance()
    .useBlinkPlanner().inStreamingMode().build())

    import org.apache.flink.streaming.api.scala._

    //2.必须开启checkpoint 默认有5个checkpoint后,hudi目录下才会有数据,不然只有一个.hoodie目录。
    env.enableCheckpointing(2000)
//    env.setStateBackend(new RocksDBStateBackend("hdfs://mycluster/flinkstate"))

    //3.设置并行度
    env.setParallelism(1)

    //4.读取Kakfa 中的数据
    tableEnv.executeSql(
      """
        | create table kafkaInputTable(
        |  id varchar,
        |  name varchar,
        |  age int,
        |  ts varchar,
        |  loc varchar
        | ) with (
        |  'connector' = 'kafka',
        |  'topic' = 'test_tp',
        |  'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
        |  'scan.startup.mode'='latest-offset',
        |  'properties.group.id' = 'testgroup',
        |  'format' = 'csv'
        | )
      """.stripMargin)

    val table: Table = tableEnv.from("kafkaInputTable")

    //5.创建Flink 对应的hudi表
    tableEnv.executeSql(
      """
        |CREATE TABLE t1(
        |  id VARCHAR(20) PRIMARY KEY NOT ENFORCED,--默认主键列为uuid,这里可以后面跟上“PRIMARY KEY NOT ENFORCED”指定为主键列
        |  name VARCHAR(10),
        |  age INT,
        |  ts VARCHAR(20),
        |  loc VARCHAR(20)
        |)
        |PARTITIONED BY (loc)
        |WITH (
        |  'connector' = 'hudi',
        |  'path' = '/flink_hudi_data',
        |  'write.tasks' = '1', -- default is 4 ,required more resource
        |  'compaction.tasks' = '1', -- default is 10 ,required more resource
        |  'table.type' = 'COPY_ON_WRITE' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
        |)
      """.stripMargin)

    //6.向表中插入数据
    tableEnv.executeSql(
      s"""
         | insert into t1 select id,name,age,ts,loc from ${table}
      """.stripMargin)

    env.execute()

以上代码需要注意“PRIMARY KEY NOT ENFORCED”可以不指定,如果不指定hudi对应的主键列默认是“uuid”,指定后可以使用自定义的列名当做主键。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ​Hudi与Flink整合
    • 一、maven pom.xml导入如下包
      • 二、Flink 写入数据到Hudi代码
      相关产品与服务
      数据湖计算 DLC
      数据湖计算DLC(Data Lake Compute,DLC)提供了敏捷高效的数据湖分析与计算服务。服务采用无服务器架构(Serverless),开箱即用。使用标准SQL语法即可完成数据处理、多源数据联合计算等数据工作,有效降低用户数据分析服务搭建成本及使用成本,提高企业数据敏捷度。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档