前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊flink TableEnvironment的scan操作

聊聊flink TableEnvironment的scan操作

原创
作者头像
code4it
发布2019-01-22 10:35:56
9750
发布2019-01-22 10:35:56
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下flink TableEnvironment的scan操作

实例

代码语言:javascript
复制
//Scanning a directly registered table
val tab: Table = tableEnv.scan("tableName")
​
//Scanning a table from a registered catalog
val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
  • scan操作用于从schema读取指定的table,也可以传入catalogName及dbName从指定的catalog及db读取

TableEnvironment.scan

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/TableEnvironment.scala

代码语言:javascript
复制
abstract class TableEnvironment(val config: TableConfig) {
​
  private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false)
  private val rootSchema: SchemaPlus = internalSchema.plus()
​
  //......
​
  @throws[TableException]
  @varargs
  def scan(tablePath: String*): Table = {
    scanInternal(tablePath.toArray) match {
      case Some(table) => table
      case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.")
    }
  }
​
  private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
    require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
    val schemaPaths = tablePath.slice(0, tablePath.length - 1)
    val schema = getSchema(schemaPaths)
    if (schema != null) {
      val tableName = tablePath(tablePath.length - 1)
      val table = schema.getTable(tableName)
      if (table != null) {
        return Some(new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory))))
      }
    }
    None
  }
​
  private def getSchema(schemaPath: Array[String]): SchemaPlus = {
    var schema = rootSchema
    for (schemaName <- schemaPath) {
      schema = schema.getSubSchema(schemaName)
      if (schema == null) {
        return schema
      }
    }
    schema
  }
​
  //......
}
  • scan方法内部调用的是scanInternal,scanInternal首先读取catalog及db信息,然后调用getSchema方法来获取schema
  • getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
  • 获取到schema之后,就可以从tablePath数组获取tableName(数组最后一个元素),调用SchemaPlus的getTable方法查找Table

小结

  • TableEnvironment的scan操作就是从Schema中查找Table,可以使用tableName,或者额外指定catalog及db来查找
  • getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
  • 获取到schema之后,就可以从tablePath数组获取tableName(数组最后一个元素),调用SchemaPlus的getTable方法查找Table

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 实例
  • TableEnvironment.scan
  • 小结
  • doc
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档