专栏首页腾讯云流计算 OceanusFlink SQL 状态越来越多?Idle State Retention Time 特性概览
原创

Flink SQL 状态越来越多?Idle State Retention Time 特性概览

上一篇文章中,介绍了 Flink State TTL 机制,这项机制对于应对通用的状态暴增特别有效。然而,这个特性也有其缺陷,例如不能保证一定可以及时清理掉失效的状态,以及目前仅支持 Processing Time 时间模式等等,另外对于旧版本的 Flink(1.6 之前),State TTL 功能也无法使用。

针对 Table API 和 SQL 模块的持续查询/聚合语句,Flink 还提供了另一项失效状态清理机制,这就是本文要提到的 Idle State Retention Time 选项,Flink 很早就提供了这个选项,该特性是借助 Query Configuration 配置项来定义的,但很多人并未启用,也不理解其中隐藏的暗坑。本文将对这一特性做说明,并给出一些使用建议。

问题引入

同样以官网文档的案例为起点,这是一个持续查询的 GROUP BY 语句,它没有时间窗口的定义,理论上会无限地计算下去:

SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

这就带来了一个问题:随着时间的不断推进,内存中积累的状态会越来越多,因为数据流是无穷无尽、持续流入的,Flink 并不知道如何丢弃旧的数据。在这种情况下,如果放任不管,那么迟早有一天作业的状态数达到了存储系统的容量极限,从而造成作业的崩溃。

针对这个问题,Flink 提出了空闲状态保留时间(Idle State Retention Time)的概念。通过为每个状态设置 Timer,如果这个状态中途被访问过,则重新设置 Timer;否则(如果状态一直未被访问,长期处于 Idle 状态)则在 Timer 到期时做状态清理。这样,就可以确保每个状态都能得到及时的清理。

通过调用 StreamQueryConfig 的 withIdleStateRetentionTime 方法,可以为这个 QueryConfig 对象设置最小和最大的清理周期。这样,Flink 可以保证最早和最晚的状态清理时间。

需要注意的是,旧版本 Flink 允许只指定一个参数,表示最早和最晚清理周期相同,但是这样可能会导致同一时间段有很多状态都到期,从而造成瞬间的处理压力。新版本的 Flink 要求两个参数之间的差距至少要达到 5 分钟,从而避免大量状态瞬间到期,对系统造成的冲击。

StreamQueryConfig qConfig = ...

// set idle state retention time: min = 12 hours, max = 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

这里需要注意一点,默认情况下 StreamQueryConfig 的设置并不是全局的。因此当设置了清理周期以后,需要在 StreamTableEnvironment 类调用 toAppendStream 或 toRetractStream 将 Table 转为 DataStream 时,显式传入这个 QueryConfig 对象作为参数,才可以令该功能生效。

新版本的 Flink 提供了一个 QueryConfigProvider 类(它实现了 PlannerConfig 接口,允许嵌入一个 StreamQueryConfig 对象),可以通过对 TableConfig 设置 PlannerConfig 的方式(调用 addPlannerConfig 方法),来传入设置好 StreamQueryConfig 对象的 QueryConfigProvider. 这样,当 StreamPlanner 将定义的 Table 翻译为 Plan 时,可以自动使用之前定义的 StreamQueryConfig,从而实现全局的 StreamQueryConfig 设定。对于旧的 Flink 版本,只能通过修改源码的方式来设置,较为繁琐。

实现方式

Idle State Retention Time 的代码完全位于 flink-table 相关模块下,因此只有 Table API / SQL 的编程方式才可以用到这个特性。

具体来说,在 org.apache.flink.table.plan.nodes.datastream 包下,有三个类:DataStreamGroupAggregateBase(对应无时间窗口限定的 GROUP BY 语句)、DataStreamGroupWindowAggregateBase(对应有时间窗口限定的 GROUP BY 语句)、DataStreamOverAggregate(对应 OVER 语句)。当调用这三个类的 translateToPlan 方法时,如果没有指定 Idle State Retention Time,则会打印一行 WARNING 级别的日志,表明状态会无限增长。

