专栏首页大数据技术与应用实战flink教程-详解flink 1.11 中的JDBC Catalog

flink教程-详解flink 1.11 中的JDBC Catalog

  • 背景
  • 示例
  • 源码解析
    • AbstractJdbcCatalog
    • PostgresCatalog

背景

1.11.0 之前,用户如果依赖 Flink 的 source/sink 读写关系型数据库或读取 changelog 时,必须要手动创建对应的 schema。但是这样会有一个问题,当数据库中的 schema 发生变化时,也需要手动更新对应的 Flink 任务以保持类型匹配,任何不匹配都会造成运行时报错使作业失败。这个操作冗余且繁琐,体验极差。

实际上对于任何和 Flink 连接的外部系统都可能有类似的上述问题,在 1.11.0 中重点解决了和关系型数据库对接的这个问题。提供了 JDBC catalog 的基础接口以及 Postgres catalog 的实现,这样方便后续实现与其它类型的关系型数据库的对接。

1.11.0 版本后,用户使用 Flink SQL 时可以自动获取表的 schema 而不再需要输入 DDL。除此之外,任何 schema 不匹配的错误都会在编译阶段提前进行检查报错,避免了之前运行时报错造成的作业失败。

示例

目前对于jdbc catalog,flink仅提供了postgres catalog,我们基于postgres的catalog讲解一下如何使用flink的catalog ,

  • 引入pom
   <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.5</version>
        </dependency>
  • 新建PostgresCatalog 目前flink通过一个静态类来创建相相应的jdbc  catalog,对于PostgresCatalog,没有提供public类型的构造方法。

通过JdbcCatalogUtils.createCatalog构造PostgresCatalog时这五个参数都是必填项,其中baseUrl要求是不能带有数据库名的

  String catalogName = "mycatalog";
  String defaultDatabase = "postgres";
  String username = "postgres";
  String pwd = "postgres";
  String baseUrl = "jdbc:postgresql://localhost:5432/";

  PostgresCatalog postgresCatalog = (PostgresCatalog) JdbcCatalogUtils.createCatalog(
    catalogName,
    defaultDatabase,
    username,
    pwd,
    baseUrl);

访问postgres 数据库指定表名的时候完整的路径名应该是以下格式:

<catalog>.<db>.`<schema.table>`

其中schema默认是public,如果是使用缺省值,public是可以省略的。比如下面的查询语句:

SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

如果非缺省schema,则不能被省略:

SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
  • 常见操作

我们PostgresCatalog将注册到StreamTableEnvironment 的变量tEnv中,然后就可以用tEnv进行一些操作了。

 tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
  tEnv.useCatalog(postgresCatalog.getName());
  1. 列出来所有的数据库:
        System.out.println("list databases :");
  String[] databases = tEnv.listDatabases();
  Stream.of(databases).forEach(System.out::println);
  1. 列出来所有的table
     tEnv.useDatabase(defaultDatabase);
  System.out.println("list tables :");
  String[] tables = tEnv.listTables(); // 也可以使用  postgresCatalog.listTables(defaultDatabase);
  Stream.of(tables).forEach(System.out::println);
  1. 列出所有函数
        System.out.println("list functions :");
  String[] functions = tEnv.listFunctions();
  Stream.of(functions).forEach(System.out::println);
  1. 获取table的schema
 CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
    defaultDatabase,
    "table1"));

  TableSchema tableSchema = catalogBaseTable.getSchema();
  System.out.println("tableSchema --------------------- :");
  System.out.println(tableSchema);
  1. 查询表的数据
  List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
                                             .execute()
                                             .collect());
  results.stream().forEach(System.out::println);
  1. 插入数据
tEnv.executeSql("insert into table1 values (3,'c')");

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java

源码解析

AbstractJdbcCatalog

这个类主要是对jdbc catalog一些公共的操作做了抽象.目前实现了实际功能的只有一个方法:getPrimaryKey,其他方式主要是对于Catalog的一些其他实现类做了特殊处理,比如类似create table 或者 alter table是不支持的,listView只是返回一个空列表,因为我们使用jdbc catalog主要是来做一些DML操作。

 @Override
 public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
  throw new UnsupportedOperationException();
 }

 @Override
 public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
  return Collections.emptyList();
 }

PostgresCatalog

在这里面,主要是实现了一些常用的操作数据库的方法,比如getTable、listTables、listDatabases等等,其实简单的来说就是从postgres元数据库里查询出来相应的信息,然后组装成flink的相关对象,返回给调用方。以一个简单的方法listDatabases为例:

从元数据表pg_database中查询所有的tablename,然后去掉内置的数据库,也就是template0和template1,然后封装到一个list对象里,返回。

 @Override
 public List<String> listDatabases() throws CatalogException {
  List<String> pgDatabases = new ArrayList<>();

  try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {

   PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");

   ResultSet rs = ps.executeQuery();

   while (rs.next()) {
    String dbName = rs.getString(1);
    if (!builtinDatabases.contains(dbName)) {
     pgDatabases.add(rs.getString(1));
    }
   }

   return pgDatabases;
  } catch (Exception e) {
   throw new CatalogException(
    String.format("Failed listing database in catalog %s", getName()), e);
  }
 }

