前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何应对飞速增长的状态?Flink State TTL 概述

如何应对飞速增长的状态?Flink State TTL 概述

原创
作者头像
KyleMeow
修改2021-09-29 20:48:55
14.6K0
修改2021-09-29 20:48:55
举报

在流计算作业中,经常会遇到一些状态数不断累积,导致状态量越来越大的情形。例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句,以及执行了没有时间窗口限制的双流 JOIN 等等操作。对于这些情况,旧版本的 Flink 并不能很好应对,经常导致堆内存出现 OOM,或者堆外内存(RocksDB)用量持续增长导致超出容器的配额上限,造成作业的频繁崩溃,业务不能正常运行。

从 Flink 1.6 版本开始,社区引入了 State TTL 特性,该特性可以允许对作业中定义的 Keyed 状态进行超时自动清理(通常情况下,Flink 中大多数状态都是 Keyed 状态,只有少数地方会用到 Operator 状态,因此本文的“状态”均指的是 Keyed 状态),并且提供了多个设置参数,可以灵活地设定时间戳更新的时机、过期状态的可见性等,以应对不同的需求场景。

本质上来讲,State TTL 功能给每个 Flink 的 Keyed 状态增加了一个“时间戳”,而 Flink 在状态创建、写入或读取(可选)时更新这个时间戳,并且判断状态是否过期。如果状态过期,还会根据可见性参数,来决定是否返回已过期但还未清理的状态等等。状态的清理并不是即时的,而是使用了一种 Lazy 的算法来实现,从而减少状态清理对性能的影响。

当前最新的 Flink 1.8 版本对 State TTL 功能做了进一步的完善,增加了若干新特性。本文将对这些特性和 Flink 内部对 State TTL 的实现方式做介绍。

State TTL 功能的用法

为了熟悉一个功能特性,最直观的方式是了解它的用法。在 Flink 的官方文档 中,用法示例如下:

代码语言:java
复制
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

可以看到,要使用 State TTL 功能,首先要定义一个 StateTtlConfig 对象。这个 StateTtlConfig 对象可以通过构造器模式(Builder Pattern)来创建,典型地用法是传入一个 Time 对象作为 TTL 时间,然后设置更新类型(Update Type)和状态可见性(State Visibility),这两个功能的含义将在下面的文章中详细描述。当 StateTtlConfig 对象构造完成后,即可在后续声明的状态描述符(State Descriptor)中启用 State TTL 功能了。

从上述的代码也可以看到,State TTL 功能所指定的过期时间并不是全局生效的,而是和某个具体的状态所绑定。换而言之,如果希望对所有状态都生效,那么就需要对所有用到的状态定义都传入 StateTtlConfig 对象。对 Flink 源码感兴趣的同学,可以尝试为 Flink 增加一个默认的 StateTTL 选项,实现起来很简单,这里不再展开说明了。

State TTL 使用的更多案例,可以参见官方的 flink-stream-state-ttl-test 包,它提供了很多测试用例可以参考。

