前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink1.4 状态终端

Flink1.4 状态终端

作者头像
smartsi
发布2019-08-07 11:50:09
6960
发布2019-08-07 11:50:09
举报
文章被收录于专栏:SmartSiSmartSi

1. 概述

Flink 提供了不同的状态终端,可以指定状态的存储方式和位置。

状态可以存储在Java的堆内或堆外。根据你的状态终端,Flink 也可以管理应用程序的状态,这意味着 Flink 可以处理内存管理(可能会溢出到磁盘,如果有必要),以允许应用程序存储非常大的状态。默认情况下,配置文件 flink-conf.yaml 为所有Flink作业决定其状态终端。

但是,默认的状态终端配置也可以被每个作业的配置覆盖,如下所示。

Java版本:

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);

Scala版本:

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(...)

2. 可用的状态终端

开箱即用,Flink 内置了如下状态终端:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

如果没有配置,系统默认使用MemoryStateBackend

2.1 MemoryStateBackend

MemoryStateBackend 将数据以对象的形式保存在 Java 堆上。键值对状态和窗口算子拥有保存值,触发器等的哈希表。

在进行检查点操作时,状态终端对状态进行快照,并将其作为检查点确认消息的一部分发送给 JobManager(master),并将存储在其堆上。

MemoryStateBackend 可以配置为使用异步快照。尽管我们强烈建议使用异步快照来避免阻塞管道,但请注意,这是一项新功能,目前默认情况下不会启用。要启用此功能,用户可以在实例化 MemoryStateBackend的构造函数中设置相应的布尔值 true,例如:

代码语言:javascript
复制
new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);

MemoryStateBackend 的使用限制:

  • 每个单独状态的大小默认限制为5 MB。这个值可以在 MemoryStateBackend 的构造函数中增加。
  • 不考虑配置的最大状态大小,状态不能大于akka frame大小。
  • 聚合状态必须能够放进 JobManager 内存中。

MemoryStateBackend 适用场景:

  • 本地开发和调试
  • 只存储较小状态的作业,例如只包含 record-at-a-time 函数的作业(MapFlatMapFilter,…)。 Kafka消费者只需要很少的状态。
2.2 FsStateBackend

FsStateBackend 使用文件系统URL(类型,地址,路径)进行配置,如 hdfs://namenode:40010/flink/checkpointsfile:///data/flink/checkpoints

FsStateBackend 将正在使用的数据保存在 TaskManager 的内存中。在进行检查点操作时,将状态快照写入配置的文件系统文件和目录中。较小的元数据存储在 JobManager 的内存中(或者在高可用性模式下,存储在元数据检查点中)。

FsStateBackend 默认使用异步快照,以避免在写入状态检查点时阻塞处理管道。如果要禁用此功能,用户可以在实例化 FsStateBackend 的构造函数中将对应的布尔值设置为 false,例如:

代码语言:javascript
复制
new FsStateBackend(path,false);

FsStateBackend 适用场景:

  • 具有大状态,长窗口,大的键/值状态的作业。
  • 所有高可用配置。
2.3 RocksDBStateBackend

RocksDBStateBackend 使用文件系统URL(类型,地址,路径)进行配置,例如 hdfs://namenode:40010/flink/checkpointsfile:///data/flink/checkpoints

RocksDBStateBackend 将 正在使用的数据保存在 RocksDB 数据库中,其位于 TaskManager 数据目录下(默认情况下)。进行检查点操作时,整个 RocksDB 数据库进行检查点操作存储到配置的文件系统和目录中。较小的元数据存储在 JobManager 的内存中(或者在高可用性模式下,存储在元数据检查点中)。

RocksDBStateBackend 总是执行异步快照。

RocksDBStateBackend 使用限制:

  • 由于 RocksDBJNI桥接API基于 byte [],每个键和每个值支持的最大大小为 2^31 个字节。重要的是在 RocksDB 中使用合并操作的状态(例如ListState)可以累积超过2^31字节,然后在下一次检索时会失败。目前这是 RocksDB JNI 的限制。

RocksDBStateBackend 适用场景:

  • 具有非常大的状态,长时间窗口,大键/值状态的作业。
  • 所有高可用配置。

请注意,你可以保存的状态数量仅受可用磁盘空间的限制。与保存状态到内存的 FsStateBackend 相比,这可以保存非常大的状态。但是,这也意味着在这个状态终端下可以达到的最大吞吐量将会降低。

RocksDBStateBackend 是目前唯一个提供增量检查点的终端(见这里)。

3. 配置状态终端

如果你不指定,默认的状态终端是 jobmanager。如果你希望为集群中的所有作业建立不同的默认值,可以在 flink-conf.yaml 中定义一个新的默认状态终端来完成。默认的状态终端可以被每个作业的配置覆盖,如下所示。

3.1 设置每个作业的状态终端

作业状态终端在作业的 StreamExecutionEnvironment 上设置,如下例所示:

Java版本:

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

Scala版本:

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
3.2 设置默认状态终端

可以使用配置键 state.backendflink-conf.yaml 配置文件中配置默认状态终端。

配置的值可以是 jobmanagerMemoryStateBackend),filesystemFsStateBackend),rocksdbRocksDBStateBackend),或实现状态终端工厂 FsStateBackendFactory 类的全限定类名,例如 RocksDBStateBackendorg.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

如果默认状态终端设置为 filesystemstate.backend.fs.checkpointdir 定义了检查点数据存储目录。

配置文件中的示例部分可能如下所示:

代码语言:javascript
复制
# The backend that will be used to store operator state checkpoints

state.backend: filesystem

# Directory for storing checkpoints

state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints

备注: Flink版本:1.4

原文:https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-01-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. 可用的状态终端
    • 2.1 MemoryStateBackend
      • 2.2 FsStateBackend
        • 2.3 RocksDBStateBackend
        • 3. 配置状态终端
          • 3.1 设置每个作业的状态终端
            • 3.2 设置默认状态终端
            相关产品与服务
            对象存储
            对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档