3.2 RocksDB 模式 ValueState 和 MapState 是如何存储的 RocksDB 模式表示所有的状态数据存储在 TM 本地的 RocksDB 数据库中。...查询数据也用相同的逻辑:将 key 和 namespace 序列化后拼接起来作为 RocksDB 的 key,去 RocksDB 中进行查询,查询到的 byte 数组进行反序列化就得到了 ValueState...ValueState 中存 Map,Flink 引擎会把整个 Map 当做一个大 Value,存储在 RocksDB 中。...时间戳字段也会保存到状态引擎中,之后查询数据时,就可以通过该时间戳判断数据是否过期。 ValueState 将 value 封装为 TtlValue。...MapState 的 TTL 是基于 UK 级别的 ValueState 的 TTL 是基于整个 key 的 扩展:其实 ListState 的数据映射到 RocksDB 比较复杂,用到了 RocksDB
Flink的数据模型不是基于键值对的。因此,不需要将数据集类型物理地打包到键和值中。键是“虚拟的”:它们被定义为实际数据之上的函数,以指导分组操作符。...过期数据的清理 默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。...在 RocksDB 压缩时清理 如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。...RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。...RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。
博主自己在初学 Flink 时,也会被这些概念搞混,经过博主的整理之后认为,在 Flink 中关于状态、状态管理主要是有 3 个概念,能把这 3 个概念能分清楚,你就已经超越 95% 的实时数据开发同学了...这些状态后端就是实际存储上面的状态数据的。比如配置了 RocksDB 作为状态后端,MapState 的数据就会存储在 RocksDB 中。...是给小伙伴们实现特殊逻辑使用的,举例:在做 cp 时,可以从 ListState l 删除一些不要的数据,添加一些特殊的数据。...我们可以使用 add(value: T) 或 addAll(values: java.util.List[T]) 向状态中添加元素,使用 get(): java.lang.Iterable[T] 获取整个列表...博主有见过在 ValueState 中存储一个大 Map,并且使用 RocksDB,导致 State 访问非常慢(因为 RocksDB 访问 State 经过序列化),拖慢任务处理速度。
然后不做划分,直接交给用户; BroadcastState:如大表和小表做Join时,小表可以直接广播给大表的分区,在每个并发上的数据都是完全一致的。...RocksDB状态后端为每个存储值、列表条目或映射条目添加8个字节; 目前只支持与处理时间相关的TTLs; 如果试图使用启用TTL的描述符或使用启用TTL的描述符恢复先前在没有TTL的情况下配置的状态,...in full snapshot 默认情况下,过期值只有在显式读出时才会被删除,例如通过调用 ValueState.value() 方法。...(Time.seconds(1)) .cleanupInRocksdbCompactFilter .build RocksDB compaction filter将会从Flink每次处理完一定数据量的状态之后...、持续流入的,Flink 并不知道如何丢弃旧的数据。
Managed State表示数据结构由Flink runtime控制,例如内部哈希表或者RocksDB。例如,“ValueState”,“ListState”等等。...Flink的runtime层会编码State并将其写入checkpoint中。 Raw State是操作算子保存在它的数据结构中的state。...当进行checkpoint时,它只写入字节序列到checkpoint中。Flink并不知道状态的数据结构,并且只能看到raw字节。...FoldingState:它保存了一个聚合了所有添加到这个状态的值的结果。与ReducingState有些不同,聚合类型可能不同于添加到状态的元素的类型。...RocksDB状态后端会为每个存储的值(list entry或者map entry)增加8byte。 当前TTL仅仅支持处理时间。
3、ReducingState 这种状态通过用户传入的reduceFunction,每次调用add方法添加值时,会调用reduceFunction,最后合并到一个单一的状态值。...托管状态是由 Flink 框架管理的 State,如 ValueState,ListState,MapState 等,其序列化与反序列化由 Flink 框架提供支持,无序用户感知,干预。...以 MapState 为例,提供了添加、获取、删除、遍历的 API 接口 2、内部 State 接口 内部 State 接口是给 Flink 框架使用的,除了对 State 中数据的访问之外,还提供了内部的运行时信息接口...HeapKeyStateBackend 面向 Flink 引擎内部,使用者无感。 RocksDB:RocksDBStateBackend,适用于长周期大规模的数据。...适用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中,不会受限于 TaskManager 的内存大小,在执行检查点时,再将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中
Flink 应用运行中会保存状态信息到 State 对象实例中,State 对象实例通过 StateBackend 实现将相关数据存储到 FS 文件系统或者 RocksDB 数据库中。...并在 Flink 应用重启时加载checkpoint/savepoint 来实现状态的恢复,从而让 Flink 应用继续完成之前的数据计算,实现数据精确一次向下游传递。...状态存储在内存,并在做 cp(checkpoint)时存到远端。基于 RocksDB 的 RocksDBStateBackend。...将对象序列化成二进制存在内存和本地磁盘的 RocksDB 数据中,并在 cp 时存到远端。...然后在 Flink 应用重启时读取 State 状态数据,进行运行现场的还原。
的使用方法 对于Keyed State,Flink提供了几种现成的数据结构供我们使用,包括ValueState、ListState等,他们的继承关系如下图所示。...我们可以使用void add(T value)或void addAll(List values)向状态中添加元素,使用Iterable get()获取整个列表,使用void update(List...RocksDBStateBackend 这种方式下,本地状态存储在本地的RocksDB上。RocksDB是一种嵌入式Key-Value数据库,数据实际保存在本地磁盘上。...然而,每次从RocksDB中读写数据都需要进行序列化和反序列化,因此读写本地状态的成本更高。...快照执行时,Flink将存储于本地RocksDB的状态同步到远程的存储上,因此使用这种State Backend时,也要配置分布式存储的地址。
至此,初步结论是:window窗口中本应过期的数据没有释放。那么,再从程序中查看有valuestate的StateTtlConfig,但是却没有设置清除策略!...问题解决 ---- Flink的过期数据的清理。 1....默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。...在 RocksDB 压缩时清理 如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。...RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。
状态:状态就是用户在程序中使用的数据结构。比如 flink 中的 MapState,ValueState,ListState。...然后可以在 flink 任务 failover 时,从远程把状态数据恢复到 flink 任务中,保障数据质量。 状态后端:状态后端就是决定了以什么样数据结构,什么样的存储方式去存储和管理我们的状态。...flink 目前官方提供了 memory、filesystem,rocksdb 三种状态后端来存储我们的状态。...无论用户配置哪种状态后端(无论是 memory,filesystem,rocksdb),都是使用 DefaultOperatorStateBackend 来管理的,状态数据都存储在内存中。...用户在配置 rocksdb 时,会使用 RocksdbKeyedStateBackend 去管理状态;用户在配置 memory,filesystem 时,会使用 HeapKeyedStateBackend
Flink 的状态数据可以存在 JVM 的堆内存或者堆外内存中,当然也可以借助第三方存储,例如 Flink 已经实现的对 RocksDB 支持。...我们可以看一下 State 的类图,对于 Keyed State,Flink 提供了几种现成的数据结构供我们使用,State 主要有四种实现,分别为 ValueState、MapState、AppendingState...但是与 FsStateBackend 不同的是,RocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager...RocksDB 数据库中,吞吐量会有所下降。...当chckpoint成功时Flink负责提交这些写入,否则就终止取消掉它们。
在Flink去重第一弹:MapState去重中介绍了使用编码方式完成去重,但是这种方式开发周期比较长,我们可能需要针对不同的业务逻辑实现不同的编码,对于业务开发来说也需要熟悉Flink编码,也会增加相应的成本...就是一个计数器的作用,这两部分都是作为动态生成聚合函数的中间结果accumulator,透过之前的聚合函数的分析可知中间结果是存储在状态里面的,也就是容错并且具有一致性语义的 其处理流程是: 将devId 添加到对应的...DistinctAccumulator对象中,首先会判断map中是否存在该devId, 不存在则插入map中并且将对应value记1,并且返回True;存在则将对应的value+1更新到map中,并且返回False 只有当返回True时才会对...第二种: datatime+devId->row(0) 聚合函数中accumulator 是存储在ValueState中的,第二种方式的key会比第一种方式数量上多很多,但是其ValueState占用空间却小很多...,而在实际中我们通常会选择Rocksdb方式作为状态后端,rocksdb中value大小是有上限的,第一种方式很容易到达上限,那么使用第二种方式会更加合适; 这两种方式都是全量保存设备数据的,会消耗很大的存储空间
Flink的一个算子有多个子任务,每个子任务分布在不同实例上,我们可以把状态理解为某个算子子任务在其当前实例上的一个变量,变量记录了数据流的历史信息。当新数据流入时,我们可以结合历史信息来进行计算。...用户自己管理 状态数据结构 Flink提供的常用数据结构,如:ValueState、ListState、MapState...当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。...例如当消费 kafka 数据的 Kafka Source 并行度为 3 时,默认每个并行度都是从一个 Kafka 的 topic 的某个分区中消费数据,而每个 kafka Source 为了保证在极端情况下也不丢失数据...作业恢复或重新分配时,每个算子都将获得所有的状态数据。
Flink的一个算子有多个子任务,每个子任务分布在不同实例上,我们可以把状态理解为某个算子子任务在其当前实例上的一个变量,变量记录了数据流的历史信息。...用户自己管理 状态数据结构 Flink提供的常用数据结构,如:ValueState、ListState、MapState等。 Raw State只支持字节,任何上层数据结构需要序列化为字节数组。...当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。...作业恢复或重新分配时,每个算子都将获得所有的状态数据。...初始化RocksDB实例。 将key-groups从临时RocksDB转换到Base RocksDB数据库。
RocksDB StateBackend 概览和相关配置讨论 RocksDB 是 Facebook 开源的 LSM 的键值存储数据库,被广泛应用于大数据系统的单机组件中。...Flink 的 keyed state 本质上来说就是一个键值对,所以与 RocksDB 的数据模型是吻合的。...的并发数目很大时,很容易触发 job master 的内存超用问题。...来替代 ListState 或者 ValueState,因为RocksDB 的 map state 并不是将整个 map 作为 value 进行存储,而是将 map 中的一个条目作为键值对进行存储。...建议当检查点频繁因为超时而失败时,增大超时时间。
二、Flink状态编程 1、支持的状态类型 Flink根据数据集是否根据Key进行分区,将状态分为Keyed State和Operator State(Non-keyed State)两种类型。...RocksDB的对象存储,然后将这些状态数据通过内部的接口持久化到Checkpoints中,任务异常时可以通过这些状态数据恢复任务。...另外一种是原生状态(Raw State)形式,由算子自己管理数据结构,当触发Checkpoint过程中,Flink并不知道状态数据内部的数据结构,只是将数据转换成bytes数据存储在Checkpoints...中,当从Checkpoints恢复任务时,算子自己再反序列化出状态的数据结构。...在Flink中推荐用户使用Managed State管理状态数据,主要原因是Managed State能够更好地支持状态数据的重平衡以及更加完善的内存管理。
Flink状态编程 支持的状态类型 Flink根据数据集是否根据Key进行分区,将状态分为Keyed State和 Operator State(Non-keyed State) 两种类型。...RocksDB的对象存储,然后将这些状态数据通过内部的接口持久化到Checkpoints中,任务异常时可以通过这些状态数据恢复任务。...另外一种是原生状态(Raw State)形式,由算子自己管理数据结构,当触发Checkpoint过程中,Flink并不知道状态数据内部的数据结构,只是将数据转换成bytes数据存储在Checkpoints...中,当从Checkpoints恢复任务时,算子自己再反序列化出状态的数据结构。...在Flink中推荐用户使用Managed State管理状态数据,主要原因是Managed State能够更好地支持状态数据的重平衡以及更加完善的内存管理。
PDF 资料,关注下方公众号,在公众号后台添加博主微信后,私聊博主获取。...状态:本质来说就是数据,在 Flink 中,其实就是 Flink 提供给用户的状态编程接口。比如 flink 中的 MapState,ValueState,ListState。...然后可以在 Flink 任务 failover 时,从远程把状态数据恢复到 Flink 任务中,保障数据质量。...如果状态后端为 RocksDB,极其不建议在 ValueState 中存储一个大 Map,这种场景下序列化和反序列化的成本非常高,这种常见适合使用 MapState。...2 但是其实如果我们能对资源预估有一个成体系、有数据支撑的方案在向 Sre 要资源时是更有说服力的。
、持续流入的,Flink 并不知道如何丢弃旧的数据。...需要注意的是,旧版本 Flink 允许只指定一个参数,表示最早和最晚清理周期相同,但是这样可能会导致同一时间段有很多状态都到期,从而造成瞬间的处理压力。..., Collector out) throws Exception {} 方法,这个方法会被 Flink 的 InternalTimerService 所间接调用,从而当向 timerService...实现优化 Flink 的空闲状态清理 Timer 也有其不足之处,例如状态清理 Timer 本身就是 ValueState 对象,当 Timer 数目过多时,会对内存造成很大的压力,甚至导致作业的提前崩溃...针对这些问题,社区提出了将 Timer 保存到 RocksDB State Backend 的思路并进行了实现。
领取专属 10元无门槛券
手把手带您无忧上云