前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink教程-详解flink 1.11 中的JDBC Catalog

flink教程-详解flink 1.11 中的JDBC Catalog

作者头像
大数据技术与应用实战
发布2020-09-15 14:20:55
2.7K0
发布2020-09-15 14:20:55
举报
  • 背景
  • 示例
  • 源码解析
    • AbstractJdbcCatalog
    • PostgresCatalog

背景

1.11.0 之前,用户如果依赖 Flink 的 source/sink 读写关系型数据库或读取 changelog 时,必须要手动创建对应的 schema。但是这样会有一个问题,当数据库中的 schema 发生变化时,也需要手动更新对应的 Flink 任务以保持类型匹配,任何不匹配都会造成运行时报错使作业失败。这个操作冗余且繁琐,体验极差。

实际上对于任何和 Flink 连接的外部系统都可能有类似的上述问题,在 1.11.0 中重点解决了和关系型数据库对接的这个问题。提供了 JDBC catalog 的基础接口以及 Postgres catalog 的实现,这样方便后续实现与其它类型的关系型数据库的对接。

1.11.0 版本后,用户使用 Flink SQL 时可以自动获取表的 schema 而不再需要输入 DDL。除此之外,任何 schema 不匹配的错误都会在编译阶段提前进行检查报错,避免了之前运行时报错造成的作业失败。

示例

目前对于jdbc catalog,flink仅提供了postgres catalog,我们基于postgres的catalog讲解一下如何使用flink的catalog ,

  • 引入pom
代码语言:javascript
复制
   <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.5</version>
        </dependency>
  • 新建PostgresCatalog 目前flink通过一个静态类来创建相相应的jdbc  catalog,对于PostgresCatalog,没有提供public类型的构造方法。

通过JdbcCatalogUtils.createCatalog构造PostgresCatalog时这五个参数都是必填项,其中baseUrl要求是不能带有数据库名的

代码语言:javascript
复制
  String catalogName = "mycatalog";
  String defaultDatabase = "postgres";
  String username = "postgres";
  String pwd = "postgres";
  String baseUrl = "jdbc:postgresql://localhost:5432/";

  PostgresCatalog postgresCatalog = (PostgresCatalog) JdbcCatalogUtils.createCatalog(
    catalogName,
    defaultDatabase,
    username,
    pwd,
    baseUrl);

访问postgres 数据库指定表名的时候完整的路径名应该是以下格式:

代码语言:javascript
复制
<catalog>.<db>.`<schema.table>`

其中schema默认是public,如果是使用缺省值,public是可以省略的。比如下面的查询语句:

代码语言:javascript
复制
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

如果非缺省schema,则不能被省略:

代码语言:javascript
复制
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
  • 常见操作

我们PostgresCatalog将注册到StreamTableEnvironment 的变量tEnv中,然后就可以用tEnv进行一些操作了。

代码语言:javascript
复制
 tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
  tEnv.useCatalog(postgresCatalog.getName());
  1. 列出来所有的数据库:
代码语言:javascript
复制
        System.out.println("list databases :");
  String[] databases = tEnv.listDatabases();
  Stream.of(databases).forEach(System.out::println);
  1. 列出来所有的table
代码语言:javascript
复制
     tEnv.useDatabase(defaultDatabase);
  System.out.println("list tables :");
  String[] tables = tEnv.listTables(); // 也可以使用  postgresCatalog.listTables(defaultDatabase);
  Stream.of(tables).forEach(System.out::println);
  1. 列出所有函数
代码语言:javascript
复制
        System.out.println("list functions :");
  String[] functions = tEnv.listFunctions();
  Stream.of(functions).forEach(System.out::println);
  1. 获取table的schema
代码语言:javascript
复制
 CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
    defaultDatabase,
    "table1"));

  TableSchema tableSchema = catalogBaseTable.getSchema();
  System.out.println("tableSchema --------------------- :");
  System.out.println(tableSchema);
  1. 查询表的数据
代码语言:javascript
复制
  List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
                                             .execute()
                                             .collect());
  results.stream().forEach(System.out::println);
  1. 插入数据
代码语言:javascript
复制
tEnv.executeSql("insert into table1 values (3,'c')");

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java

源码解析

AbstractJdbcCatalog

这个类主要是对jdbc catalog一些公共的操作做了抽象.目前实现了实际功能的只有一个方法:getPrimaryKey,其他方式主要是对于Catalog的一些其他实现类做了特殊处理,比如类似create table 或者 alter table是不支持的,listView只是返回一个空列表,因为我们使用jdbc catalog主要是来做一些DML操作。

代码语言:javascript
复制
 @Override
 public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
  throw new UnsupportedOperationException();
 }

 @Override
 public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
  return Collections.emptyList();
 }

PostgresCatalog

在这里面,主要是实现了一些常用的操作数据库的方法,比如getTable、listTables、listDatabases等等,其实简单的来说就是从postgres元数据库里查询出来相应的信息,然后组装成flink的相关对象,返回给调用方。以一个简单的方法listDatabases为例:

从元数据表pg_database中查询所有的tablename,然后去掉内置的数据库,也就是template0和template1,然后封装到一个list对象里,返回。

代码语言:javascript
复制
 @Override
 public List<String> listDatabases() throws CatalogException {
  List<String> pgDatabases = new ArrayList<>();

  try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {

   PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");

   ResultSet rs = ps.executeQuery();

   while (rs.next()) {
    String dbName = rs.getString(1);
    if (!builtinDatabases.contains(dbName)) {
     pgDatabases.add(rs.getString(1));
    }
   }

   return pgDatabases;
  } catch (Exception e) {
   throw new CatalogException(
    String.format("Failed listing database in catalog %s", getName()), e);
  }
 }

有不兼容的地方需要做一些转换,比如getTable方法,有些数据类型是不匹配的,要做一些类型的匹配,如postgres里面的serial和int4都会转成flink的int类型,具体的参考下PostgresCatalog#fromJDBCType方法。

代码语言:javascript
复制
 private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
  String pgType = metadata.getColumnTypeName(colIndex);

  int precision = metadata.getPrecision(colIndex);
  int scale = metadata.getScale(colIndex);

  switch (pgType) {
   case PG_BOOLEAN:
    return DataTypes.BOOLEAN();
   case PG_BOOLEAN_ARRAY:
    return DataTypes.ARRAY(DataTypes.BOOLEAN());
   case PG_BYTEA:
    return DataTypes.BYTES();
    .........................

参考资料: [1].https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-08-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与应用实战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 示例
  • 源码解析
    • AbstractJdbcCatalog
      • PostgresCatalog
      相关产品与服务
      数据库
      云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档