前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >面试被问到Flink的checkpoint问题,给问懵逼了....

面试被问到Flink的checkpoint问题,给问懵逼了....

作者头像
木野归郎
发布2020-07-02 14:46:11
9020
发布2020-07-02 14:46:11
举报
文章被收录于专栏:share ai happinessshare ai happiness

Checkpoint 机制

1.什么是 checkpoint

简单地说就是 Flink 为了达到容错和 exactly-once 语义的功能,定期把 state 持久化下来,这个过程就叫做 checkpoint ,它是 Flink Job 在某一时刻全局状态的快照。 如果要实现一个分布式系统的全局状态保留功能时,按照传统方案会我们引入一个统一时钟,并且通过分布式系统中的 master 节点对每个slaves节点进行广播,当这些节点接收到这个统一时钟时,它们就记录下自己当前的状态。

这种广播时钟的方式也存在此 slave 和其它的机器出现数据不一致而最终导致脑裂的情况问题:

1. 某一个 node 进行的 GC 时间比较长

2. master 与 slaves 因为网络而造成的发送延迟或者发送失败

如果想要对这些问题进行解决,就需要对 master 和 slaves 做一个 HA(High Availability)即高可用。

这就又存在一个问题,如果一个系统越是复杂,就会导致系统越不稳定且维护成本越高。

checkpoint 在Flink中都放进了一个名为 Barrier 的流。

这里举一个Barrier 的栗子,看上图,从上游的第一个 Task 到下游的一Task,每次当 Task 经过图中蓝色的栅栏时,就会触发 save snapshot(快照)的功能。

为了更加形象的说明,这里再举一个栗子。

2.实例分析

上图是一个简单的 ETL 过程:

1. 首先我们把数据从 Kafka 中抽过来

2. 进行一个 trans 的转换操作

3. 然后再发送到一个下游的 Kafka 这个例子没有涉及到对 chaining 的调优。所以此时采用的是 forward strategy ,也就是 “一个 task 的输出只发送给一个 task 作为输入”。这种方式有一个好处就是当两个 task 在一个 JVM 中的话,就可以避免不必要的网络开销。

当设置 Parallism 为 2,此时的 DAG 图如下:

■ CK的分析过程

每一个 Flink 作业都会产生一个 JobManager ,JobManager 里面又会有一个 checkpoint coordinator 来管理整个 checkpoint 的过程,我们可以设置一个时间间隔让 checkpoint coordinator 将一个 checkpoint 的事件发送给每一个 Container 中的 source task,也就是第一个任务(对应并行图中的 task1,task2)。 当某个 Source 算子收到一个 Barrier 时,它会暂停自身的数据处理,然后将自己的当前 state 制作成 snapshot(快照),并保存到指定的持久化存储中,最后向 CheckpointCoordinator 异步发送一个 ack(Acknowledge character --- 确认字符),同时向自身所有下游算子广播该 Barrier 后恢复自身的数据处理。 每个算子按照上面不断制作 snapshot 并向下游广播,直到最后 Barrier 传递到 sink 算子,此时快照便制作完成。这时候需要注意的是,上游算子可能是多个数据源,对应多个 Barrier 需要全部到齐才一次性触发 checkpoint ,所以在遇到 checkpoint 时间较长的情况时,有可能是因为数据对齐需要耗费的时间比较长所造成的。 ■ Snapshot & Recover

如上图,Container容器初始化,从kafka中抽取e1和e2两个数据,同时,CheckpointCoordinator 也往它发送了 Barrier。

此时 Task1 完成了它的 checkpoint 过程,记录下 offset 为2(e1,e2),然后把 Barrier 往下游的算子广播,Task3 的输入为 Task1 的输出,现在假设我的这个程序的功能是统计数据的条数,此时 Task3 的 checkpoint 效果就是就记录数据数为2(因为从 Task1 过来的数据就是 e1 和 e2 两条),之后再将 Barrier 往下广播,当此 Barrier 传递到 sink 算子,snapshot 就算是制作完成了。

此时 source 中还会源源不断的产生数据,并产生新的 checkpoint ,但是此时如果 Container 宕机重启就需要进行数据的恢复了。刚刚完成的 checkpoint 中 offset为2,count为2,那我们就按照这个 state 进行恢复。此时 Task1 会从 e3 开始消费,这就是 Recover 操作。

■ checkpoint 的注意事项 下面列举的3个注意要点都会影响到系统的吞吐,在实际开发过程中需要注意:

