前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >flink sql 知其所以然(一)| source\sink 原理

flink sql 知其所以然(一)| source\sink 原理

作者头像
公众号:大数据羊说
发布于 2021-08-19 06:32:14
发布于 2021-08-19 06:32:14
2.9K01
代码可运行
举报
文章被收录于专栏:大数据羊说大数据羊说
运行总次数:1
代码可运行

感谢您的关注 + 点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!!

1.序篇-本文结构

本文从以下五个小节介绍 flink sql source\sink\format 的概念、原理。

  1. 背景篇-关于 sql
  2. 定义篇-sql source、sink
  3. 实战篇-sql source、sink 的用法
  4. 原理剖析篇-sql source、sink 是怎么跑起来的
  5. 总结与展望篇

2.背景篇-关于 sql

关于 flink sql 的定位。

先聊聊使用 sql 的原因,总结来说就是一切从简。

  • SQL 属于 DSL
  • SQL 易于理解
  • SQL 内置多种查询优化器
  • SQL 稳定的语言
  • SQL 易于管理
  • SQL 利于流批一体

目前 1.13 版本的 SQL 已经集成了大量高效、易用的 feature。本系列教程也是基于 1.13.1。

3.定义篇-sql source、sink

本文会简单介绍一些 flink sql 的 source、sink 的定义、使用方法,会着重切介绍其对应框架设计和实现。详细解析一下从一条 create table sql 到具体的算子层面的整个流程。

Notes:在 flink sql 中,source 有两种表,一种是数据源表,一种是数据维表。数据源表就是有源源不断的数据的表。比如 mq。数据维表就是用来给某些数据扩充维度使用的。比如 redis,mysql,一般都是做扩容维度的维表 join 使用。 本节主要介绍数据源表,数据维表的整个流程和数据源表几乎一样。下文中的 source 默认都为数据源表。

首先在介绍 sql 之前,我们先来看看 datastream 中定义一个 source 需要的最基本的内容。

  1. source、sink 的 connector 连接配置信息。比如 datastream api kafka connector 的 properties,topic 名称。
  2. source、sink 的序列化方式信息。比如 datastream api kafka connector 的 DeserializationSchema,SerializationSchema。
  3. source、sink 的字段信息。比如 datastream api kafka connector 的序列化或者反序列化出来的 Model 所包含的字段信息。
  4. source、sink 对象。比如 datastream api kafka connector source 对应的具体 java 对象。

sql 中的 source、sink 所包含的基本点其实和 datastream 都是相同的,可以将 sql 中的一些语法给映射到 datastream 中来帮助快速理解 sql:

  1. sql source、sink connector\properties。可以对应到 datastream api kafka connector 的 properties,topic 名称。
  2. sql source、sink format。可以对应到 datastream api kafka connector 的 DeserializationSchema,SerializationSchema。
  3. sql source、sink field。可以对应到 datastream api kafka connector 的序列化或者反序列化出来的 Model 所包含的字段信息。
  4. sql source、sink catalog_name、db_name、table_name。可以对应到 datastream api kafka connector source 对应的具体 java 对象。
  5. sql 本身的特性。比如某些场景下需要将 sql schema 持久化,会用到 hive catalog 等,这个可以说是 sql 目前比 datastream api 多的一个特性。但是仔细想想,其实 datastream 也能够拓展这样的能力,其实就是将某个 datastream 注册到外部存储中(可以,但对 datastream 来说没必要)。

来看看官网的文档 create table schema 的描述,可以发现就是围绕着上面这五点展开的。https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#create-table。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]
   
<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]
  
<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]

结合我们刚刚说的 sql source、sink 中主要包含 5 点解释一下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE [IF NOT EXISTS] -- sql source、sink catalog_name、db_name、table_name
(
   -- sql source、sink field 字段信息
) WITH 
( 
   -- sql source、sink connector\properties 连接配置
   -- sql source、sink format
)