有不兼容的地方需要做一些转换,比如getTable方法,有些数据类型是不匹配的,要做一些类型的匹配,如postgres里面的serial和int4都会转成flink的int类型,具体的参考下PostgresCatalog#fromJDBCType方法。

 private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
  String pgType = metadata.getColumnTypeName(colIndex);

  int precision = metadata.getPrecision(colIndex);
  int scale = metadata.getScale(colIndex);

  switch (pgType) {
   case PG_BOOLEAN:
    return DataTypes.BOOLEAN();
   case PG_BOOLEAN_ARRAY:
    return DataTypes.ARRAY(DataTypes.BOOLEAN());
   case PG_BYTEA:
    return DataTypes.BYTES();
    .........................

参考资料: [1].https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

本文分享自微信公众号 - 大数据技术与应用实战(bigdata_bigdata),作者:zhangjun

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-08-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink 1.11:更好用的流批一体 SQL 引擎

    许多的数据科学家,分析师和 BI 用户依赖交互式 SQL 查询分析数据。Flink SQL 是 Flink 的核心模块之一。作为一个分布式的 SQL 查询引擎。...

    数据社
  • Flink 1.11中对接Hive新特性及如何构建数仓体系

    导读:Flink从1.9.0开始提供与Hive集成的功能,随着几个版本的迭代,在最新的Flink 1.11中,与Hive集成的功能进一步深化,并且开始尝试将流计...

    Spark学习技巧
  • Hive 终于等来了 Flink

    其实比较也没啥意义,不同社区发展的目标总是会有差异,而且 Flink 在真正的实时流计算方面投入的精力很多。不过笔者想表达的是,Apache Hive 已经成为...

    Fayson
  • 基于 Flink SQL CDC 的实时数据同步方案

    Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink PMC,阿里巴巴技术专家...

    Spark学习技巧
  • Nebula Flink Connector 的原理和实践

    摘要:本文所介绍 Nebula Graph 连接器 Nebula Flink Connector,采用类似 Flink 提供的 Flink Connector ...

    NebulaGraph
  • FlinkSQL演进过程,解析原理及一些优化策略

    flink 1.9之前的版本,对于Table API和SQL的底层实现结构如下图,可以看处流处理和批处理有各自独立的api (流处理DataStream,批处理...

    Spark学习技巧
  • Flink集成Iceberg小小实战

    Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds ...

    大数据真好玩
  • flink教程-详解flink 1.11 中的CDC (Change Data Capture)

    CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以...

    大数据技术与应用实战
  • 干货 | 五千字长文带你快速入门FlinkSQL

    最近几天因为工作比较忙,已经几天没有及时更新文章了,在这里先给小伙伴们说声抱歉…临近周末,再忙再累,我也要开始发力了。接下来的几天,菌哥将为大家...

    大数据梦想家
  • 详解flink中Look up维表的使用

    在流式计算中,维表是一个很常见的概念,一般用于sql的join中,对流式数据进行数据补全,比如我们的source stream是来自日志的订单数据,但是日志中我...

    大数据技术与应用实战
  • Flink SQL Blink Planner 执行流程解析(上)

    大数据领域SQL化的风潮方兴未艾(所谓"Everybody knows SQL"),Flink自然也不能“免俗”。Flink SQL是Flink系统内部最高级别...

    zhisheng
  • 深入分析 Flink SQL 工作机制

    摘要:本文整理自 Flink Forward 2020 全球在线会议中文精华版,由 Apache Flink PMC 伍翀(云邪)分享,社区志愿者陈婧敏(清樾)...

    Spark学习技巧
  • Flink SQL CDC 上线!我们总结了 13 条生产实践经验

    摘要:7月,Flink 1.11 新版发布,在生态及易用性上有大幅提升,其中 Table & SQL 开始支持 Change Data Capture(CDC)...

    zhisheng
  • 《你问我答》第四期 | 进一步讲解SuperSQL、Oceanus以及Tbase

    ? 各位小伙伴们大家好,我们又见面啦~ 这里是《你问我答》栏目第四期 上周推送了一篇关于腾讯SuperSQL的文章 《「解耦」方能「专注」——腾讯天穹Sup...

    腾讯大数据
  • CSA1.4新功能

    3 月底,作为 Cloudera Streaming Analytics 1.3 的一部分,我们发布了Cloudera SQL Stream Builder的第...

    大数据杂货铺
  • 当 TiDB 与 Flink 相结合:高效、易用的实时数仓

    随着互联网飞速发展,企业业务种类会越来越多,业务数据量会越来越大,当发展到一定规模时,传统的数据存储结构逐渐无法满足企业需求,实时数据仓库就变成了一个必要的基础...

    PingCAP
  • Flink1.12集成Hive打造自己的批流一体数仓

    小编在去年之前分享过参与的实时数据平台的建设,关于实时数仓也进行过分享。客观的说,我们当时做不到批流一体,小编当时的方案是将实时消息数据每隔15分钟文件同步到离...

    王知无-import_bigdata
  • Flink 与 TiDB 构建高效易用的实时数仓

    随着互联网飞速发展,企业业务种类会越来越多,业务数据量会越来越大,当发展到一定规模时,传统的数据存储结构逐渐无法满足企业需求,实时数据仓库就变成了一个必要的基础...

    zhisheng
  • Flink SQL 客户端如何使用

    Flink 的 Table & SQL API 可以处理 SQL 语言编写的查询语句,但是这些查询需要嵌入用 Java 或 Scala 编写的 Table 程序...

    smartsi

扫码关注云+社区

领取腾讯云代金券