前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink SQL 知其所以然(二十五):基础 DML SQL 执行语义!

Flink SQL 知其所以然(二十五):基础 DML SQL 执行语义!

作者头像
公众号:大数据羊说
发布2022-07-07 16:01:26
6850
发布2022-07-07 16:01:26
举报
文章被收录于专栏:大数据羊说大数据羊说

1.DML:With 子句

  1. ⭐ 应用场景(支持 Batch\Streaming):With 语句和离线 Hive SQL With 语句一样的,xdm,语法糖 +1,使用它可以让你的代码逻辑更加清晰。
  2. ⭐ 直接上案例:
代码语言:javascript
复制
-- 语法糖+1
WITH orders_with_total AS (
    SELECT 
        order_id
        , price + tax AS total
    FROM Orders
)
SELECT 
    order_id
    , SUM(total)
FROM orders_with_total
GROUP BY 
    order_id;

2.DML:SELECT & WHERE 子句

  1. ⭐ 应用场景(支持 Batch\Streaming):SELECT & WHERE 语句和离线 Hive SQL 语句一样的,xdm,常用作 ETL,过滤,字段清洗标准化
  2. ⭐ 直接上案例:
代码语言:javascript
复制
INSERT INTO target_table
SELECT * FROM Orders

INSERT INTO target_table
SELECT order_id, price + tax FROM Orders

INSERT INTO target_table
-- 自定义 Source 的数据
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)

INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10

-- 使用 UDF 做字段标准化处理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 过滤条件
Where id > 3
  1. SQL 语义

其实理解一个 SQL 最后生成的任务是怎样执行的,最好的方式就是理解其语义。

以下面的 SQL 为例,我们来介绍下其在离线中和在实时中执行的区别,对比学习一下,大家就比较清楚了

代码语言:javascript
复制
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3

这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子(From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 过滤和字段标准化算子
  • 过滤和字段标准化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子发的一条一条的数据,然后判断 id > 3?将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,一条一条将计算结果数据发给下游 数据汇算子
  • 数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中

可以看到这个实时任务的所有算子是以一种 pipeline 模式运行的,所有的算子在同一时刻都是处于 running 状态的,24 小时一直在运行,实时任务中也没有离线中常见的分区概念。

select & where

关于看如何看一段 Flink SQL 最终的执行计划: 最好的方法就如上图,看 Flink web ui 的算子图,算子图上详细的标记清楚了每一个算子做的事情。以上图来说,我们可以看到主要有三个算子:

  1. ⭐ Source 算子:Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]) ,其中 Source 表名称为 table=[[default_catalog, default_database, Orders],字段为 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time],Watermark 策略为 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]
  2. ⭐ 过滤算子:Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id]),其中过滤条件为 where=[(order_id > 3)],结果字段为 select=[order_id, name, row_time]
  3. ⭐ Sink 算子:Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time]),其中最终产出的表名称为 table=[default_catalog.default_database.target_table],表字段为 fields=[order_id, name, row_time]