而在 org.apache.flink.table.runtime.aggregate 包下,Flink 定义了名为 CleanupState 的 Scala Trait, 代码如下:

trait CleanupState {

  def registerProcessingCleanupTimer(
      cleanupTimeState: ValueState[JLong],  // 上次注册的 Timer 时间戳
      currentTime: Long,                    // 当前时间戳
      minRetentionTime: Long,               // 空闲状态最短保留时间
      maxRetentionTime: Long,               // 空闲状态最长保留时间
      timerService: TimerService): Unit = {

    // 获取本状态上次注册的 Timer 时间戳
    val curCleanupTime = cleanupTimeState.value()

    // 检查是否注册过清理的 Timer, 如果注册过则检查是否还未到期
    if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
      // 如果没有注册过 Timer, 或者注册过但是还没到期, 那就更新一个新的 Timer
      val cleanupTime = currentTime + maxRetentionTime
      timerService.registerProcessingTimeTimer(cleanupTime)
      // 删除旧的 Timer
      if (curCleanupTime != null) {
        timerService.deleteProcessingTimeTimer(curCleanupTime)
      }
      cleanupTimeState.update(cleanupTime)
    }
  }
}

可以看到,在新版本的 Flink 内部实现中,Timer 的时间戳也是作为一种 ValueState 来保存的,这样可以和其他的 Keyed 状态一起,统一管理。同时也能得到 Flink 刷新时间戳的逻辑。

从 Flink 的实现原理上我们知道,对于 KeyedProcessFunction,都有一个

public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

方法,这个方法会被 Flink 的 InternalTimerService 所间接调用,从而当向 timerService 注册的 Timer 到期后,会进入处理逻辑。

为了支持 CleanupState 功能,Flink 还提供了一个名为 ProcessFunctionWithCleanupState 的抽象基类,它实现了 CleanupState 特性(接口),并继承了 KeyedProcessFunction 类,用来处理 GROUP BY 和 OVER 等语句。在这个类中,提供了若干公共的状态定时器的注册和清理方法:

abstract class ProcessFunctionWithCleanupState[KEY, IN,OUT](queryConfig: StreamQueryConfig)
  extends KeyedProcessFunction[KEY, IN, OUT]
  with CleanupState {

  protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
  protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1

  // 保存最近注册过的清理 Timer 时间戳
  protected var cleanupTimeState: ValueState[JLong] = _

  // 初始化指定 stateName 的状态清理 Timer 状态, 例如在 GroupAggProcessFunction 的 open() 方法里被调用
  protected def initCleanupTimeState(stateName: String) {
    if (stateCleaningEnabled) {
      val cleanupTimeDescriptor: ValueStateDescriptor[JLong] = new ValueStateDescriptor[JLong](stateName, Types.LONG)
      cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor)
    }
  }
  
  
  // 注册或更新状态清理 Timer, 例如在 GroupAggProcessFunction 的 processElement() 方法里被调用
  protected def processCleanupTimer(
    ctx: KeyedProcessFunction[KEY, IN, OUT]#Context,
    currentTime: Long): Unit = {
    if (stateCleaningEnabled) {
      registerProcessingCleanupTimer(
        cleanupTimeState,
        currentTime,
        minRetentionTime,
        maxRetentionTime,
        ctx.timerService()
      )
    }
  }

  // 判断当前是 Processing Time 还是 Event Time 时间模式, 两者触发方式不同
  // Processing Time 由定时任务触发, 而 Event Time 由 Watermark 步进触发, 而状态清理仅在 Processing Time 触发时启动
  protected def isProcessingTimeTimer(ctx: OnTimerContext): Boolean = {
    ctx.timeDomain() == TimeDomain.PROCESSING_TIME
  }

  // 状态列表清理
  protected def cleanupState(states: State*): Unit = {
    // clear all state
    states.foreach(_.clear())
    this.cleanupTimeState.clear()
  }
}