StateTtlConfig 的参数说明

  • TTL:表示状态的过期时间,是一个 org.apache.flink.api.common.time.Time 对象。一旦设置了 TTL,那么如果上次访问的时间戳 + TTL 超过了当前时间,则表明状态过期了(这是一个简化的说法,严谨的定义请参考 org.apache.flink.runtime.state.ttl.TtlUtils 类中关于 expired 的实现) 。
  • UpdateType:表示状态时间戳的更新的时机,是一个 Enum 对象。如果设置为 Disabled,则表明不更新时间戳;如果设置为 OnCreateAndWrite,则表明当状态创建或每次写入时都会更新时间戳;如果设置为 OnReadAndWrite,则除了在状态创建和写入时更新时间戳外,读取也会更新状态的时间戳。
  • StateVisibility:表示对已过期但还未被清理掉的状态如何处理,也是 Enum 对象。如果设置为 ReturnExpiredIfNotCleanedUp,那么即使这个状态的时间戳表明它已经过期了,但是只要还未被真正清理掉,就会被返回给调用方;如果设置为 NeverReturnExpired,那么一旦这个状态过期了,那么永远不会被返回给调用方,只会返回空状态,避免了过期状态带来的干扰。
  • TimeCharacteristic 以及 TtlTimeCharacteristic:表示 State TTL 功能所适用的时间模式,仍然是 Enum 对象。前者已经被标记为 Deprecated(废弃),推荐新代码采用新的 TtlTimeCharacteristic 参数。截止到 Flink 1.8,只支持 ProcessingTime 一种时间模式,对 EventTime 模式的 State TTL 支持还在开发中
  • CleanupStrategies:表示过期对象的清理策略,目前来说有三种 Enum 值。当设置为 FULL_STATE_SCAN_SNAPSHOT 时,对应的是 EmptyCleanupStrategy 类,表示对过期状态不做主动清理,当执行完整快照(Snapshot / Checkpoint)时,会生成一个较小的状态文件,但本地状态并不会减小。唯有当作业重启并从上一个快照点恢复后,本地状态才会实际减小,因此可能仍然不能解决内存压力的问题。为了应对这个问题,Flink 还提供了增量清理的枚举值,分别是针对 Heap StateBackend 的 INCREMENTAL_CLEANUP(对应 IncrementalCleanupStrategy 类),以及对 RocksDB StateBackend 有效的 ROCKSDB_COMPACTION_FILTER(对应 RocksdbCompactFilterCleanupStrategy 类). 对于增量清理功能,Flink 可以被配置为每读取若干条记录就执行一次清理操作,而且可以指定每次要清理多少条失效记录;对于 RocksDB 的状态清理,则是通过 JNI 来调用 C++ 语言编写的 FlinkCompactionFilter 来实现,底层是通过 RocksDB 提供的后台 Compaction 操作来实现对失效状态过滤的。

State TTL 的实现原理

为了了解 State TTL 功能在 Flink 代码的实现,同样可以从 StateTtlConfig 类开始顺藤摸瓜。可以看到,关于 State TTL 的实现代码,主要集中在 flink-runtime 模块的 org.apache.flink.runtime.state.ttl 包,以及 flink-statebackend-rocksd 模块的 org.apache.flink.contrib.streaming.state.ttl 包下。

首先我们来看一下 flink-runtime 模块是如何定义和实现 TTL 功能的,这里面有多个类可以特别留意:

TtlValue 类

这个类是一个包装类,它可以为任意的值对象增加一个 lastAccessTimestamp 的时间戳,并且可以获取传入的对象以及时间戳。但需要注意的是,一旦初始化,所有参数就不可以改变。它是 State TTL 状态保存的基本单元,可以通过 TtlUtils 工具类提供的 wrapWithTs(value, timestamp) 方法将一个普通值对象包装为 TtlValue 对象。

代码语言:java
复制
public class TtlValue<T> implements Serializable {

	private final T userValue;
	private final long lastAccessTimestamp;

	public TtlValue(T userValue, long lastAccessTimestamp) {
		this.userValue = userValue;
		this.lastAccessTimestamp = lastAccessTimestamp;
	}

	public T getUserValue() {
		return userValue;
	}

	public long getLastAccessTimestamp() {
		return lastAccessTimestamp;
	}
}

AbstractTtlDecorator 及子类

这是一个抽象的包装类。把 Flink 原有的状态(State Handler)与用户设置的 StateTtlConfig 对象一起传入这个类的构造方法后,将会根据前面介绍的多个参数,对这个类的若干布尔常量做赋值,例如 updateTsOnRead 表示是否在读取记录时更新时间戳,returnExpired 表示是否允许返回已过期的状态等,从而返回一个支持 TTL 的状态对象。

它有若干子类,例如对我们常见的 Aggregating / List / Map / Reducing / Folding / Value 等六种状态类型,均提供了具体的实现。这个抽象类还提供了若干工具方法,例如判断状态值是否过期、将普通的值包装为带时间戳的状态值等,同时还提供了 TTL 检查是否过期以及过期后的增量清理等逻辑。

AbstractTtlState 类的子类
AbstractTtlState 类的子类

