首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Table API&SQL的基本概念及使用介绍

Table API&SQL的基本概念及使用介绍

作者头像
Spark学习技巧
发布2018-01-31 10:22:20
6.2K0
发布2018-01-31 10:22:20
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

Table API和SQL集成在共同API中。这个API的中心概念是一个用作查询的输入和输出的表。本文档显示了具有表API和SQL查询的程序的常见结构,如何注册表,如何查询表以及如何发出表。

Table API和SQL捆绑在flink-table Maven工程中。 为了使用Table API和SQL,必须将以下依赖项添加到您的项目中:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table_2.10</artifactId>
   <version>1.3.2</version>
</dependency>

此外,您需要为Flink的Scala批处理或流式API添加依赖关系。 对于批量查询,您需要添加:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-scala_2.10</artifactId>
   <version>1.3.2</version>
</dependency>

对于流式查询,您需要添加:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-scala_2.10</artifactId>
   <version>1.3.2</version>
</dependency>

注意:由于Apache Calcite中的一个问题,阻止用户类加载器被垃圾回收,我们不建议构建一个包含flink-table依赖项的fat-jar。相反,我们建议将Flink配置为在系统类加载器中包含flink-table依赖关系。这可以通过将./opt文件夹中的flink-table.jar文件复制到./lib文件夹来完成。

一,Table API&Sql项目的结构

用于批处理和流式处理的所有Table API和SQL程序都遵循相同的模式。以下代码示例显示了Table API和SQL程序的通用结构。

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a Table
tableEnv.registerTable("table1", ...)           // or
tableEnv.registerTableSource("table2", ...)     // or
tableEnv.registerExternalCatalog("extCat", ...)

// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult  = tableEnv.sql("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...)

// execute
env.execute()

注意:Table API和SQL查询可以轻松集成到DataStream或DataSet程序中。

二,创建一个TableEnvironment

TableEnvironment是Table API和SQL集成的核心概念。 它负责:

A),在内部catalog中注册表

B),注册外部catalog

C),执行SQL查询

D),注册用户定义(标量,表或聚合)函数

E),将DataStream或DataSet转换为表

F),持有对ExecutionEnvironment或StreamExecutionEnvironment的引用

表总是绑定到一个特定的TableEnvironment。不可能在同一个查询中组合不同TableEnvironments的表,例如,join或union它们。通过使用StreamExecutionEnvironment或ExecutionEnvironment和可选的TableConfig调用静态TableEnvironment.getTableEnvironment()方法创建TableEnvironment。TableConfig可用于配置TableEnvironment或自定义查询优化和翻译过程(请参阅查询优化)。

// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

三,在Catalog中注册一张表

TableEnvironment具有一个表的内部Catalog,按表名组织。 表API或SQL查询可以通过名称引用来访问Catalog中注册的表。

TableEnvironment允许您从各种来源注册表:

A),一个现有的Table对象,通常是一个Table API或SQL查询的结果。

B),一个TableSource,用于访问外部数据,如文件,数据库或消息系统。

C),来自DataStream或DataSet程序的DataStream或DataSet。

1,注册一张表

表在TableEnvironment中注册如下:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table is the result of a simple projection query 
val projTable: Table = tableEnv.scan("X").project(...)

// register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable)

注意:注册的表格与从关系数据库系统所知道的VIEW类似,即定义该表的查询未被优化,但是当另一个查询引用已注册的表时将被内联处理。如果多个查询引用相同的注册表,则每个引用查询将被内联并执行多次,即注册表的结果将不会被共享。

2,注册TableSource

TableSource提供对存储在诸如数据库(MySQL,HBase,...)的存储系统中的外部数据的访问,具有特定编码的文件(CSV,Apache [Parquet,Avro,ORC],...))或消息系统 (Apache Kafka,RabbitMQ,...)。

Flink旨在为通用数据格式和存储系统提供TableSources。后面还会出文章讲解TablesSources和Sinks。

TableSource在TableEnvironment中注册如下:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

四,注册一个外部Catalog

外部Catalog可以提供有关外部数据库和表的信息,例如其名称,模式,统计信息和有关如何访问存储在外部数据库,表或文件中的数据的信息。可以通过实现ExternalCatalog界面创建外部目录,并在TableEnvironment中注册如下:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

一旦注册到TableEnvironment中,可以通过指定其完整路径(如catalog.database.table)从Table API或SQL查询中访问ExternalCatalog中定义的所有表。目前,Flink为演示和测试提供了一个InMemoryExternalCatalog。但是,ExternalCatalog界面也可用于将目录(如HCatalog或Metastore)连接到Table API。

