专栏首页SmartSiFlink 管理大型状态之增量 Checkpoint

Flink 管理大型状态之增量 Checkpoint

Apache Flink 是一个有状态的流处理框架。什么是流处理应用程序的状态呢?你可以理解状态为应用程序算子中的内存。状态在流计算很多复杂场景中非常重要,比如:

  • 保存所有历史记录,用来寻找某种事件模式
  • 保存最近一分钟的所有记录,对每分钟的记录进行聚合统计
  • 保存当前的模型参数,用于进行模型训练

但是,有状态的流处理仅在状态可以容错的情况下才建议在生产环境中使用。这里的容错是指即使出现软件或机器故障,计算的最终结果也是准确的,不会出现丢失数据或重复计算的情况。Flink 的容错一直是一个功能强大的特性,可以最大限度地减少软件或机器故障对我们业务带来的影响,并可以保证 Flink 应用程序结果的 Exactly-Once 语义。

Flink 应用程序状态容错保障机制的核心是 Checkpoint。Flink 中的 Checkpoint 是周期性触发的全局异步快照,并发送到持久存储(通常是分布式文件系统)上。如果发生故障,Flink 会使用最近一个完成的快照来恢复应用程序。有些用户的作业状态达到 GB 甚至 TB 级别。这些用户报告说在如此大的状态下,创建 Checkpoint 通常比较耗费时间,也耗费资源,这就是我们为什么在 Flink 1.3 中引入增量 Checkpoint 的原因。

在增量 Checkpoint 之前,每个 Flink Checkpoint 都会包含应用程序的完整状态。每次都对完整状态进行 Checkpoint 是没有必要的,因为从一个 Checkpoint 到下一个 Checkpoint 的状态变化一般都不大,所以我们创建了增量 Checkpoint 功能。增量 Checkpoint 会维护每个 Checkpoint 之间的差异,并仅存储最后一个 Checkpoint 与当前状态之间的差异。

增量 Checkpoint 在状态非常大的情况下性能有很大的改进。有生产用户反馈对于 TB 级别的作业,使用增量 checkpoint 后能将 checkpoint 的整体时间从 3 分钟降到 30 秒。这是因为 Checkpoint 不需要每次都将完整状态传输到持久存储上。

2. 如何使用

目前只能在 RocksDB 状态后端上使用增量 Checkpoint,Flink 依赖 RocksDB 内部的备份机制来生成 Checkpoint 文件。总之,Flink 中的增量 Checkpoint 历史不会无限增长,并且 Flink 会自动删除旧的 Checkpoint。

如果要在您的应用程序中启用增量 Checkpoint,我建议您阅读 Apache Flink 文档有关 Checkpoint 的信息,但总而言之,您可以像以前一样正常启用 Checkpoint,只需要在构造函数中将第二个参数设置为 true 即可启用增量 Checkpoint:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(filebackend, true));

默认情况下,Flink 只会保留一个成功的 Checkpoint,如果你需要保留多个的话,可以通过下面的配置进行设置:

state.checkpoints.num-retained

3. 如何工作

Flink 增量 Checkpoint 以 RocksDB 的 Checkpoint 为基础。RocksDB 是一种基于日志结构合并树(LSM)的 KV 存储,把所有的修改保存在内存的可变缓存中(称为 memtable)。所有对 memtable 中 key 的修改,会覆盖之前的 value,当 memtable 写满之后,RocksDB 会将其写入磁盘,按 Key 排序并进行轻度压缩。一旦 RocksDB 将 memtable 写入磁盘,就不可更改了,我们称为有序字符串表(sstable)。

RocksDB 的后台压缩线程会将 sstable 进行合并以删除可能的重复 Key。随着时间的推移,RocksDB 会删除原始 sstable,合并后的 sstable 包含来自所有其他 sstable 的所有信息。

在这个基础上,Flink 会跟踪 RocksDB 自上一个 Checkpoint 以来创建和删除了哪些 sstable 文件,并且由于 sstable 是不可变的,所以 Flink 使用 sstable 来确定状态变化。为此,Flink 调用 RocksDB 的 flush,强制将 memtable 的数据全部写到 sstable,并硬链到本地一个临时目录中。这个步骤是在同步阶段完成,其他剩下的部分都在异步阶段完成,不会阻塞正常的数据处理。