例如在下面的 getWrappedWithTtlCheckAndUpdate 方法中,首先会调用传入的 getter 对象来获取 TtlValue 对象,它是一个普通的状态加上了时间戳,然后判断它是否为 null 以及是否过期,如果过期就调用传入的 stateClear 对象来做清理,它是一个 ThrowableRunnable 对象,约等于 Java 自带的 Runnable,只是允许抛出给定的异常。正如之前所言,这个方法会根据之前传入的 StateTtlConfig 的参数而决定,是否在读取时更新时间戳,以及在过期后是否返回过期的状态等。updater 对象则负责处理更新时间戳等操作。

代码语言:java
复制
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
    SupplierWithException<TtlValue<V>, SE> getter,
    ThrowingConsumer<TtlValue<V>, CE> updater,
    ThrowingRunnable<CLE> stateClear)    // 增量清理状态的 Runnable 类(经过 Flink 封装,允许在运行时抛出异常
    throws SE, CE, CLE {
    TtlValue<V> ttlValue = getter.get();
    if (ttlValue == null) {
        return null;
    } else if (expired(ttlValue)) {
        stateClear.run();
        if (!returnExpired) {
            return null;
        }
    } else if (updateTsOnRead) {
        updater.accept(rewrapWithNewTs(ttlValue));
    }
    return ttlValue;
}

而我们以 TtlMapState 为例,看一下上述 方法的调用情况:

代码语言:javascript
复制
private TtlValue<UV> getWrapped(UK key) throws Exception {
   accessCallback.run();
   return getWrappedWithTtlCheckAndUpdate(
      () -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
}

可以看到,getter、updater、stateClear 对象的定义方式非常简单,采用 Lambda 表达式,清晰地描述了各自的动作。original 表示原始的 Flink 状态(State Handler),可以看到具体的操作还是在原始状态对象上进行的,这个类只是一个装饰器,给原始状态对象增加了时间戳以及过期判断等逻辑。

然后再看下 TtlMapState 的 put 方法:

代码语言:java
复制
@Override
public void put(UK key, UV value) throws Exception {
    accessCallback.run();
    original.put(key, wrapWithTs(value));
}

这里的 accessCallback 是一个回调的 Runnable 对象,用来实现过期状态的增量清理逻辑,它会在每个 put 或 get 方法被调用前都执行一次。限于篇幅限制,这里不再展开叙述,感兴趣的同学可以参考 TtlStateFactory 类的 registerTtlIncrementalCleanupCallback() 方法。本文下面的示例代码中也给出了具体实现逻辑。

下面两张图是 TtlMapState 和普通的 HeapMapState 的继承关系图,可以清楚地看到,支持 TTL 的 State 对象与普通的 State 对象之间并没有很大的区别,只是增加了一下辅助方法以扩展了 TTL 的特性。另外一个区别在于它只是一个包装类,需要传入其他的 State 对象才可以完成其功能。

TtlMapState 类继承关系图
TtlMapState 类继承关系图
普通 HeapMapState 继承关系图
普通 HeapMapState 继承关系图

TtlStateContext 类

这个类主要用于初始化上面提到的 AbstractTtlDecorator 类,它包含了实例化 TTL 状态类所需的所有参数,例如被包装的普通状态对象(original)及其所需的 TypeSerializer、StateTtlConfig 对象(前文有讲)、TtlTimeProvider 对象(用来获取当前的时间戳,默认情况下只是对 System.currentTimeMillis() 的封装,用户也可以自定义,例如测试代码里提供的 MonotonicTTLTimeProvider 单调递增时间戳生成器等)、Runnable 类型的 accessCallback 对象(用来做失效对象的具体清理,前文已经描述过)。

TtlStateContext 对象可以由 TtlStateFactory 类来动态创建,它是一个工厂模式的类:

代码语言:java
复制
private IS createValueState() throws Exception {
    ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(    // 创建一个 TtlValue 类型的 State Descriptor, 可以看到它是一个复合类型
        stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
    return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor));
}

... ... ...