来个 kafka source 的例子:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE KafkaTable ( -- sql source、sink catalog_name、db_name、table_name
  `f0` STRING, -- sql source、sink 的字段信息
  `f1` STRING
) WITH (
  'connector' = 'kafka', -- sql source、sink 的 connector 连接配置
  'topic' = 'topic', -- sql source、sink 的 connector 连接配置
  'properties.bootstrap.servers' = 'localhost:9092', -- sql source、sink 的 connector 连接配置
  'properties.group.id' = 'testGroup', -- sql source、sink 的 connector 连接配置
  'format' = 'json' -- sql source、sink 的序列化方式信息
)

其对应的 datastream 写法如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "testGroup");

DeserializationSchema<Tuple2<String, String>> d = new AbstractDeserializationSchema<Tuple2<String, String>>() {

    @Override
    public Tuple2<String, String> deserialize(byte[] message) throws IOException {
        return json 解析为 tuple2 此处省略;
    }
};

DataStream<Tuple2<String, String>> stream = env
        .addSource(new FlinkKafkaConsumer<>("topic", d, properties));

将 sql source 和 datastream source 的组成部分互相映射起来可以得到下图,其中 datastream、sql 中颜色相同的属性互相对应:

2

可以看到,将所有的 sql 关系代数都映射到 datastream api 上,会有助于我们快速理解。

4.实战篇-sql source、sink 的用法

直接见官网 Table API Connectors。已经描述的非常详细了,本文侧重原理,所以此处不多赘述。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

https://www.alibabacloud.com/help/zh/faq-list/62516.htm?spm=a2c63.p38356.b99.212.3c1a1442x9AY7m

5.原理剖析篇-sql source、sink 是怎么跑起来的

关于 sql 具体工作原理可以参考 https://zhuanlan.zhihu.com/p/157265381。

3

但是很多刚接触 flink sql 的读者看完这篇文章,会感觉到还没准备好就来了这么大一堆密集的信息。那么

  1. 我到底应该从哪里看起呢?
  2. 能理解 sql 会映射到具体的算子执行。但是它具体是怎么对应到具体的算子上的呢?

博主会从以下两个角度去帮大家理清楚整个流程。

  1. 先抛开 flink sql、datastream 提供的能力来说,如果你在自己的一个程序中去接入一个数据源,你最关心的是哪些组件?

答:消费一个数据源最重要的就是 connector(负责链接外部组件,消费数据) + serde(负责序列化成 flink 认识的变量形式)。

  1. 结合第一个问题 + 一段简单的 flink sql 代码来看看 flink 是怎么去做这件事情的。

代码(基于 1.13.1):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class KafkaSourceTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode().build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        tEnv.executeSql(
                "CREATE TABLE KafkaSourceTable (\n"
                        + "  `f0` STRING,\n"
                        + "  `f1` STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'topic',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'testGroup',\n"
                        + "  'format' = 'json'\n"
                        + ")"
        );

        Table t = tEnv.sqlQuery("SELECT * FROM KafkaSourceTable");

        tEnv.toAppendStream(t, Row.class).print();

        env.execute();
    }
}

可以看到这段代码很简单,就是创建一个数据源表之后 select 数据 print。

通过上面这段 sql 映射出的 transformations 中发现,其实 flink 中最关键变量的也就是我们刚刚提出的第一个问题中的那两点:

  1. sql source connectorFlinkKafkaConsumer
  2. sql source formatJsonRowDataDeserializationSchema

19

所以我们就可以从下面这三个方向(多出来的一个是配置信息)的问题去了解具体是怎么对应到具体的算子上的。

  1. sql source connector:用户指定了 connector = kafka,flink 是怎么自动映射到 FlinkKafkaConsumer 的?
  2. sql source format:用户指定了 format = json,字段信息,flink 是怎么自动映射到 JsonRowDataDeserializationSchema,以及字段解析的?
  3. sql source properties:flink 是怎么自动将配置加载到 FlinkKafkaConsumer 中的?

5.1.connector 怎样映射到具体算子?

引用官网图:

22

Notes:其中 LookupTableSource 为数据维表。

