首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Flink的Checkpoint机制详解

所有的数据处理工具都面临数据高可靠、高可用的问题,一旦服务发生问题,如何保证数据不会丢失?

高可靠解决方案

MySQL用BinLog来解决这个问题,它把每一步事务操作都记录下来,一旦发生问题,可以追踪binlog找到每一步的操作记录。MySQL还会提供快照、备份的功能。

HDFS通过多副本和ZooKeeper的选举机制来解决这个问题,它会把收到的每一份数据存成N个副本,当发生故障的时候,通过ZooKeeper来确定最新的副本数据。另外,HDFS也提供快照SnapShot的功能。

storm里面是通过ack和Trident搞定。

Flink的Checkpoint机制

MySQL的思想很容易理解,就像棋谱一样,把每一步都记录下来。后人读棋谱,可以随时切换到任意一张棋谱,然后跟着每一步的操作重现当时的情景。

HDFS的思想也比较好理解,怕丢数据,就存成N份。只要写进去最少副本数,就自动会把所有旧副本都覆盖了,最大程度的保存好数据。而且他们都属于离线数据库,随时可以存一个快照。

但是Flink不一样啊,MySQL和HDFS都是离线存储,Flink是在线的,是一个数据流呀,不能停啊!也不能把数据流做一个快照啊,那咋弄?

其实现实世界就有这种场景:

顾客源源不断的往收银台上的传送带上放物品,收银员负责扫码、计算、收钱。前面那个顾客和后面的顾客的东西都放在一起,怎么区分?你看Flink的场景跟超市购物是不是一样一样的?

最简单的方式是:

顾客1放商品到收银台

收银员把顾客1的东西陆续扫完,并结账

清台

顾客2放商品上去,重复步骤2、3

但是这样也太慢了点吧!这时间浪费的太多了!

于是就有了这个:下图中的“欢迎光临”:

在每个顾客之间,放一个“欢迎光临”,隔断一下就行了。“欢迎光临”之前的商品该扫码扫码,该结账结账。等看到“欢迎光临”了,就相当于看到Checkpoint的标志了,把小票数据上报系统。

在Flink中就是这样:

第1步:由Job Manager初始化Checkpoint,在数据源之后放一个barrier“欢迎光临”,以此为隔断。在“欢迎光临”下游的数据,照常处理。

第二步:在“欢迎光临”下游的所有数据都处理完毕之后,我们就可以获取到几个信息:CheckPoint的source、数据源的offset和最终计算的Result。然后我们把这几个数据存到state里面就好了。这样就即能搞定Checkpoint的记录,又不耽误流式数据的处理了。一旦任务发生故障,重启任务,到State中读取所有任务元数据,重来一遍就好了。

当然,上面只是并行度为1的情况,这两个图可以画的更复杂一些,并行度为2的情况,原理也是一样一样的:

第一步:发起Checkpoint:

第二步:将所有barrier下游的数据都计算完,并将source、offset等数据上报至State,存好。

当然,再复杂一些,就会遇到Checkpoint的时间过长的问题了。短时间内,Flink会把该barrier后的数据暂时缓存下来,等Checkpoint完成之后再进行计算。另外,还会启动Checkpoint超时时间,超过这么长时间没完成,该Checkpoint将被丢弃,保证Flink的通畅。

Checkpoint的参数配置

//默认checkpoint功能是disabled的,使用

StreamExecutionEnvironment.enableCheckpointing方法来设置开启

checkpoint StreamExecutionEnvironment env =

StreamExecutionEnvironment.getExecutionEnvironment();

// 每隔1秒进行启动一个Checkpointing【设置checkpoint的周期,建议不要太短,否则前一个checkpoint未完成,后面的又要启动】

env.enableCheckpointing(1000);

// 设置数据消费语义为exactly-once严格一次 【数据消费语义:严格一次,默认且推荐】

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔,可以适当放大】

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间,建议结合资源和占用情况,可以适当加大。时间短可能存在无法成功的情况】

env.getCheckpointConfig().setCheckpointTimeout(60000);

// 同一时间只允许进行几个检查点,一般1个就够。

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

总结

无论是结构化数据库还是分布式数据库,无论是实时还是离线,数据的高可靠和高可用都是必须要解决的问题。单点故障就用主从、分布式解决,防止任务故障,就用各种log、快照、checkpoint解决。

一个新技术的出现,总是会遇到各种问题,但也同样会有高手来解决问题。我要赞美这些聪明的脑袋,是怎么想出这么奇妙的解决方案的?

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20210317A013YU00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券