前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink SQL 内置优化参数功能以及适用场景介绍

Flink SQL 内置优化参数功能以及适用场景介绍

作者头像
LakeShen
发布2022-06-23 14:53:43
1.1K0
发布2022-06-23 14:53:43
举报
文章被收录于专栏:数据库和大数据技术原理解析

前言

这几天在看 Flink SQL 内置优化参数的功能和原理,虽然网上会有一些文章介绍,这里还是自己做一个整体的总结和思考,方便自己以后的回顾。

Flink SQL 内置的优化参数是 Blink Planner 里面的功能,也就是 1.9 以后 Blink Planner 自带功能,从 Flink 1.11 开始,Blink Planner 已经成为 Flink 默认的 Planner,目前聚合优化参数是针对无界流非窗口类聚合,窗口类聚合优化参数未来会进行支持。下面开始讲解一下 Flink SQL 优化参数的功能以及其适用场景,官网相关参考:Streaming Aggregation。

结论

  1. 针对无界流非窗口聚合,在数据量非常大的情况下,如果业务方允许一定的时延,那么可以配置 Mini Batch 参数,通过牺牲一点延迟,降低对于状态的频繁操作,换取更大的吞吐量。同时对于可撤回流做二次聚合时,配置该参数,尽可能降低因数据记录撤回导致数据抖动的问题。
  2. 对于 SUM,COUNT, MAX, MIN,AVG 等非 Distinct 聚合时,为了防止数据倾斜对实时作业的影响,可以配置 Local-Global Aggregation 参数,一般公司内部的实时计算平台会支持该参数的配置。另外一种方式,可以在消息记录后面加一个随机数,然后聚合时,group by 后面的 Key 再加上这个随机数,先打散明细记录再聚合,然后再在该聚合流上进行一次聚合,key 还是之前的业务聚合 key,这种情况也需要配置 Mini Batch,防止数据抖动。
  3. 针对 Distinct 类聚合,配置 table.optimizer.distinct-agg.split.enabled 参数,尽可能减低数据倾斜对于实时作业的影响。
  4. 针对 Distinct 类聚合,同时多个指标都是相同的聚合类型和 Key 时,只是聚合条件不同,可以使用 FILTER 代替 CASE WHEN,能够减小对于状态的访问以及状态的存储大小。

一、Mini Batch 优化参数

1.1 Mini Batch 介绍

默认情况下,在无界流聚合场景下,每来一条记录,会经历下面三个步骤:

  1. 会先获取到这条记录的所对应的 Key,从状态后端获取其状态值
  2. 通过聚合函数,结合之前状态,进行结果计算
  3. 将新的结果值写入到状态后端中

当数据量非常大时,由于每条记录都需要经过上面三个步骤,同时还涉及到序列化和反序列化,所以此时这种场景下,实时作业的吞吐量以及 RocksDB StateBackend 负载都会受到很大的影响,所以在 Blink Planer 中就引入了 MiniBatch 功能。

MiniBatch 的本质还是在内存中缓存一批数据,通过周期性时间或者缓存的记录数到达预设值时,会触发计算。简单理解,会将记录存储在一个 HashMap 中,Key 就是业务聚合 Key,Value 是这个 Key 的消息记录集合,之后会遍历内存的数据(通过 Key),先获取该 Key 之前的状态值,将内存中缓存的数据参与到状态计算,最终写入到状态后端中。通过对数据攒批处理后,降低对于状态后端的操作,从而提升实时作业的吞吐量。Mini Batch 功能是 Flink 在吞吐量以及延迟之间做的权衡。

开启 Mini Bathch 功能有三个参数:

代码语言:javascript
复制
// instantiate table environment
TableEnvironment tEnv = ...

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); 
configuration.setString("table.exec.mini-batch.size", "5000");
1.2 Mini Batch 适用场景

个人认为 Mini Batch 参数开启的适用场景有两点:

应用场景为无界流非窗口聚合时,而且实时任务的数据量非常大,业务方能够允许实时作业有一定延迟,这种情况下,你可以牺牲一点点延迟,来换取更大的实时任务的吞吐量。

对于可撤回流的二次聚合,引入该参数,尽可能降低聚合值突然变小而后又恢复正常值的抖动。比如下面统计最新 word 的次数:

代码语言:javascript
复制
select word,count(*) as 
cnt from (select name,last_value(word) as new_word from source group name) as t group word

由于内层逻辑是一个聚合场景,同时实时数据也可能一直在变,所以内层结果存在撤回情况。当外层聚合逻辑遇到撤回记录时,会减去撤回消息记录 key 的相关结果值,然后在根据新发送的记录进行统计,所以就可能导致结果抖动,尤其在大促期间,大屏实时统计类任务,这种会造成业务方的疑问和担心,为什么结果值变小了,数据会不会丢失等等。引入 Mini Batch 参数,可以对一批数据进行计算后,在进行结果更新,尽可能减少这种数据抖动的情形。

二、Local-Global Aggregation

Local-Global 聚合参数主要解决非 Distinct 聚合场景下,比如 SUM, COUNT, MAX, MIN, AVG,数据倾斜问题。Flink 在进行 keyBy 时,相同的 Key 肯定会到同一 TaskManager 中,所以如果某类 Key 数据量过多时,会造成某个 TaskManager 负载过高,极端情况可能会导致实时作业反压,Checkpoint 超时失败等问题。

Flink Local-Global 聚合类似 Hadoop MapReduce 任务的 Combine,先在上游将结果本地聚合好,在发送聚合后的数据到下游,大大降低了发送到下游的数据量(将明细数据转换成聚合后数据),从而解决数据倾斜问题。下面是 Flink Local-Global 聚合示意图:

使用 Local-Global 聚合优化的前提,需要开启 Mini Batch 功能,下面是代码使用 Local-Global 功能:

代码语言:javascript
复制
// instantiate table environment
TableEnvironment tEnv = ...

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

三、Split Distinct Aggregation

Local-Global 聚合类参数,能够解决非 Distinct 类的聚合场景数据倾斜问题,却无法解决 Distinct 类聚合场景,因为 Distinct 需要记住之前的原始数据,进行去重。下面是可能存在 Distinct 类数据倾斜聚合的 SQL 语句:

代码语言:javascript
复制
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day

由于 day 一般是当天的日期,所以这种情况,day 相同的数据都会到同一个 TaskManager 上面去,最终造成实时任务热点。Flink 内置的 Distinct 聚合优化参数table.optimizer.distinct-agg.split.enabled,通过将 Key 相同的记录,分到不同的 BUCKET(桶) 中去,BUCKET 默认数量为 1024,可以通过参数table.optimizer.distinct-agg.split.bucket-num 配置,配置 Split Distinct 聚合优化参数后,上面 SQL 会被转成:

代码语言:javascript
复制
SELECT day, SUM(cnt)
FROM (
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

在 day 相同的情况下,通过对 user_Id hash 取模,尽可能把消息打散到多个桶中,多个桶有分散在不同的 TaskManager,可以确定的是,user_id 相同的记录肯定会到同一 TaskManager 上面进行进行聚合。

下图是使用 Local Global 聚合参数和Split Distinct 聚合优化参数示意图:

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 LakeShen 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 结论
  • 一、Mini Batch 优化参数
    • 1.1 Mini Batch 介绍
      • 1.2 Mini Batch 适用场景
      • 二、Local-Global Aggregation
      • 三、Split Distinct Aggregation
      相关产品与服务
      批量计算
      批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档