可以很清晰地看到,Flink 每收到上游传来的一条记录(element),就会更新这个算子所对应的状态。因此如果一个状态持续被读取,那么并不会被标记为空闲,也就不会被清理掉。这点和 State TTL 的 OnReadAndWrite 更新类型很一致(写入和读取时都更新时间戳),而区别于 OnCreateAndWrite 更新类型(只在写入时更新时间戳,而读取时不更新时间戳)。

另外还有一个 CoProcessFunctionWithCleanupState 类,这个类的作用和上述类似,只是为 JOIN 相关的处理逻辑服务,这里不再详细展开。

下面我们以 ProcTimeBoundedRangeOver 类(该类继承了 ProcessFunctionWithCleanupState)的 onTimer 方法为例,讲解 Timer 到期后是如何清理的:

override def onTimer(
    timestamp: Long,
    ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
    out: Collector[CRow]): Unit = {

    if (stateCleaningEnabled) {
        val cleanupTime = cleanupTimeState.value()
        if (null != cleanupTime && timestamp == cleanupTime) {
            // 清理状态, 然后即可返回(无需执行后续的逻辑)
            cleanupState(rowMapState, accumulatorState)
            function.cleanup()
            return
        }
    }

    // ... 其他处理逻辑 ...
}

可以看到,当 Timer 到期后,onTimer 方法会被 Flink 的 InternalTimerService 调用,随后判断是否启用了状态清理逻辑,如果启用的话,获取要清理的时间戳。如果时间戳吻合,那么调用父类的 cleanupState 方法,执行具体清理逻辑。

对于 Window 而言,Flink 还提供了一个 StateCleaningCountTrigger,它可以对 Tumbling(滚动)窗口的元素做统计并清理过期的行。感兴趣的同学可以自行阅读其实现逻辑,与上述介绍也很类似。

实现优化

Flink 的空闲状态清理 Timer 也有其不足之处,例如状态清理 Timer 本身就是 ValueState 对象,当 Timer 数目过多时,会对内存造成很大的压力,甚至导致作业的提前崩溃。而且针对 Timer 的快照只能是全量、同步的,和其他 Keyed 状态的实现方式不统一,增加了改进的难度。针对这些问题,社区提出了将 Timer 保存到 RocksDB State Backend 的思路并进行了实现。

另外在旧的实现逻辑里,HeapInternalTimerService 的 Timer 的清理时间复杂度是 O(n),当状态数目超多时同样会造成性能影响。通过引入优先级队列(Priority Queue)和 HashSet,可以做到更高效的 Timer 删除操作。

通过我们的使用经验来看,目前 Idle State Retention Time 的实现还不够成熟,有些特殊情况下反而会加重问题。例如它读取时也会更新时间戳,导致如果一个状态持续被读取,而很久未写入,那么仍然不会被清理掉,即使它已经逻辑上过期了,但 Flink 并不知道。但是瑕不掩瑜,通过与 State TTL 功能配合使用,可以对大状态下任务的崩溃起到很好的预防效果。

参考文章

[FLINK-9485] Improving Flink’s timer management for large state