五,查询表

1,Table API

Table API是用于Scala和Java的语言集成查询API。与SQL相反,查询没有被指定为字符串,而是在主机语言中逐步构建。后面会出文章详细介绍这个特性。

该API基于Table类,代表一张表(Streaming或者batch),提供使用相关操作的方法。这些方法返回一个新的Table对象,它表示在输入表中应用关系操作的结果。一些关系操作由多个方法调用组成,如table.groupBy(...).select(),其中groupBy(...)指定分组表,select(...) 从分组表中选取结果。

以下示例显示了一个简单的Table API聚合查询:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register Orders table

// scan registered Orders table
Table orders = tableEnv.scan("Orders")
// compute revenue for all customers from France
Table revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

注意:Scala Table API使用Scala符号,它以单个tick(')开始引用表的属性。 Table API使用Scala隐含。 确保导入org.apache.flink.api.scala._和org.apache.flink.table.api.scala._以便使用Scala隐式转换。

2,SQL

Flink的SQL集成基于实现SQL标准的Apache Calcite。 SQL查询被指定为常规字符串。后面会出文章详细介绍这个特性。

以下示例显示如何指定查询并将结果作为表返回。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sql(""" 
                               |SELECT cID, cName, SUM(revenue) AS revSum
                               |FROM Orders
                               |WHERE cCountry = 'FRANCE'
                               |GROUP BY cID, cName
                             """.stripMargin)

// emit or convert Table
// execute query

3,混合Table API和SQL

Table API查询可以在SQL查询返回的Table对象上进行操作。

通过将Table API返回的对象注册成表也可以进行一个SQL查询请求,在SQL查询的FROM子句中引用它。

六,输出一张表

为了输出一个表,可以将它写入一个TableSink。TableSink是支持各种文件格式(例如CSV,Apache Parquet,Apache Avro),存储系统(例如JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息传递系统(例如Apache Kafka,RabbitMQ的)。

批处理表只能写入BatchTableSink,而streaming table需要一个AppendStreamTableSink,一个RetractStreamTableSink或一个UpsertStreamTableSink。

有关Table source和sink的详细信息及如何自定义一个TableSink后面会给出详细的文章。使用例子:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...

// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// write the result Table to the TableSink
result.writeToSink(sink)

// execute the program

七,翻译并执行查询

Table API和SQL查询根据输入是流式还是批量输入而被转换为DataStream或DataSet程序。 查询内部表示为逻辑查询计划,并分为两个阶段:

A),优化逻辑计划

B),翻译成DataStream或DataSet程序。

Table API或者SQL查询在一下情况下被翻译:

A),表被输出到TableSink,即当调用Table.writeToSink()时。

B),该表被转化为DataStream或者DataSet。

转换结束之后,Table API或SQL查询就像常规DataStream或DataSet程序一样处理,并在调用StreamExecutionEnvironment.execute()或ExecutionEnvironment.execute()时执行。

八,与DataStream和DataSet API集成

Table API和SQL查询可以轻松地集成到DataStream和DataSet程序中并嵌入到其中。表API和SQL查询可以轻松地集成到DataStream和DataSet程序中并嵌入到其中。 例如,可以查询外部表(例如来自RDBMS),进行一些预处理,例如过滤,投影,聚合或与元数据连接,然后使用DataStream或 DataSet API(以及任何构建在这些API之上的库,如CEP或Gelly)。 相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。

这种交互可以通过将DataStream或DataSet转换为Table来实现,反之亦然。在本节中,我们将介绍如何完成这些转换。

1,Scala的隐式转换

Scala Table API提供DataSet,DataStream和Table类的隐式转换。通过导入包org.apache.flink.table.api.scala._除了用于Scala DataStream API的org.apache.flink.api.scala._之外还可以启用这些转换。

2,将DataStream或DataSet注册为表

结果表的schema 取决于注册的DataStream或DataSet的数据类型。有关详细信息,请查看有关将数据类型映射到表模式的部分。

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

3,将DataStream或DataSet转换为表

不仅仅可以在TableEnvironment中注册DataStream或DataSet,也可以直接转换为Table。 如果要在Table API查询中使用Table,这很方便。

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

4,将表转换为DataStream或DataSet

表可以转换为DataStream或DataSet。以这种方式,可以基于Table API或SQL查询的结果运行自定义DataStream或DataSet程序。