可以看到 Flink SQL 具体执行了哪些操作是非常详细的标记在算子图上。所以小伙伴萌一定要学会看算子图,这是掌握 debug、调优前最基础的一个技巧。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个类似的算子(虽然实际可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),离线和实时任务的执行方式完全不同:

  • 数据源算子(From Order):数据源从 Order Hive 表(通常都是读一天、一小时的分区数据)中一次性读取所有的数据,然后将读到的数据全部发给下游 过滤和字段标准化算子,然后 数据源算子 就运行结束了,释放资源了
  • 过滤和字段标准化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子的所有数据,然后遍历所有数据判断 id > 3?将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,将所有数据发给下游 数据汇算子,然后 过滤和字段标准化算子 就运行结束了,释放资源了
  • 数据汇算子(INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 表中,然后整个任务就运行结束了,整个任务的资源也就都释放了

可以看到离线任务的算子是分阶段(stage)进行运行的,每一个 stage 运行结束之后,然后下一个 stage 开始运行,全部的 stage 运行完成之后,这个离线任务就跑结束了。

注意: 很多小伙伴都是之前做过离线数仓的,熟悉了离线的分区、计算任务定时调度运行这两个概念,所以在最初接触 Flink SQL 时,会以为 Flink SQL 实时任务也会存在这两个概念,这里博主做一下解释

  1. 分区概念:离线由于能力限制问题,通常都是进行一批一批的数据计算,每一批数据的数据量都是有限的集合,这一批一批的数据自然的划分方式就是时间,比如按小时、天进行划分分区。但是 在实时任务中,是没有分区的概念的,实时任务的上游、下游都是无限的数据流。
  2. 计算任务定时调度概念:同上,离线就是由于计算能力限制,数据要一批一批算,一批一批输入、产出,所以要按照小时、天定时的调度和计算。但是 在实时任务中,是没有定时调度的概念的,实时任务一旦运行起来就是 24 小时不间断,不间断的处理上游无限的数据,不简单的产出数据给到下游。

flink sql 知其所以然(七):不会连最适合 flink sql 的 ETL 和 group agg 场景都没见过吧?

3.DML:SELECT DISTINCT 子句

  1. ⭐ 应用场景(支持 Batch\Streaming):语句和离线 Hive SQL SELECT DISTINCT 语句一样的,xdm,用作根据 key 进行数据去重
  2. ⭐ 直接上案例:
代码语言:javascript
复制
INSERT into target_table
SELECT 
    DISTINCT id 
FROM Orders
  1. SQL 语义

也是拿离线和实时做对比。

这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子(From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 去重算子
  • 去重算子(DISTINCT id):接收到上游算子发的一条一条的数据,然后判断这个 id 之前是否已经来过了,判断方式就是使用 Flink 中的 state 状态,如果状态中已经有这个 id 了,则说明已经来过了,不往下游算子发,如果状态中没有这个 id,则说明没来过,则往下游算子发,也是一条一条发给下游 数据汇算子
  • 数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中

select distinct

注意: 对于实时任务,计算时的状态可能会无限增长。 状态大小取决于不同 key(上述案例为 id 字段)的数量。为了防止状态无限变大,我们可以设置状态的 TTL。但是这可能会影响查询结果的正确性,比如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个相同的算子(虽然可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),但是其和实时任务的执行方式完全不同:

  • 数据源算子(From Order):数据源从 Order Hive 表(通常都有天、小时分区限制)中一次性读取所有的数据,然后将读到的数据全部发给下游 去重算子,然后 数据源算子 就运行结束了,释放资源了
  • 去重算子(DISTINCT id):接收到上游算子的所有数据,然后遍历所有数据进行去重,将去重完的所有结果数据发给下游 数据汇算子,然后 去重算子 就运行结束了,释放资源了
  • 数据汇算子(INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 中,然后整个任务就运行结束了,整个任务的资源也就都释放了

往期推荐

(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)

(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)

(下)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)

flink sql 知其所以然(十九):Table 与 DataStream 的转转转(附源码)

flink sql 知其所以然(十八):在 flink 中还能使用 hive udf?附源码

flink sql 知其所以然(十七):flink sql 开发利器之 Zeppelin

flink sql 知其所以然(十六):flink sql 开发企业级利器之 Dlink

flink sql 知其所以然(十五):改了改源码,实现了个 batch lookup join(附源码)

flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码

flink sql 知其所以然(十三):流 join 很难嘛???(下)

flink sql 知其所以然(十二):流 join 很难嘛???(上)

flink sql 知其所以然(十一):去重不仅仅有 count distinct 还有强大的 deduplication

flink sql 知其所以然(十):大家都用 cumulate window 计算累计指标啦

flink sql 知其所以然(九):window tvf tumble window 的奇思妙解

flink sql 知其所以然(八):flink sql tumble window 的奇妙解析之路

flink sql 知其所以然(七):不会连最适合 flink sql 的 ETL 和 group agg 场景都没见过吧?

flink sql 知其所以然(六)| flink sql 约会 calcite(看这篇就够了)

flink sql 知其所以然(五)| 自定义 protobuf format

flink sql 知其所以然(四)| sql api 类型系统

flink sql 知其所以然(三)| 自定义 redis 数据汇表(附源码)

flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)

flink sql 知其所以然(一)| source\sink 原理

揭秘字节跳动埋点数据实时动态处理引擎(附源码)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-05-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据羊说 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.DML:With 子句
  • 2.DML:SELECT & WHERE 子句
  • 3.DML:SELECT DISTINCT 子句
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档