
Hands-on: Reading HBase 2.4.0 from Flink 1.12.1 with the Table API / Flink SQL

By 王知无-import_bigdata · Published 2021-05-07

Yesterday someone in the group asked about reading HBase from Flink 1.12, so I came across this article and am sharing it here. The original author is Ashiamd.

1. Environment

Without further ado, the environment used here is as follows (I am not sure every piece is strictly required, but this is at least what I ran on):

  • ZooKeeper 3.6.2
  • HBase 2.4.0
  • Flink 1.12.1
2. HBase table
# Create the table
create 'u_m_01' , 'u_m_r'

# Insert data
put 'u_m_01', 'a,A', 'u_m_r:r' , '1'
put 'u_m_01', 'a,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,C', 'u_m_r:r' , '4'
put 'u_m_01', 'c,A', 'u_m_r:r' , '2'
put 'u_m_01', 'c,C', 'u_m_r:r' , '5'
put 'u_m_01', 'c,D', 'u_m_r:r' , '1'
put 'u_m_01', 'd,B', 'u_m_r:r' , '5'
put 'u_m_01', 'd,D', 'u_m_r:r' , '2'
put 'u_m_01', 'e,A', 'u_m_r:r' , '3'
put 'u_m_01', 'e,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,A', 'u_m_r:r' , '1'
put 'u_m_01', 'f,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,D', 'u_m_r:r' , '3'
put 'u_m_01', 'g,C', 'u_m_r:r' , '1'
put 'u_m_01', 'g,D', 'u_m_r:r' , '4'
put 'u_m_01', 'h,A', 'u_m_r:r' , '1'
put 'u_m_01', 'h,B', 'u_m_r:r' , '2'
put 'u_m_01', 'h,C', 'u_m_r:r' , '4'
put 'u_m_01', 'h,D', 'u_m_r:r' , '5'
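
As a quick sanity check outside of Flink, the same rows can be read back with the plain HBase Java client. This is a minimal sketch of my own, not part of the original walkthrough; it assumes ZooKeeper is reachable at 127.0.0.1:2181 (the same quorum used in the Flink DDL later on), and the class name HBaseSanityCheck is hypothetical.

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseSanityCheck {
    public static void main(String[] args) throws Exception {
        // Assumption: ZooKeeper runs locally on the default client port 2181.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("u_m_01"))) {
            // Read back one of the rows inserted above.
            Result result = table.get(new Get(Bytes.toBytes("a,A")));
            byte[] value = result.getValue(Bytes.toBytes("u_m_r"), Bytes.toBytes("r"));
            // The shell put stored the value as a UTF-8 string, so decode it as one.
            System.out.println("u_m_01[a,A] u_m_r:r = " + Bytes.toString(value));
        }
    }
}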
3. POM dependencies
  • JDK 1.8
  • The pom dependencies used with Flink 1.12.1 are listed below (some of them are redundant)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-hive-hbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hive.version>3.1.2</hive.version>
        <mysql.version>8.0.19</mysql.version>
        <hbase.version>2.4.0</hbase.version>
    </properties>


    <dependencies>

        <!-- Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- HBase -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>


        <!-- JDBC -->
        <!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- MySQL -->
        <!--
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        -->

        <!-- Hive -->
        <!--
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- csv -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Lombok -->
        <!--
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.18</version>
        </dependency>
        -->

    </dependencies>
</project>
4. Flink Java code

The POJO class used:

package entity;
import java.io.Serializable;

public class UserMovie implements Serializable {
    @Override
    public String toString() {
        return "UserMovie{" +
                "userId='" + userId + '\'' +
                ", movieId='" + movieId + '\'' +
                ", ratting=" + ratting +
                '}';
    }

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getMovieId() {
        return movieId;
    }

    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    public Double getRatting() {
        return ratting;
    }

    public void setRatting(Double ratting) {
        this.ratting = ratting;
    }

    public UserMovie() {
    }

    public UserMovie(String userId, String movieId, Double ratting) {
        this.userId = userId;
        this.movieId = movieId;
        this.ratting = ratting;
    }

    private static final long serialVersionUID = 256158274329337559L;

    private String userId;

    private String movieId;

    private Double ratting;

}

The actual test code:

package hbase;

import entity.UserMovie;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class HBaseTest_01   {
    public static void main(String[] args) throws Exception {
        // Batch execution environment (not actually used further below)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Table environment (Blink planner, batch mode)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register the user-movie table u_m backed by HBase
        TableResult tableResult = tableEnv.executeSql(
                "CREATE TABLE u_m (" +
                        " rowkey STRING," +
                        " u_m_r ROW<r STRING>," +
                        " PRIMARY KEY (rowkey) NOT ENFORCED" +
                        " ) WITH (" +
                        " 'connector' = 'hbase-2.2' ," +
                        " 'table-name' = 'default:u_m_01' ," +
                        " 'zookeeper.quorum' = '127.0.0.1:2181'" +
                        " )");

        // Check whether the data in HBase can actually be read
//        Table table = tableEnv.sqlQuery("SELECT rowkey, u_m_r FROM u_m");

        // Equivalent to a full table scan
        Table table = tableEnv.sqlQuery("SELECT * FROM u_m");

        // Result of executing the query
        TableResult executeResult = table.execute();

        // Get an iterator over the query result
        CloseableIterator<Row> collect = executeResult.collect();

        // Output (once print() or the Consumer below runs, the data is consumed; keep only one of the two)
        executeResult.print();

        List<UserMovie> userMovieList = new ArrayList<>();

        collect.forEachRemaining(new Consumer<Row>() {
            @Override
            public void accept(Row row) {
                String field0 = String.valueOf(row.getField(0));
                String[] user_movie = field0.split(",");
                Double ratting = Double.valueOf(String.valueOf(row.getField(1)));
                userMovieList.add(new UserMovie(user_movie[0],user_movie[1],ratting));
            }
        });


        System.out.println("................");

        for(UserMovie um : userMovieList){
            System.out.println(um);
        }
    }
}
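
The title also mentions the Table API, so for completeness here is the same scan expressed without an SQL query string. This is a hedged sketch of mine rather than code from the original article; the class name HBaseTableApiTest is hypothetical, and the table definition simply reuses the DDL and connection settings from HBaseTest_01 above.

package hbase;

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class HBaseTableApiTest {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Same HBase-backed table definition as in HBaseTest_01.
        tableEnv.executeSql(
                "CREATE TABLE u_m (" +
                        " rowkey STRING," +
                        " u_m_r ROW<r STRING>," +
                        " PRIMARY KEY (rowkey) NOT ENFORCED" +
                        " ) WITH (" +
                        " 'connector' = 'hbase-2.2' ," +
                        " 'table-name' = 'default:u_m_01' ," +
                        " 'zookeeper.quorum' = '127.0.0.1:2181'" +
                        " )");

        // Equivalent of "SELECT rowkey, u_m_r.r FROM u_m", written with the Table API.
        Table result = tableEnv.from("u_m")
                .select($("rowkey"), $("u_m_r").get("r").as("r"));

        result.execute().print();
    }
}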
5. Output
  1. When executeResult.print(); is not commented out (print() consumes the result iterator, so the UserMovie list stays empty and nothing is printed after the dots):
+--------------------------------+--------------------------------+
|                         rowkey |                          u_m_r |
+--------------------------------+--------------------------------+
|                            a,A |                              1 |
|                            a,B |                              3 |
|                            b,B |                              3 |
|                            b,C |                              4 |
|                            c,A |                              2 |
|                            c,C |                              5 |
|                            c,D |                              1 |
|                            d,B |                              5 |
|                            d,D |                              2 |
|                            e,A |                              3 |
|                            e,B |                              2 |
|                            f,A |                              1 |
|                            f,B |                              2 |
|                            f,D |                              3 |
|                            g,C |                              1 |
|                            g,D |                              4 |
|                            h,A |                              1 |
|                            h,B |                              2 |
|                            h,C |                              4 |
|                            h,D |                              5 |
+--------------------------------+--------------------------------+
20 rows in set
................
  2. When executeResult.print(); is commented out:
................
UserMovie{userId='a', movieId='A', ratting=1.0}
UserMovie{userId='a', movieId='B', ratting=3.0}
UserMovie{userId='b', movieId='B', ratting=3.0}
UserMovie{userId='b', movieId='C', ratting=4.0}
UserMovie{userId='c', movieId='A', ratting=2.0}
UserMovie{userId='c', movieId='C', ratting=5.0}
UserMovie{userId='c', movieId='D', ratting=1.0}
UserMovie{userId='d', movieId='B', ratting=5.0}
UserMovie{userId='d', movieId='D', ratting=2.0}
UserMovie{userId='e', movieId='A', ratting=3.0}
UserMovie{userId='e', movieId='B', ratting=2.0}
UserMovie{userId='f', movieId='A', ratting=1.0}
UserMovie{userId='f', movieId='B', ratting=2.0}
UserMovie{userId='f', movieId='D', ratting=3.0}
UserMovie{userId='g', movieId='C', ratting=1.0}
UserMovie{userId='g', movieId='D', ratting=4.0}
UserMovie{userId='h', movieId='A', ratting=1.0}
UserMovie{userId='h', movieId='B', ratting=2.0}
UserMovie{userId='h', movieId='C', ratting=4.0}
UserMovie{userId='h', movieId='D', ratting=5.0}
Note

When defining the HBase table in Flink SQL here, every field is declared as STRING. The rating value really ought to be an INT, but declaring the column as INT caused an error, and switching it to STRING made the query work.
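
A likely explanation (my own reading, not stated in the original post): the Flink HBase connector converts cell bytes according to the declared column type using HBase's Bytes utility, so an INT column is expected to contain a 4-byte big-endian integer, while the shell put commands above stored the ratings as UTF-8 strings such as "1". Decoding those one-byte values as INT then fails. One workaround is to keep the column declared as STRING and cast inside the query; the fragment below is a hypothetical variant of the scan in HBaseTest_01 and assumes the same tableEnv and u_m table as above.

// Hypothetical variant: leave the HBase column declared as STRING in the DDL
// and convert it to INT inside the query instead.
Table casted = tableEnv.sqlQuery(
        "SELECT rowkey, CAST(u_m_r.r AS INT) AS rating FROM u_m");

casted.execute().print();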

Originally published 2021-04-23 on the WeChat public account 大数据技术与架构.
