Hi~朋友,关注置顶防止错过消息
什么是有状态的计算?
有状态计算指的就是程序在计算过程中,需要将数据(状态)存储在本地存储或者外部存储中,以便下一次进行计算时获取使用,比如统计Nginx某个地址的调用次数,需要在每次计算时 不停的进行累加,并且将结果进行存储以便下次累加获取使用。
使用状态的场景
为什么需要状态管理?
流式作业一般需要7*24小时不间断的运行,在宕机恢复时需要保证数据不丢失,在计算时要保证计算结果准确,数据不重复,恰好计算1次,为了达到上述这些目的,我们就需要对 程序运行过程中的状态进行管理。
理想状态管理的特点
Flink状态分类
Managed State | RawState | |
---|---|---|
状态管理方式 | Flink Runtime自动管理:自动存储、自动恢复、内存优化 | 用户自己管理,需要自己序列化 |
状态数据结构 | 已知的数据结构:Value、List、Map等 | 字节数组byte[] |
推荐使用场景 | 大多数情况下可以使用 | 自定义Operator时使用 |
Managed State分类
Managed State主要分为两类:
Keyed State特点
Operator State特点
Keyed Stated具体分类
如何保存状态
保存状态依赖Checkpoint和Savepoint机制,Checkpoint是在程序运行过程中自动触发,Savepoint需要手动触发。
如果从Checkpoint进行恢复,需要保证数据源支持重发,同时Flink提供了两种一致性语义(恰好一次或者至少一次)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000L);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.setMinPauseBetweenCheckpoints(500L);
checkpointConfig.setCheckpointTimeout(60000L);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Checkpoint和Savepoint区别
Checkpoint | Savepoint | |
---|---|---|
触发管理方式 | Flink自动触发管理 | 用户手动触发管理 |
用途 | Task发生异常时快速恢复 | 有计划地进行备份,作业停止后可以恢复,比如修改代码、调整并发 |
特点 | 轻量;自动从故障恢复;作业停止后默认清除 | 持久;标准格式存储,允许代码或配置发生改变;手动触发从Savepoint的恢复 |
状态保存在哪里?
状态保存有三种方式:
MemoryStateBackend在Checkpoint是基于内存保存状态,该状态存储在TaskManager节点(执行节点)的内存中,因此会受到内存容量的限制(默认5M),同时还要受到akka.framesize的限制 (默认10M)。Checkpoint保存在JobManager内存中,因此总大小不能超过JobManager的内存,只推荐本次测试或无状态的作业使用。
FsStateBackend是基于文件系统保存状态的,状态依旧保存在TaskManager中,因此State不能超过单个TaskManager的内存容量,Checkpoint存储在外部文件系统中(比如HDFS或本地),打破了JobManager内存的限制, 但是总大小不能超过文件系统的容量,推荐状态小的作业使用。
RocksDBStateBackend,首先RocksDB是一个K-V的内存存储系统,当内存快满时,会写入到磁盘,RocksDB也是唯一支持增量Checkpoint的Backend,这说明用户不需要将所有状态都写入进去,可以 只将增量改变的状态写入即可。Checkpoint存储在外部文件系统,因此State不能超过单个TaskManager内存+磁盘总和,单key最大为2GB,总大小不超过文件系统的容量即可,推荐大状态作业使用。