先说下结论,再跟一遍源码。

结论:

  1. MetaData:将 sql create source table 转化为实际的 CatalogTable、翻译为 RelNode
  2. Planning:创建 RelNode 的过程中使用 SPI 将所有的 source(DynamicTableSourceFactory)\sink(DynamicTableSinkFactory) 工厂动态加载,获取到 connector = kafka,然后从所有 source 工厂中过滤出名称为 kafka + 继承自 DynamicTableSourceFactory.class 的工厂类 KafkaDynamicTableFactory,使用 KafkaDynamicTableFactory 创建出 KafkaDynamicSource
  3. RuntimeKafkaDynamicSource 创建出 FlinkKafkaConsumer,负责 flink 程序实际运行。

源码:

debug 代码,既然创建的是 FlinkKafkaConsumer,那我们就将断点打在 FlinkKafkaConsumer 的构造函数中。

5

如图可以发现当 debug 到当前断点时,已经进入 FlinkKafkaConsumer source 的创建阶段了,执行到这里的时候已经是完成了 sql connector 和具体实际 connector 的映射了。那么 connector 怎样映射到具体算子的过程呢?

我们往前回溯一下,定位到 CatalogSourceTable 中的 82 行(源码基于 1.13.1),发现 tableSource 已经是 KafkaDynamicSource,因此可以确定就是这一行代码将 connector = kafka 映射到 FlinkKafkaConsumer 的。

6

可以发现这段代码将包含了所有 sql create source table 中信息的 catalogTable 变量传入了。

7

进入这个方法后,可以看到是使用了 FactoryUtil 创建了 DynamicTableSource

8

进入 FactoryUtil.createTableSource 后可以看到,就是最重要的两步操作。

  1. 先获取 kafka 工厂对象。
  2. 使用 kafka 工厂对象创建出 kafka source。

9

进入 FactoryUtil.getDynamicTableFactory 后:

  1. flink 是使用了 SPI 机制动态(SPI 机制天然插件化)的加载到了所有继承了 Factory 的工厂实例。通过截图可以看到有好多 source\sink\format Factory。关于 SPI 可以参考 https://www.jianshu.com/p/3a3edbcd8f24
  2. 通过 connector = kafka + DynamicTableSourceFactory.class 的标识去过滤出 KafkaDynamicTableFactory

然后 KafkaDynamicTableFactory.createDynamicTableSource 去创建对应的 source。

13

可以看到 KafkaDynamicTableFactory.createDynamicTableSource 中调用 KafkaDynamicTableFactory.createKafkaTableSource 来创建 KafkaDynamicSource

基本上整个创建 Source 的流程就结束了。

5.2.format 怎样映射到具体 serde?

结论:

  1. MetaData:和 connector 都一样
  2. Planning:format 是在创建 RelNode 的过程中,使用 KafkaDynamicTableFactory 创建出 KafkaDynamicSource 时,通过 SPI 去动态过滤出 format = json 并且继承自 DeserializationFormatFactory.class 的 format 工厂类 JsonFormatFactory
  3. RuntimeKafkaDynamicSource 创建出 FlinkKafkaConsumer 时,实例化 serde 即 JsonRowDataDeserializationSchema,负责 flink 程序实际运行时的反序列化。

源码:

15

KafkaDynamicTableFactory.createDynamicTableSource 中获取反序列化 schema 定义。

18

  1. flink 是使用了 SPI 机制动态(SPI 机制天然插件化)的加载到了所有继承了 Factory 的 format 工厂实例。
  2. 通过 format = json 的标识并且继承自 DeserializationFormatFactory.class 去过滤出 JsonFormatFactory

20

5.3.其他配置属性怎么加载?

结论:

KafkaDynamicTableFactory 创建 KafkaDynamicTable 的过程中初始化。

源码:

14

21

6.总结与展望篇

本文作为 flink sql 知其然系列的第一节,基于 1.13.1 版本 flink 介绍了 flink sql 的 source\sink\format 从 sql 变为可执行代码的原理。带大家过了一下源码。希望可以喜欢。