[PROPOSAL] Improving Flink’s timer management for large state.

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink 快照分析:定位大状态和数据倾斜的算子

    在 Flink 作业中,无论是 SQL 还是 JAR 模式,常常会直接或者间接地使用到状态(State)。当 Flink 进行快照时,用户定义的这些状态数据可以...

    KyleMeow
  • Flink 状态管理详解(State TTL、Operator state、Keyed state)

    Flink官网的自我介绍:Apache Flink® — Stateful Computations over Data Streams,可以看出状态计算是 F...

    kk大数据
  • Flink 快照分析:定位大状态和数据倾斜的算子

    在 Flink 作业中,无论是 SQL 还是 JAR 模式,常常会直接或者间接地使用到状态(State)。当 Flink 进行快照时,用户定义的这些状态数据可...

    腾讯QQ大数据
  • 《从0到1学习Flink》—— Flink 配置文件详解

    前面文章我们已经知道 Flink 是什么东西了,安装好 Flink 后,我们再来看下安装路径下的配置文件吧。

    zhisheng
  • Flink SQL空闲状态保留时间实现原理

    如果要列举Flink SQL新手有可能犯的错误,笔者认为其中之一就是忘记设置空闲状态保留时间导致状态爆炸。

    王知无-import_bigdata
  • Flink SQL Client初探

    Flink Table & SQL的API实现了通过SQL语言处理实时技术算业务,但还是要编写部分Java代码(或Scala),并且还要编译构建才能提交到Fli...

    程序员欣宸
  • 如何应对飞速增长的状态?Flink State TTL 概述

    在流计算作业中,经常会遇到一些状态数不断累积,导致状态量越来越大的情形。例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句...

    KyleMeow
  • 数仓大法好!跨境电商 Shopee 的实时数仓之路

    Shopee 是东南亚与台湾领航电商平台,覆盖新加坡、马来西亚、菲律宾、台湾、印度尼西亚、泰国及越南七大市场,同时在中国深圳、上海和香港设立跨境业务办公室。

    大数据学习与分享
  • 带你走入 Flink 的世界

    在 18 年时,就听说过 Flink 流式计算引擎,是阿里调研选型选择的新一代大数据框计算架,当时就记住了这个新框架。

    纯洁的微笑
  • 【极客说第一期】面向未来的数据处理--实时流处理平台的实践分享

    随着移动设备、物联网设备的持续增长,流式数据呈现了爆发式增长,同时,越来越多的业务场景对数据处理的实时性有了更高的要求,基于离线批量计算的数据处理平台已经无法满...

    极客说
  • 为什么要学 Flink,Flink 香在哪?

    知道大数据的同学也应该知道 Flink 吧,最近在中国的热度比较高,在社区的推动下,Flink 技术栈在越来越多的公司开始得到应用。

    数据社
  • ​从 Spark Streaming 到 Apache Flink:bilibili 实时平台的架构与实践

    摘要:本文由 bilibili 大数据实时平台负责人郑志升分享,基于对 bilibili 实时计算的痛点分析,详细介绍了 bilibili Saber 实时计算...

    Spark学习技巧
  • 大数据计算引擎,你 pick 哪个?

    我是 2018 年 6 月加入公司,一直负责监控平台的告警系统。之后,我们的整个监控平台架构中途换过两次,其中一次架构发生了巨大的变化。我们监控告警平台最早的架...

    用户1737318
  • Flink Forward Asia 2019 会议所有 PPT 下载

    11 月 28 - 30 日,北京迎来了入冬以来的第一场雪,2019 Flink Forward Asia(FFA)也在初雪的召唤下顺利拉开帷幕。尽管天气寒冷,...

    zhisheng
  • Flink源码走读(一):Flink工程目录

    导语 | Flink已经成为未来流计算趋势,目前在很多大厂已经有了大规模的使用。最近在学习Flink源码,就想把自己学习的过程分享出来,希望能帮助到志同道合的朋...

    2011aad
  • 独家 | 一文读懂Apache Flink技术

    本文来自9月1日在成都举行的Apache Flink China Meetup,分享来自于云邪。

    数据派THU
  • www6669988com请拨18687679362_环球国际Flink源码走读(一):Flink工程目录

    导语 | Flink已经成为未来流计算趋势,目前在很多大厂已经有了大规模的使用。最近在学习Flink源码,就想把自己学习的过程分享出来,希望能帮助到志同道合的朋...

    用户7106032
  • 一文让你彻底了解大数据实时计算引擎 Flink

    在上一篇文章 你公司到底需不需要引入实时计算引擎? 中我讲解了日常中常见的实时需求,然后分析了这些需求的实现方式,接着对比了实时计算和离线计算。随着这些年大数据...

    zhisheng
  • Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速计...

    Java帮帮

扫码关注云+社区

领取腾讯云代金券