前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Flink Table Api&SQL 介绍与使用

Apache Flink Table Api&SQL 介绍与使用

作者头像
CainGao
发布2020-04-14 09:21:21
7620
发布2020-04-14 09:21:21
举报
文章被收录于专栏:指尖数虫指尖数虫

Apache Flink,Spark,Hadoop包括其他计算框架都趋向于使用SQL的方式对数据进行检索。很少再有通过代码的方式进行数据的操作。数据计算框架使用SQL解释器的方式对数据进行检索。Apache Flink提供了Table API 与SQL的方式实现统一的流处理与批处理的数据计算。使用DataFrame关系型编程接口,其强大且灵活的表达能力、丰富的接口有效降低用户的使用成本。

Apache Flink提供了关系型编程接口Table API以及基于Table API的SQL API,让用户能够基于Table API、SQL API实现Flink应用。Table API与SQL API能够统一的实现批处理与流处理的计算业务。能够通过一套代码实现批数据的处理与流数据的处理。做到真正的批流统一 (批处理与流处理使用相同的代码实现相同的处理逻辑)

TableEnviroment

TableEnviroment与DataStream API一样,在开发时首先需要创建TableEnviroment关系型编程环境。才能在的程序中使用Table API与SQL API。SQL API与Table API使用的都是相同的编程模型。而且两者可以在程序中同时使用。

Flink SQL基于Apache Calcite框架实现SQL标准协议。Apache Calcite是Java编写的开源SQL解析工具,当前较多的项目使用该框架。如:Hive、Drill、Flink、Phoenix 等。Apache Calcite的主要功能有SQL解析、SQL校验、查询优化、SQL生成器、数据连接等。

TableEnviroment是Table API与SQL API的核心,主要负责:

  1. 在内部 catelog 中注册表
  2. 注册外部 catelog
  3. 执行SQL查询
  4. 注册用户自定义函数
  5. 将 DataStream 或 DataSet 转换为表
  6. 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

Table API 与 SQL API的实现

代码语言:javascript
复制
val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)
    //注册DataSet
    val dataset:DataSet[User] = env.fromElements(
      User("CainGao",10),
      User("CainGao",1),
      User("CainGao",65)
    )
    
    val table:Table = tableEnv.fromDataSet(dataset)
    //输出Table名称
    table.printSchema()
    //Table API进行查询.
    table
      .groupBy("name")
      .select("name,age.sum as ages")
      .toDataSet[Row]
      .print()
      
    //注册Table表结构
    tableEnv.registerDataSet("USER",dataset,'name,'age)
    //执行SQL API进行查询
    val result = tableEnv.sqlQuery("SELECT name,sum(age) FROM `USER` GROUP BY name")
    
    result.toDataSet[Row].print()

以上代码通过Table API与SQL API分别对数据实现了对user表的计算。其中包含了 表注册、Table API查询、SQL API查询、DataSet与表转换等。

TableEnviroment中的Register接口完成表的注册,注册相应的数据源和数据表信息。所有数据库和表的元数据信息都存储在Flink Catalog内部目录结构中。

registerDataSet时,可以看到已经设置了Schema信息,如果不设置Schema信息Apache Flink会默认使用索引位置作为Table的字段名称:_1,_2。当然更好的方式还是使用字段名称进行映射,相对于字段索引位置的映射,名称映射显然更加的灵活。

今天大致了解这些,大致的先了解一下概念。Apache Flink利用其Table API与SQL API实现更灵活更加方便的对数据的操作。实现真正的批流统一。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-09-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 指尖数虫 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档