前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink-Kafka 连接器及exactly-once 语义保证

Flink-Kafka 连接器及exactly-once 语义保证

作者头像
kk大数据
发布2019-12-18 16:23:23
1.5K0
发布2019-12-18 16:23:23
举报
文章被收录于专栏:kk大数据kk大数据

Flink Source & Sink

在 Flink 中,Source 代表从外部获取数据源,Transfromation 代表了对数据进行转换操作,Sink 代表将内部数据写到外部数据源

一个 Flink Job 一般由 Source,Transformation,Sink 组成

Flink 提供了 Kafka Connector 用于消费/生产 Apache Kafka Topic 的数据。

Flink 的 kafka consumer 集成了 checkpoint 机制以提供精确一次的处理语义

在具体的实现过程中,Flink 不依赖于 kafka 内置的消费组位移管理,而是在内部自行记录和维护 consumer 的位移。

基本使用方式

val env = StreamExecutionEnvironment.getExecutionEnvironment
  val prop:Properties = new Properties
  prop.put("bootstrap.servers", "192.168.17.26:9092,192.168.17.27:9092,192.168.17.28:9092")
  prop.put("group.id","Flink_kafka_test_5")
  prop.put("enable.auto.commit","true")
  prop.put("auto.commit.interval.ms","100")
  val consumer = new FlinkKafkaConsumer010[String]("flink-test",new SimpleStringSchema,prop)
  env.addSource(consumer)
    .print()
  env.execute("job_name")

设置起始 offset

默认情况,从 group offset 开始读,即从消费者组(group.id)提交到 kafka broker 上的位移开始读取分区数据(对于老版本而言,位移是提交到 zookeeper 上)。

如果未找到位移,使用 auto.offset.reset 属性值来决定位移。该属性默认是 largest,即从最新的消息位移处开始消费。

consumer.setStartFromGroupOffsets()

设置从最早的 offset 开始读

consumer.setStartFromEarliest()

设置从最新的 offset 开始读

consumer.setStartFromLatest()

设置从指定的 offset 开始读(注意,这里的位移记录的是下一条待消费消息的位移,而不是最新的已消费消息的位移)

 val offsets = new util.HashMap[KafkaTopicPartition, java.lang.Long]()
 offsets.put(new KafkaTopicPartition("flink-test", 1),600L)
 offsets.put(new KafkaTopicPartition("flink-test", 2),700L)
 consumer.setStartFromSpecificOffsets(offsets)

当任务从失败中恢复,或者手动的从 savepoint 恢复时,上述的这些设置位移的方法是不生效的。在恢复时,每个 kafka 分区的起始位移都是由保存在 savepoint 或者 checkpoint 中的位移来决定的

DeserializationSchema 反序列化

如何将从 kafka 中获取的字节流转换为 java Object ?Flink 提供了 DeserializationSchema 接口允许用户自己自定义这个序列化的实现。

该接口的

T deserialize(byte[] message) throws IOException

方法 会在收到每一条 kafka 消息的时候被调用

为了方便使用,Flink 提供了一些反序列化的默认实现:

(1)SimpleStringSchema,可以将消息反序列化成字符串,使用方法:

val consumer = new FlinkKafkaConsumer010[String]("flink-test",new SimpleStringSchema,prop)

(2)JSONKeyValueDeserializationSchema,使用 jackson 将消息反序列化成 ObjectNode,并且构造函数中可以指定需不需要返回 metadata,metadata 包括 topic,offset,partition 信息

val consumer = new FlinkKafkaConsumer010[ObjectNode]("flink-test", new JSONKeyValueDeserializationSchema(true), prop)
  env.addSource(consumer)
    .map(f => (f.get("value"), f.get("metadata")))
    .print()

(3)JsonNodeDeserializationSchema,使用 jackson 将消息反序列化成 ObjectNode

val consumer = new FlinkKafkaConsumer010[ObjectNode]("flink-test", new JsonNodeDeserializationSchema, prop)

自动发现 kafka 新增的分区

在上游数据量猛增的时候,可能会选择给 kafka 新增 partition 以增加吞吐量,那么 Flink 这段如果不配置的话,就会永远读取不到 kafka 新增的分区了

prop.put("flink.partition-discovery.interval-millis", "30000")

表示每30秒自动发现 kafka 新增的分区信息

Flink的容错机制

当 Flink 开启了 checkpoint 的时候,Flink 会一边消费 topic 的数据,一边定时的将 offset 和 其他 operator 的状态记录到 checkpoint 中。

如果遇到了 job 失败的情况,那么 Flink 将会重启 job,从最后一个 checkpoint 中来恢复 job 的所有状态,然后从 checkpoint 中记录的 offset 开始重新对 topic 进行消费。

Flink 如何保证端到端的 exacly-once 语义

Flink 基于异步轻量级的分布式快照技术提供 Checkpoint 容错机制。

Flink 分布式快照的核心概念之一就是数据栅栏(barrier)。

Barrier 在数据源端插入,和数据流一起向下流动,(Barrier不会干扰正常的数据,数据流严格有序)

当 snapshot n 的 barrier 插入后,系统会记录当前 snapshot 位置值 n (用 Sn 表示),在 apache kafka 中,这个变量表示某个分区最后一次消费的偏移量。

这个位置值 Sn 会被发送到一个称为 checkpoint coordinate 模块(即 Flink 的 JobManager)。

barrier 插入后,随着数据一起向下游流动,从一个 operator 到 另一个 operator。

当一个 operator 从其输入流接收到所有标识 snapshot n 的barrier 时,它会向其所有输出流继续插入一个 标识 snapshot n 的 barrier。

当 sink operator (DAG 流的终点)从其输入流接收到所有 barrier n 时,它向 checkpoint coordinate 确认 snapshot n 完成。当所有 sink 都确认了这个快照,快照就被标识为完成。

有一个特性是,某个operator 只要一接收到 某个输入流的 barrier n,它就不能继续处理此数据流后续的数据,后续的数据会被放入到接收缓存(input buffer)中(如上图红框标识的缓存区)。

只有当 operator 从最后一个流中提取到 barrier n 时,operator 才会继续发射出所有等待向后发送的数据,然后发送 snapshot n 所属的 barrier。

那么如何保证 exactly-once 语义的?

假设现在 barrier 现在在 source 和 map 之间,任务挂掉了。下一次 Flink 会自动的重启任务,从上一次的快照中恢复。

会从 kafka 的上一次消费的地方开始消费。由于上一次 sink 还未接收到 所有的 barrier 就挂掉了,上一次的数据都被缓存在 input buffer 中,还未到 sink 中处理,这一次重新消费的记录会被sink继续处理。也就是没有多消费一条记录,也没有少消费一条记录。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-12-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档