3.背压的产生及 Flink 的反压处理 抛出问题:

在分布式系统中经常会出现多个 Task 多个 JVM 之间可能需要做数据的交换。

这里我们使用生产者和消费者来说明这个事情。

解决方式一:

如果生产者端使用无界的buffer,当我们的生产者生产速度远大于消费者消费的速度时,生产者端的数据会因为消费端的消费能力低下而导致数据积压,最终导致 OOM 的产生。

解决方式二:

当把 buffer改成有界,消费端的消费能力不进行提高,当 有界的buffer 被生产者的数据积满时,生产者就会停止生产。

反思问题:

上面的解决方式还不能完全地解决我们的问题,所以就需要根据不同的情况进行调整。 Flink 也是按照上面的原理,通过有界 buffer 来进行不同 TaskManager 的数据交换。分为了 静态控流 和 动态控流 两种方式。

什么是反压-静态流控,简单来说,就是当生产者比消费者的 TPS 多时,此时我们采用溢写的方式,使用 batch 封装好我们的数据按批次进行发送,每次发送完成后再 sleep 一段时间,这个时间的计算方式是 left(剩余的数据)/ tps,但是这个做法是很难去预估系统的情况的。

Flink 1.5 之前的流控是基于 TCP 的滑动窗口实现的。而 Flink 在1.5之后已经弃用了该机制。

数据生产的一端只能通过检查当前的 channel 是否可写来决定是否要向消费端发送数据,它对下游数据消费端的真实容量情况一概不知。这就会导致,当生产者一端发现 channel 不可写的时候,下游消费节点可能已经积压了很多数据。 Credit-Based 我们用下面的数据交换的例子说明: Flink 的数据交换大致分为三种,一种是同一个 Task 的数据交换,另一种是 不同 Task 同 JVM 下的数据交换。第三种就是不同 Task 且不同 JVM 之间的交换。

第一种方式:forward strategy 方式,就是同一个 Task 的数据交换,主要就是为了避免了序列化和网络的开销,造成不必要的资源浪费。

第二种数据交换的方式就是数据会先通过一个 record Writer进行序列化,传递给 Result Partition ,之后数据会通过 local channel 传递给另外一个 Task 的 Input Gate 里面进行反序列化,然后推送给 Record Reader 之后进行操作。

第三种数据交换涉及到了不同的 JVM,所以会有一定的网络开销,和第二种的区别就是通过netty把数据推送到远程端的 Task 上。 ■ Credit-Based

我们可以看到backlog 的作用其实只是为了让消费端感知到我们生产端的情况, event1 连带一个 backlog = 1 推送给了 TaskB。

event1 被 TaskB 接收后,TaskB会返回一个 ack 给 TaskA,同时返回一个credit = 3,这个是告知 TaskA 它还能接收多少条数据。

Flink 就是通过这种互相告知的方式,来让生产者和消费者都能感知到对方的状态。

此时经过一段时间之后,TaskB中的有界 buffer 已经满了,此时 TaskB回复 credit = 0 给 TaskA,此时 channel 通道将会停止工作,TaskA 不再将数据发往 TaskB。

此时再经过一段时间,TaskA 中的有界 Buffer 也已经出现了数据积压,所以我们平时遇到的吞吐下降,处理延迟的问题,就是因为此时整个系统相当于一个停滞的状态,如图二示,所有的过程都被打上 “X”,表示这些过程都已经停止工作。

JVM 是一个非常复杂的系统,当其内存不足时会造成 OOM ,进而导致系统的崩溃。

Flink 在拿到我们分配的内存之后会先分配一个 cut off 预留内存,从而保证系统的安全性。

Netword buffers 其实就是之前所提到的有界 buffer,momery manager 是一个内存池,这部分的内存可以设置为堆内或者堆外的内存,当然在流式作业中我们一般设置其为堆外内存,而 Free 部分就是提供给用户使用的内存块。

下面我们做一个计算: 假设分配给此 TaskManager 的内存是 8g。

1. 首先是要砍掉 cut off 的部分,默认是0.25,所以我们的可用内存就是 8gx0.75。

2. network buffers 占用可用内存的 0.1 ,所以是 6144x0.1。

3. 堆内/堆外内存为可用内存减去 network buffers 的部分,再乘以 0.8

所以给到用户使用的内存就是堆内存剩下的 0.2 那部分。

其实真实情况是 Flink 是先知道了 heap 内存的大小然后逆推出其它内存的大小。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 OnlyCoding 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档