然后 Flink 将所有新的 sstable 复制到持久化存储(例如 HDFS、S3)以在新的 Checkpoint 中引用。Flink 不会将前一个 Checkpoint 中已经存在的 sstable 复制到持久化存储中,而是引用他们。任何新的 Checkpoint 都不会引用已经删除的文件,因为 RocksDB 中文件删除是由压缩完成的,压缩后会将原来的内容合并写成一个新的 sstable。这就是 Flink 增量 Checkpoint 能够切断 Checkpoint 历史的原因。

为了追踪 Checkpoint 间的差距,复制合并后的 sstable 是一个相对冗余的操作。Flink 以增量方式执行该过程,通常只会增加很小的开销,并且可以保持一个更短的 Checkpoint 历史,恢复时从更少的 Checkpoint 进行读取,因此我们认为这是值得的。

4. Example

以一个算子的子任务为例,它有一个 Keyed State,最多保留 2 个 Checkpoint。上图从左到右分别记录每次 Checkpoint 时本地的 RocksDB 状态文件、引用的持久化存储上的文件以及当前 Checkpoint 完成后文件的引用计数情况。

在 ‘CP 1’ Checkpoint 时,本地 RocksDB 目录包含两个 sstable 文件,该 Checkpoint 会把这两个文件复制到持久化存储上,并使用与 Checkpoint 名称一样的目录名称。当 Checkpoint 完成时,Flink 会在共享状态注册表中创建两条记录并将它们的计数设置为 1。共享状态注册表中的 Key 由算子、子任务和原始 sstable 文件名共同组成,值是对应的文件路径。所以注册表还维护了一个从 Key 到文件路径的映射关系。

在 ‘CP 2’ Checkpoint 时,RocksDB 创建了两个新的 sstable 文件,之前两个旧的文件仍然存在。该 Checkpoint 会将这两个新文件复制到持久化存储中,并引用之前的两个文件。当 Checkpoint 完成时,Flink 会将所有引用文件的计数加 1。

在 ‘CP 3’ Checkpoint 时,RocksDB 将 sstable-(1)、sstable-(2) 以及 sstable-(3) 合并为 sstable-(1,2,3),并删除这三个源文件。合并后的文件包含了与源文件相同的信息,并删除了所有重复条目。除了这个合并的文件,sstable-(4) 仍然存在,此外又多了一个新的 sstable-(5) 文件。Flink 将新的 sstable-(1,2,3) 和 sstable-(5) 文件复制到持久化存储中,并对 sstable-(4) 进行引用,并将引用计数加 1。由于 Checkpoint 最多只保留 2 个,因此需要删除旧的 ‘CP 1’ Checkpoint。这会导致在 ‘CP 1’ Checkpoint 中引用的文件 sstable-(1) 和 sstable-(2) 的引用计数减 1。

在 ‘CP 4’ Checkpoint 时,RocksDB 将已经存在的 sstable-(4)、sstable-(5) 以及新生成的 sstable-(6) 合并为 sstable-(4,5,6)。Flink 将 sstable-(4,5,6) 复制到持久化存储中,并对 sstabe-(1,2,3) 和 sstable-(4,5,6) 进行引用,并将引用计数加 1。由于达到 Checkpoint 最大数量,因此删除 ‘CP-2’ Checkpoint。由于 sstable-(1)、sstable-(2) 和 sstable-(3) 的引用计数现在已降至 0,Flink 会将它们从持久化存储中删除。

5. 竞争问题和并发Checkpoint

由于 Flink 可以并行执行多个 Checkpoint,有时在确认前一个的 Checkpoint 完成之前,新的 Checkpoint 就开始了。因此,您应该考虑使用哪一个 Checkpoint 作为新增量 Checkpoint 的基础。Flink 仅从 Checkpoint 协调器确认的 Checkpoint 引用状态,以防止无意中引用已删除的共享文件。

6. 从Checkpoint恢复以及性能

开启增量 Checkpoint 之后,不需要再进行其他额外的配置就可以在发生故障时从状态中恢复。如果发生故障,Flink 的 JobManager 会通知所有 Task 从上一个完成的 Checkpoint 中恢复,不管是全量 Checkpoint 还是增量 Checkpoint。每个 TaskManager 然后从分布式文件系统上的 Checkpoint 下载他们的状态。

尽管增量 Checkpoint 可以显着改善大状态下的 Checkpoint 时间,但增量 Checkpoint 也需要权衡考虑。总体而言,增量 Checkpoint 可以减少了正常操作期间的 Checkpoint 时间,但可能会导致更长的恢复时间,这具体取决于您的状态大小。如果集群故障特别严重并且 Flink TaskManager 必须从多个 Checkpoint 读取,那么恢复时间可能比使用非增量 Checkpoint 时更长。您也不能再删除旧的 Checkpoint,因为新的 Checkpoint 需要它们,并且 Checkpoint 之间的差异历史会随着时间的推移无限增长。您需要规划更大的分布式存储来维护 Checkpoint 以及从它读取的网络开销。

