本文主要研究一下flink TableEnvironment的scan操作
// Scan a table that was registered directly; the only path segment is the table name.
// NOTE: this and the example below are alternatives (both declare `tab`), not one script.
val tab: Table = tableEnv.scan("tableName")
// Scan a table from a registered catalog; the path segments are catalog, database, then table name.
val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/TableEnvironment.scala
/**
 * Excerpt of `TableEnvironment` showing how `scan` resolves a table path against
 * the root schema. Members that are not part of the lookup are elided (`//......`).
 */
abstract class TableEnvironment(val config: TableConfig) {

  // Root schema that all table paths are resolved against.
  private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false)
  private val rootSchema: SchemaPlus = internalSchema.plus()

  //......

  /**
   * Looks up the table identified by `tablePath`.
   *
   * @param tablePath path segments: every segment but the last names a nested
   *                  sub-schema, the last segment names the table
   * @return the table found at that path
   * @throws TableException if no table exists at the given path
   */
  @throws[TableException]
  @varargs
  def scan(tablePath: String*): Table =
    scanInternal(tablePath.toArray).getOrElse(
      throw new TableException(s"Table '${tablePath.mkString(".")}' was not found."))

  /**
   * Resolves `tablePath` to a table without throwing when it is missing.
   *
   * @param tablePath non-null, non-empty path; all segments but the last are
   *                  sub-schema names, the last one is the table name
   * @return `Some(table)` if both the schema and the table exist, `None` otherwise
   * @throws IllegalArgumentException (via `require`) if `tablePath` is null or empty
   */
  private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
    require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
    // `init` / `last` are safe here because the path was just checked to be non-empty.
    for {
      schema <- Option(getSchema(tablePath.init))
      // `getTable` signals "not found" with null; `Option(...)` turns that into None.
      table <- Option(schema.getTable(tablePath.last))
    } yield new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory)))
  }

  /**
   * Walks `schemaPath` from the root schema, one sub-schema per segment.
   *
   * @param schemaPath sub-schema names, outermost first; may be empty
   * @return the schema at the end of the path (the root schema for an empty path),
   *         or `null` as soon as one segment cannot be resolved
   */
  private def getSchema(schemaPath: Array[String]): SchemaPlus = {
    // Fold instead of `return` inside a `for`: a return there would be a non-local
    // return, which is implemented by throwing an exception through the closure.
    val resolved = schemaPath.foldLeft(Option(rootSchema)) { (parent, schemaName) =>
      parent.flatMap(p => Option(p.getSubSchema(schemaName)))
    }
    // Keep the nullable result so existing callers of this method are unaffected.
    resolved.orNull
  }

  //......
}
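scanInternal首先取tablePath数组中除最后一个元素之外的部分作为schema路径，通过getSchema从rootSchema开始逐级查找对应的SchemaPlus；找到schema之后，再取出tableName(即tablePath数组最后一个元素),调用SchemaPlus的getTable方法查找Table；如果schema或者table不存在则返回None，scan方法据此抛出TableException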