将表转换为DataStream或DataSet时,需要指定生成的DataStream或DataSet的数据类型,即要转换表的行的数据类型。通常最方便的转换类型是Row。以下列表概述了不同选项的功能:

Row:字段通过位置,任意数量的字段映射,支持空值,无类型安全访问。

POJO:按名称映射字段(POJO字段必须命名为表字段),任意字段数,支持空值,类型安全访问。

Case Class:字段按位置映射,不支持空值,类型安全访问。

Tuple:字段通过位置映射,限制为22(Scala)或25(Java)字段,不支持空值,类型安全访问。

Atomic Type:表必须有单个字段,不支持空值,类型安全访问。

4.1 将表转换为DataStream

作为流式查询的结果的表将被动态地更新,即当新记录到达查询的输入流时,它会改变。因此,转换此动态查询的DataStream需要对表的更新进行编码。

将Table转换为DataStream有两种模式:

Append Mode:仅当动态表仅由INSERT更改修改时,才能使用此模式,即只是附加的,并且以前发布的结果永远不会被更新。

Retract Mode:始终可以使用此模式。 它使用布尔标志来编码INSERT和DELETE更改。

// get TableEnvironment. 
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
  tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

注意:有关动态表及其配置属性的介绍后面会出文章。

4.2 将Table转化为DataSet

将表转换为DataSet,如下所示:

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

九,数据类型和表schema映射

Flink的DataStream和DataSet API支持非常多样化的类型,例如Tuples(内置Scala和Flink Java元组),POJO,Case Class和原子类型。下面我们将介绍Table API如何将这些类型转换为内部行表示,并显示将DataStream转换为Table的示例。

1,原子类型

Flink将原始(Integer,Double,String)或通用类型(无法分析和分解的类型)视为原子类型。属性的类型是从原子类型推断的,必须指定属性的名称。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[Long] = ...
// convert DataStream into Table with field 'myLong
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

2,元组(Scala和Java)和Case Class(仅限Scala)

Flink支持Scala的内置元组,并为Java提供自己的元组类。两种元组的DataStreams和DataSet可以转换成表。可以通过为所有字段提供名称(基于位置的映射)来重命名字段。如果未指定字段名称,则使用默认字段名称。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// convert DataStream into Table with field names 'myLong, 'myString
val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// convert DataStream into Table with default field names '_1, '_2
val table2: Table = tableEnv.fromDataStream(stream)

// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert DataStream into Table with default field names 'name, 'age
val tableCC1 = tableEnv.fromDataStream(streamCC)

// convert DataStream into Table with field names 'myName, 'myAge
val tableCC1 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)

3,POJO (Java and Scala)

Flink支持POJO作为复合类型。在这里记录了确定POJO的规则。将POJO DataStream或DataSet转换为Table而不指定字段名称时,将使用原始POJO字段的名称。重命名原始POJO字段需要关键字AS,因为POJO字段没有固有的顺序。名称映射需要原始名称,不能通过位置来完成。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert DataStream into Table with field names 'name, 'age
val table1: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names 'myName, 'myAge
val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)

4,Row

Row数据类型支持任意数量的具有空值的字段和字段。字段名称可以通过RowTypeInfo指定,也可以将Row DataStream或DataSet转换为Table(基于位置)。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...

// convert DataStream into Table with field names 'name, 'age
val table1: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names 'myName, 'myAge
val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

十,查询优化

Apache Flink利用Apache Calcite来优化和翻译查询。目前执行的优化包括投影和过滤器下推,子查询去相关等各种查询重写。Flink还没有优化连接的顺序,而是按照查询中定义的顺序执行它们(FROM子句中的表的顺序和/或WHERE子句中的连接谓词的顺序)。

可以通过提供一个CalciteConfig对象来调整在不同阶段应用的优化规则集。这可以通过调用CalciteConfig.createBuilder())通过构建器创建,并通过调用tableEnv.getConfig.setCalciteConfig(calciteConfig)提供给TableEnvironment。

Table API提供了一种解释计算表的逻辑和优化查询计划的机制。这通过TableEnvironment.explain(table)方法完成。它返回一个描述三个计划的字符串:

1,关系查询的抽象语法树,即未优化的逻辑查询计划,

2,优化的逻辑查询计划

3,物理执行计划

以下代码显示了一个示例和相应的输出:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
  .where('word.like("F%"))
  .unionAll(table2)

val explanation: String = tEnv.explain(table)
println(explanation)

执行计划如下:

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-09-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档