前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >三种State Backends | 你该用哪个?

三种State Backends | 你该用哪个?

作者头像
王知无-import_bigdata
发布2019-10-14 13:35:59
3.9K0
发布2019-10-14 13:35:59
举报

选择 State backend

Checkpoint 的存储的位置取决于配置的 State backend(JobManager 内存,文件系统,数据库...)。

默认情况下,State 存储在 TaskManager 内存中,Checkpoint 存储在 JobManager 内存中。Flink 支持在其他 state backend 中存储 State 和 Checkpoint。可以通过如下方法配置:StreamExecutionEnvironment.setStateBackend(…)

Flink 提供了不同的 State backend,支持不同的 State 存储方式和位置。默认会使用配置文件 flink-conf.yaml 指定的选项,也可以在每个作业中设置来覆盖默认选项:

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);

Flink 自带了以下几种开箱即用的 state backend:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

在没有配置的情况下,系统默认使用 MemoryStateBackend。

优劣比较

尽管有checkpoint保证exactly-once,但对于实时性要求高的业务场景,每次重启所消耗的时间都可能会导致业务不可用。也许你也经常遇到这样的情况,checkpoint又失败了?连续失败?task manager 内存爆了? 这些情况都很容易导致Flink任务down了,这时候需要思考下你所处的业务场景下,选用的Flink State Backends是否合理? 我们翻译一下上面图中对三种状态后端的介绍:

  • MemoryStateBackend 默认,小状态,本地调试使用
  • FsStateBackend 大状态,长窗口,高可用场景
  • RocksDBStateBackend 超大状态,长窗口,高可用场景,可增量checkpoint

MemoryStateBackend

MemoryStateBackend将内部的数据保存在Java堆上。Key/value状态和窗口操作符持有存储值,触发器等的哈希表。 当进行checkpoint时,这个状态后端会对当前的状态进行快照,并且将其作为checkpoint ACK消息的一部分发送给JobManager(master),该JobManager将其存储在它的堆上。

我们来看下MemoryStateBackend的四个构造函数:

代码语言:javascript
复制
public MemoryStateBackend() {
        this(5242880);
    }
public MemoryStateBackend(int maxStateSize) {
    this(maxStateSize, true);
}
public MemoryStateBackend(boolean asynchronousSnapshots) {
    this(5242880, asynchronousSnapshots);
}
public MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots) {
    this.maxStateSize = maxStateSize;
    this.asynchronousSnapshots = asynchronousSnapshots;
}

MemoryStateBackend可以配置使用异步快照的方式。虽然我们强烈鼓励使用异步快照的方式来避免管道阻塞,但是请注意,这个是一个新特性,目前默认情况下不启用。为了启用这个状态,用户可以在初始化 MemoryStateBackend 时将构造函数中相应的布尔标识设为 true,例如:

代码语言:javascript
复制
new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);

MemoryStateBackend的局限性:

  • 单个状态的大小默认情况下最大为5MB。这个值可以通过MemoryStateBackend构造函数进行增加。
  • 无论配置的最大状态大小为多少,状态的大小不能超过akka帧大小
  • 聚合的状态必须在JobManager的内存中能存放

MemoryStateBackend适用于:

  • 本地开发和调试
  • 只有很小状态的作业,例如作业只由record-at-a-time函数组成(Map,FlatMap,Filter,...)。Kafka消费者只需要非常小的状态。

FsStateBackend

FsStateBackend需要配置存储的文件系统,可以是hdfs路径:

代码语言:javascript
复制
hdfs://namenode:40010/flink/checkpoints

也可以是文件系统路径:

代码语言:javascript
复制
file:///data/flink/checkpoints

FsStateBackend将流计算数据状态存储在TaskManager的内存中,在数据流遇到检查点屏障时,再将数据快照存储在配置好的文件系统中,在JobManager内存中会存储少量的checkpoint元数据。

FsStateBackend的三种构造函数:

代码语言:javascript
复制
public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
    this(new Path(checkpointDataUri), asynchronousSnapshots);
}

public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
    this(checkpointDataUri.toUri(), asynchronousSnapshots);
}

//fileStateSizeThreshold默认1024
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold, boolean asynchronousSnapshots) throws IOException {
    Preconditions.checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger.");
    Preconditions.checkArgument(fileStateSizeThreshold <= 1048576, "The threshold for file state size cannot be larger than %s", new Object[]{1048576});
    this.fileStateThreshold = fileStateSizeThreshold;
    this.basePath = validateAndNormalizeUri(checkpointDataUri);
    this.asynchronousSnapshots = asynchronousSnapshots;
}

FsStateBackend默认使用异步快照,对每个快照文件大小有要求:[0, 1048576],且状态快照大小不能超过 TaskManager 的内存。但状态快照最终保存在文件系统中,所以FsStateBackend适用于大数据的生产环境,可处理长窗口,大状态或大key-value状态任务。

当选择使用 FsStateBackend时,正在进行的数据会被存在TaskManager的内存中。在checkpoint时,此后端会将状态快照写入配置的文件系统和目录的文件中,同时会在JobManager的内存中(在高可用场景下会存在 Zookeeper 中)存储极少的元数据。容量限制上,单 TaskManager 上 State 总量不超过它的内存,总大小不超过配置的文件系统容量。