下节预告:flink sql 自定义 source\sink。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-08-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据羊说 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Flink SQL 知其所以然(二十四):SQL DDL!
CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
公众号:大数据羊说
2022/07/07
1.3K0
Flink的sink实战之三:cassandra3
本文是《Flink的sink实战》系列的第三篇,主要内容是体验Flink官方的cassandra connector,整个实战如下图所示,我们先从kafka获取字符串,再执行wordcount操作,然后将结果同时打印和写入cassandra:
程序员欣宸
2020/05/26
1.2K0
Flink Table API & SQL 基本操作
本文主要展示了 Table API 和 SQL 程序的常见结构,如何创建注册 Table,查询 Table,以及如何输出 Table。
smartsi
2022/04/17
3.5K0
Flink1.9新特性解读:通过Flink SQL查询Pulsar
问题导读 1.Pulsar是什么组件? 2.Pulsar作为Flink Catalog,有哪些好处? 3.Flink是否直接使用Pulsar原始模式? 4.Flink如何从Pulsar读写数据? Flink1.9新增了很多的功能,其中一个对我们非常实用的特性通过Flink SQL查询Pulsar给大家介绍。 我们以前可能遇到过这样的问题。通过Spark读取Kafka,但是如果我们想查询kafka困难度有点大的,当然当前Spark也已经实现了可以通过Spark sql来查询kafka的数据。那么Flink 1.9又是如何实现通过Flink sql来查询Pulsar。 可能我们大多对kafka的比较熟悉的,但是对于Pulsar或许只是听说过,所以这里将Pulsar介绍下。 Pulsar简介 Pulsar由雅虎开发并开源的一个多租户、高可用,服务间的消息系统,目前是Apache软件基金会的孵化器项目。 Apache Pulsar是一个开源的分布式pub-sub消息系统,用于服务器到服务器消息传递的多租户,高性能解决方案,包括多个功能,例如Pulsar实例中对多个集群的本机支持,跨集群的消息的无缝geo-replication,非常低的发布和端到端 - 延迟,超过一百万个主题的无缝可扩展性,以及由Apache BookKeeper等提供的持久消息存储保证消息传递。 Pulsar已经在一些名企应用,比如腾讯用它类计费。而且它的扩展性是非常优秀的。下面是实际使用用户对他的认识。
用户1410343
2020/02/13
2.1K0
Flink1.9新特性解读:通过Flink SQL查询Pulsar
重要|Flink SQL与kafka整合的那些事儿
flink与kafka整合是很常见的一种实时处理场景,尤其是kafka 0.11版本以后生产者支持了事务,使得flink与kafka整合能实现完整的端到端的仅一次处理,虽然这样会有checkpoint周期的数据延迟,但是这个仅一次处理也是很诱人的。可见的端到端的使用案例估计就是前段时间oppo的案例分享吧。关注浪尖微信公众号(bigdatatip)输入 oppo 即可获得。
Spark学习技巧
2019/06/03
3.3K0
Flink SQL DDL 和 窗口函数实战
2019 年 8 月 22 日,Flink 发布了 1.9 版本,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念的定义,比如说水位。
kk大数据
2019/12/19
5.2K0
Flink SQL DDL 和 窗口函数实战
2021年大数据Flink(三十七):​​​​​​​Table与SQL ​​​​​​案例四
Apache Flink 1.12 Documentation: Table API & SQL
Lansonli
2021/10/11
3020
Flink SQL 知其所以然(三十):Explain、Show、Load、Set 子句
大家好,我是老羊,今天我们来学习 Flink SQL 中的的 Explain、Show、Load、Set 共 4 个子句。
公众号:大数据羊说
2022/12/16
6480
flink 1.11.2 学习笔记(2)-Source/Transform/Sink
从上一节wordcount的示例可以看到,flink的处理过程分为下面3个步骤:
菩提树下的杨过
2020/11/24
1.2K0
flink 1.11.2 学习笔记(2)-Source/Transform/Sink
flink sql 知其所以然(五)| 自定义 protobuf format
protobuf 作为目前各大公司中最广泛使用的高效的协议数据交换格式工具库,会大量作为流式数据传输的序列化方式,所以在 flink sql 中如果能实现 protobuf 的 format 会非常有用(目前社区已经有对应的实现,不过目前还没有 merge,预计在 1.14 系列版本中能 release)。
公众号:大数据羊说
2022/04/04
1.4K0
flink sql 知其所以然(五)| 自定义 protobuf format
零基础学Flink:Data Source & Data Sink
在上一篇讲述CEP的文章里,直接使用了自定义Source和Sink,我翻阅了一下以前的文章,似乎没有对这部分进行一个梳理,那么今天我们来就这上次的代码,来说说 Data Source 和 Data Sink吧。
麒思妙想
2020/07/10
2.4K0
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
公众号:大数据羊说
2022/04/04
6.2K0
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
Flink 1.11:更好用的流批一体 SQL 引擎
许多的数据科学家,分析师和 BI 用户依赖交互式 SQL 查询分析数据。Flink SQL 是 Flink 的核心模块之一。作为一个分布式的 SQL 查询引擎。Flink SQL 提供了各种异构数据源的联合查询。开发者可以很方便地在一个程序中通过 SQL 编写复杂的分析查询。通过 CBO 优化器、列式存储、和代码生成技术,Flink SQL 拥有非常高的查询效率。同时借助于 Flink runtime 良好的容错和扩展性,Flink SQL 可以轻松处理海量数据。
数据社
2020/07/14
1.6K0
项目实践|基于Flink的用户行为日志分析系统
用户行为日志分析是实时数据处理很常见的一个应用场景,比如常见的PV、UV统计。本文将基于Flink从0到1构建一个用户行为日志分析系统,包括架构设计与代码实现。本文分享将完整呈现日志分析系统的数据处理链路,通过本文,你可以了解到:
Spark学习技巧
2020/09/08
2.3K0
项目实践|基于Flink的用户行为日志分析系统
flink sql 知其所以然(三)| 自定义 redis 数据汇表(附源码)
目前在实时计算的场景中,熟悉 datastream 的同学在很多场景下都会将结果数据写入到 redis 提供数据服务。
公众号:大数据羊说
2021/08/19
9190
flink sql 知其所以然(三)| 自定义 redis 数据汇表(附源码)
Apache-Flink深度解析-DataStream-Connectors之Kafka
Apache Kafka是一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,LinkedIn于2010年贡献给了Apache基金会并成为顶级开源项目。Kafka用于构建实时数据管道和流式应用程序。它具有水平扩展性、容错性、极快的速度,目前也得到了广泛的应用。
王知无-import_bigdata
2019/03/19
1.2K0
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
Apache Flink 提供了两种关系型 API 用于统一流和批处理,Table 和 SQL API。
公众号:大数据羊说
2022/04/04
3.3K0
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
Flink Data Source
Flink Data Source 用于定义 Flink 程序的数据来源,Flink 官方提供了多种数据获取方法,用于帮助开发者简单快速地构建输入流,具体如下:
每天进步一点点
2022/07/27
1.1K0
Flink Data Source
Flink SQL 知其所以然(二十七):TopN、Order By、Limit 操作
大家好,我是老羊,今天我们来学习 Flink SQL 中的 TopN、Order By、Limit 3个操作。
公众号:大数据羊说
2022/07/07
3K0
flink中如何自定义Source和Sink?
动态表(Dynamic tables)是Flink的Table&SQL API的核心概念,用于以统一方式处理有界和无界数据。
山行AI
2021/02/19
5.1K0
flink中如何自定义Source和Sink?
推荐阅读
相关推荐
Flink SQL 知其所以然(二十四):SQL DDL!
更多 >
领券
社区富文本编辑器全新改版!诚邀体验~
全新交互,全新视觉,新增快捷键、悬浮工具栏、高亮块等功能并同时优化现有功能,全面提升创作效率和体验
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文