前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Flink Table API的Catalog

Apache Flink Table API的Catalog

作者头像
CainGao
发布2020-04-14 14:45:19
1.8K0
发布2020-04-14 14:45:19
举报
文章被收录于专栏:指尖数虫指尖数虫

Apache Flink的Table API提供了对数据注册为Table的方式, 实现把数据通过SQL的方式进行计算。Table API与SQL API实现了Apache Flink的批流统一的实现方式。Table API与SQL API的核心概念就是TableEnviroment。TableEnviroment对象提供方法注册数据源与数据表信息。那么数据源与数据表的信息则存储在CataLog中。所以,CataLog是TableEnviroment的重要组成部分。

Apache Flink在获取TableEnviroment对象后,可以通过Register实现对数据源与数据表进行注册。注册完成后数据库与数据表的原信息则存储在CataLog中。CataLog中保存了所有的表结构信息、数据目录信息等。

内部CataLog注册

  1. 内部Table注册 通过TableEnviroment的Register接口对数据进行注册。TableEnviroment的registerTable包含两个参数("tableName",table)。tableName就是注册在CataLog中的表名。第二个参数table则是对应的Table对象。Table则是由TableEnviroment生成得来或者是通过DataSet、DataStream转换的来。
代码语言:javascript
复制
...
tableEnv.registerDataSet("USER",dataset,'name,'age)

val result = tableEnv.sqlQuery("SELECT name,sum(age) FROM `USER` GROUP BY name")
//使用Table对象注册Table
tableEnv.registerTable("TABLE_RES",table)
//输出注册的Table中的内容
tableEnv.sqlQuery("SELECT * FROM `TABLE_RES`").toDataSet[Row].print()
...
  1. TableSource注册 通过TableSource对数据外部的数据源注册为Table数据结构。例如常见的有csv,Text,Parquet等文件格式。例如下代码,通过外部csv数据源注册为Table数据。然后可以通过SQL API对数据进行检索。
代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
//使用CSV的方式进行注册表结构.
// 参数(path:数据的路径地址,fieldNames:字段名称,fieldTypes:字段类型,fieldDelim:csv分隔符,rowDelim:行分割方式)
val csvTableSource:CsvTableSource =new CsvTableSource("../datas.csv",Array("exitcode","count"),Array(Types.STRING,Types.INT),",","\n")

tableEnv.registerTableSource("csv",csvTableSource)

以上使用的是StreamExecutionEnvironment进行的处理,当然也可以使用Batch的方式对数据进行注册可以自己进行尝试。

  1. TableSink注册 当数据处理、计算完成后就需要写入到外部的数据中。外部数据包括文本(CSV、Apache[Parquet、ORC、Avro])、数据库、KV库、消息队列等。Apche Flink通过Table Sink用于支持常见的数据存储格式与存储系统。
代码语言:javascript
复制
val csvPath = "D:/flink.csv"
val fieldNames = Array[String]("user","age")
val fieldTypes = Array[TypeInformation[_]](Types.STRING,Types.INT)

val csvSink:CsvTableSink = new CsvTableSink(csvPath,",")

tableEnv.registerTableSink("csv",fieldNames,fieldTypes,csvSink)

tableEnv.sqlQuery("SELECT * FROM `USER` ").insertInto("csv")

外部CataLog注册

Apache Flink除了实现内部的CataLog作为所有Table的元数据存储介质之外还可以把CataLog放到其他的存储介质中。外部的CataLog可以自定义实现,然后在TableEnvironment中注册实现。Apache Flink官方提供了InMemoryCataLog的实现,开发者可以参考来实现其他的存储介质的CataLog。

代码语言:javascript
复制
val memoryCataLog:ExternalCatalog = new InMemoryExternalCatalog("UserCataLog")

tableEnv.registerExternalCatalog("user",memoryCataLog)

以上为Apache Flink的CataLog的实现。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-09-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 指尖数虫 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档