默认情况下,FsStateBackend 配置成提供异步快照,以避免在状态 checkpoint 时阻塞数据流的处理。该特性可以实例化 FsStateBackend 时传入false的布尔标志来禁用掉,例如:new FsStateBackend(path, false)

推荐使用的场景:

  • 处理大状态,长窗口,或大键值状态的有状态处理任务, 例如分钟级窗口聚合或 join。
  • 适合用于高可用方案(需要开启HA的作业)。
  • 可以在生产环境中使用

RocksDBStateBackend

RocksDBStateBackend 使用文件系统URL(类型,地址,路径),例如

代码语言:javascript
复制
'hdfs://namenode:40010/flink/checkpoints

代码语言:javascript
复制
file:///data/flink/checkpoints

RocksDBStateBackend将in-flight数据存储在RocksDB数据库中,它(默认)存储在TaskManager的data目录下。当checkpoint时,整个RocksDB数据库将被checkpoint到配置的文件系统和目录下。最小的元数据存储在JobManager的内存中(或者,在高可用模式下,在元数据checkpoint中)。

RocksDB 是一种嵌入式的本地数据库。RocksDBStateBackend 将处理中的数据使用 RocksDB 存储在本地磁盘上。在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将增量的数据存储到配置的文件系统中。同时 Flink 会将极少的元数据存储在 JobManager 的内存中,或者在 Zookeeper 中(对于高可用的情况)。RocksDB 默认也是配置成异步快照的模式。

RocksDB是一个 key/value 的内存存储系统,和其他的 key/value 一样,先将状态放到内存中,如果内存快满时,则写入到磁盘中,但需要注意RocksDB不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。不过RocksDB支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅需要上传新生成的 sst 文件即可。它的 Checkpoint 存储在外部文件系统(本地或HDFS),其容量限制只要单个 TaskManager 上 State 总量不超过它的内存+磁盘,单Key最大2G,总大小不超过配置的文件系统容量即可。

RocksDBStateBackend局限性:

RocksDB支持的单key和单value的大小最大为每个 2^31 字节。这是因为 RocksDB 的 JNI API 是基于byte[]的。

对于使用具有合并操作的状态的应用程序,例如 ListState,随着时间可能会累积到超过 2^31 字节大小,这将会导致在接下来的查询中失败。

RocksDBStateBackend推荐使用的场景:

  • 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务。
  • 非常适合用于高可用方案。
  • 最好是对状态读写性能要求不高的作业

RocksDBStateBackend是目前唯一提供增量checkpoint的状态后端。

如何使用状态后端

不同 State backend 吞吐量对比

  • 使用 FileSystem 和 Memory 的吞吐差异不大(都是使用堆内存管理处理中的数据),使用 RocksDB 的吞吐差距明显。
  • Standalone 和 on Yarn 的总体差异不大,使用 FileSystem 和 Memory 时 on Yarn 模式下吞吐稍高,相反的使用 RocksDB 时 Standalone 模式下的吞吐稍高。

不同 State backend 延迟对比

  • 使用 FileSystem 和 Memory 时延迟基本一致且较低。
  • 使用 RocksDB 时延迟稍高,且由于吞吐较低,在达到吞吐瓶颈附近延迟陡增。其中 on Yarn 模式下吞吐更低,延迟变化更加明显。

State backend 的选择

StateBackend

in-flight

checkpoint

吞吐

推荐使用场景

MemoryStateBackend

TM Memory

JM Memory

调试、无状态或对数据丢失或重复无要求

FsStateBackend

TM Memory

FS/HDFS

普通状态、窗口、KV 结构

RocksDBStateBackend

RocksDB on TM

FS/HDFS

超大状态、超长窗口、大型 KV 结构

如果您不做任何指定,默认的状态后端是JobManager。如果你希望为你的集群中的所有作业创建一个非默认的状态后端,你可以通过在flink-conf.yaml中指定一个新的默认后端。默认的状态后端可以在每个作业的基础上进行覆盖,如下所示。

设置一个作业级的状态后端

作业的状态后端通过作业中的 StreamExecutionEnvironment进行设置,如下述示例所示:

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

设置默认状态后端

默认状态后端可以通过在 flink-conf.yaml 中设置state.backend值指定。

可能的配置项是:jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend),或者实现了状态后端工厂FsStateBackendFactory的类的完全限定类名,例如,为RocksDBStateBackend设置为org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

配置文件中的示例部分如下所示:

代码语言:javascript
复制
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 选择 State backend
  • 优劣比较
    • MemoryStateBackend
      • FsStateBackend
        • RocksDBStateBackend
          • RocksDBStateBackend局限性:
          • RocksDBStateBackend推荐使用的场景:
      • 如何使用状态后端
        • 不同 State backend 吞吐量对比
          • 不同 State backend 延迟对比
            • State backend 的选择
              • 设置一个作业级的状态后端
              • 设置默认状态后端
          相关产品与服务
          对象存储
          对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档