原文:Managing Large State in Apache Flink: An Intro to Incremental Checkpointing

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Apache Flink 管理大型状态之增量 Checkpoint 详解

    Apache Flink 是一个有状态的流计算框架,状态是作业算子中已经处理过的内存状态,供后续处理时使用。状态在流计算很多复杂场景中非常重要,比如:

    zhisheng
  • Apache Flink 管理大型状态之增量 Checkpoint 详解

    作者 | Stefan Ricther & Chris Ward 翻译 | 邱从贤(山智)

    用户6259908
  • Flink 状态管理与 Checkpoint 机制

    相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用:

    zhisheng
  • Flink状态管理和容错机制介绍

    本文整理自去年8月11日在北京举行的 Flink Meetup 会议,分享嘉宾施晓罡,目前在阿里大数据团队部从事Blink方面的研发,现在主要负责Blink状...

    zhisheng
  • Flink DataStream—— 状态(State)&检查点(Checkpoint)&保存点(Savepoint)原理

    ​ 最近一次项目当中需要将大量数据保存再Flink程序当中用作缓存数据一共后续数据使用,隧对最近使用到的状态、检查点、保存点等原理和使用进行一个总结

    俺也想起舞
  • Flink 状态管理和容错机制介绍

    计算任务的结果不仅仅依赖于输入,还依赖于它的当前状态,其实大多数的计算都是有状态的计算。比如wordcount,给一些word,其计算它的count,这是一个很...

    smartsi
  • Flink Savepoints和Checkpoints的3个不同点

    在本文中,我们将解释什么是 Savepoint,什么会使用它们,并就它们与 Checkpoint 的区别进行对比分析。

    smartsi
  • Flink 清理过期 Checkpoint 目录的正确姿势

    本博客是笔者在生产环境使用 Flink 遇到的 Checkpoint 相关故障后,整理输出,价值较高的 实战采坑记,本文会带你更深入的了解 Flink 实现增量...

    zhisheng
  • Flink 1.11 新特性详解:【非对齐】Unaligned Checkpoint 优化高反压

    问题导读 1.Barrier 对齐会造成什么问题? 2.Barrier 对齐是否会造成反压? 3.如何理解Unaligned Checkpoint ? 作为...

    zhisheng
  • 零基础学Flink:状态与容错

    在上一篇《零基础学Flink:实时热销榜Top5(案例)》文档中我们介绍了如何计算实时热销榜。在案例的最后TopNHot类中,我们使用了状态类。

    麒思妙想
  • 理解Flink checkpoint

    Checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的...

    神秘的寇先森
  • Flink CheckPoint奇巧 | 原理和在生产中的应用

    场景描述:Flink本身为了保证其高可用的特性,以及保证作用的Exactly Once的快速恢复,进而提供了一套强大的Checkpoint机制。这个机制在原理是...

    王知无-import_bigdata
  • [源码分析] 从实例和源码入手看 Flink 之广播 Broadcast

    对黑名单中的IP进行检测过滤。IP黑名单的内容会随时增减,因此是可以随时动态配置的。

    罗西的思考
  • Spark Streaming VS Flink

    本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理...

    美图数据技术团队
  • Flink on TiDB —— 便捷可靠的实时数据业务支撑

    本文由网易互娱计费数据中心实时业务负责人林佳老师分享,主要介绍网易数据中心在处理实时业务时为什么选择 Flink 和 TiDB,以及两者的结合应用情况。

    PingCAP
  • Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速计...

    Java帮帮
  • 基于Flink的高可靠实时ETL系统

    GIAC(GLOBAL INTERNET ARCHITECTURE CONFERENCE)是长期关注互联网技术与架构的高可用架构技术社区和msup推出的,面向...

    腾讯技术工程官方号
  • Flink面试通关手册「160题升级版」

    主要是当Flink开启Checkpoint的时候,会往Source端插入一条barrir,然后这个barrir随着数据流向一直流动,当流入到一个算子的时候,这个...

    大数据真好玩
  • flink超越Spark的Checkpoint机制

    同时,浪尖也在知识星球里发了源码解析的文章。spark streaming的Checkpoint仅仅是针对driver的故障恢复做了数据和元数据的Checkpo...

    Spark学习技巧

扫码关注云+社区

领取腾讯云代金券