private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V>
    createTtlStateContext(StateDescriptor<TTLS, TTLV> ttlDescriptor) throws Exception {

    ttlDescriptor.enableTimeToLive(stateDesc.getTtlConfig());     // 从给定的 State Descriptor 获取 TTL 时间
    OIS originalState = (OIS) stateBackend.createInternalState(   // 创建原始状态
        namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory());
    return new TtlStateContext<>(    // 将原始状态包装到 TtlStateContext 类型中,用于随后生成具体的 TTL State 对象,例如 TtlValueState
        originalState, ttlConfig, timeProvider, (TypeSerializer<V>) stateDesc.getSerializer(),
        registerTtlIncrementalCleanupCallback((InternalKvState<?, ?, ?>) originalState));
}

private Runnable registerTtlIncrementalCleanupCallback(InternalKvState<?, ?, ?> originalState) {
    StateTtlConfig.IncrementalCleanupStrategy config =
        ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
    boolean cleanupConfigured = config != null && incrementalCleanup != null;
    boolean isCleanupActive = cleanupConfigured &&
        isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());
    Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { };    // stateAccessed  执行具体的清理逻辑
    if (isCleanupActive && config.runCleanupForEveryRecord()) {
        stateBackend.registerKeySelectionListener(stub -> callback.run());
    }
    return callback;
}

可以看到,经过初始化后,它可以把一个 Flink 的任意普通状态对象(只要实现了 InternalKvState 接口,例如 HeapValueState 和 RocksDBValueState 等),转化为支持 TTL 和增量清理功能的状态对象。这样在今后的 Flink 状态调用过程中,只要调用了状态的 get / put / update 等通用方法,都会自动地对失效状态进行判断、清理等操作,而 Flink 并不需要知道其背后的实现逻辑,只是把这些状态对象当作普通的来使用即可。这种封装的方式也体现了 Flink 的可扩展性,避免实现细节对上层调用逻辑产生干扰。

接下来,我们简单看下 Flink 是如何在 RocksDB 中实现 State TTL 的。Flink 提供了一个 RocksDbTtlCompactFiltersManager 类,它的实现了一个 RocksDB 的 Compaction Filter,这样当 RocksDB 在后台执行 Compaction 操作时,可以过滤掉那些失效的 Key 及 Value. 这里的 Compaction Filter 名为 FlinkCompactionFilter,是通过 C++ 编写的原生代码,而 Flink 这里通过 JNI 的方式来调用。对于具体实现细节,代码并不在 Flink 的 Git 仓库里,而被放在了专为 Flink 定制的 RocksDBJNI 封装库 frocksdbjni,详细实现可以参考 1 2 这两个 Git 提交记录。

总结

Flink 的 State TTL 特性的引入,解决了长期以来困扰用户的问题:随着状态数越来越多,旧的状态无法及时被清除(尤其是通过 Flink Table / SQL API 来创建的作业,用户无法显式地管理状态),导致系统越来越不稳定。目前 State TTL 仅对 Processing Time 时间模式有效,但通过与开发者进行交流,Flink 在不远的今后也将对 Event Time 的 State TTL 特性提供支持。

State TTL 特性是基于状态后端底层的状态实现的,不同于 Table 模块基于 Timer 机制实现的 Idle State Retention Time 机制,后者局限性很大,且 Timer 的数目过多时,本身也会对内存造成巨大的压力,但优点在于状态的清理可以很实时,不必像 State TTL 一样只能增量地每次清理一小部分或者在后台异步清理。因此对于 Table / SQL 作业,两种机制可以结合使用,以应对逐渐增加的状态带来的挑战。

参考文章

Apache Flink 1.8 Documentation: Working with State

State TTL for Apache Flink: How to Limit the Lifetime of State

[FLINK-3089] State API Should Support Data Expiration (State TTL)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • State TTL 功能的用法
  • StateTtlConfig 的参数说明
  • State TTL 的实现原理
    • TtlValue 类
      • AbstractTtlDecorator 及子类
        • TtlStateContext 类
        • 总结
        • 参考文章
        相关产品与服务
        流计算 Oceanus
        流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档