前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink exectly-once系列之两阶段提交实现分析

flink exectly-once系列之两阶段提交实现分析

作者头像
Flink实战剖析
发布2022-04-18 11:24:13
6910
发布2022-04-18 11:24:13
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

flink exactly-once系列目录:

一、两阶段提交概述

二、两阶段提交实现分析

三、StreamingFileSink源码分析

四、事务性输出实现

五、最终一致性实现

在【两阶段提交概述】中介绍了两阶段提交的基本思路以及如何根据checkpoint机制来实现两阶段提交思路,flink给出来两阶段提交抽象实现TwoPhaseCommitSinkFunction与具体实现FlinkKafkaProducer011。

一、TwoPhaseCommitSinkFunction

TwoPhaseCommitSinkFunction是一个抽象类,继承RichSinkFunction,实现CheckpointedFunction与CheckpointListener接口。抽象出了以下四个方法:

  1. beginTransaction, 开启一个事务,获得一个句柄
  2. preCommit,执行预提交
  3. commit ,执行提交
  4. abort,放弃一个事务

使用这四个方法然后结合checkpoint 过程提供的hook,来实现两阶段提交过程,看下其具体调用流程:

a. initializeState 状态初始化方法,只会被调用一次,第一件事情是用来恢复上次checkpoint完成预提交的事务与下一次checkpoint开始的事务,对于上次checkpoint完成预提交说明该checkpoint已经完成,那么执行commit操作,下一次checkpoint开始的事务说明该checkpoint,那么执行abort操作,第二件事情是开启一个新的事务,给新的checkpoint使用;

b. snapshotState 与checkpoint同步周期性执行的方法,首先执行preCommit对本次checkpoint事务执行预提交操作,并且开启一个新的事务提供给下一次checkpoint使用,然后将这两个事务句柄存放在state中进行容错,preCommit提交的事务就是在失败后重启需要commit的事务,而新开启的事务就是在失败后重启需要放弃的事务;

c. notifyCheckpointComplete checkpoint完成之后的回调方法,负责对预提交的事务执行commit操作。

在上面的流程中,任何一个步骤都有可能会失败,如果在预提交阶段失败,任务会失败重启回到最近一次的checkpoint成功状态,预提交的事务自然会因为事务超时而放弃;如果在预提交之后提交之前也就是完成checkpoint 但是还没触发notifyCheckpointComplete动作,这个这个过程中失败,那么就会从这次成功的checkpoint中恢复,会执行initializeState中的逻辑保证数据的一致性;如果在commit之后下次checkpoint之前失败,也就是在执行notifyCheckpointComplete之后失败,那么任务重启会继续提交之前已经提交过的事务,因此事务的提交需要保证重复提交不会影响数据的一致性。整个流程分析下来,除了需要保证事务重复提交保证数据的一致性外,还需要保证事务句柄能够被持久化容错,以便失败后重启恢复,接下来看下输出kafka 是如何保证数据一致性的。

二、FlinkKafkaProducer011

kafka从0.11版本开始提供了幂等与事务的特性,保证了数据的一致性,具体可以参考https://www.infoq.cn/article/kafka-analysis-part-8这篇文章,幂等通过producerId与SequenceNumber 来保证,但是幂等只能保证对单个分区操作的数据一致性,事务通过transactionId、producerId、epoch三个元素来保证,transactionId由客户端指定,producerId内部实现但是对用户透明、epoch表示对相同transactionId 不同producer的区分。FlinkKafkaProducer011继承TwoPhaseCommitSinkFunction抽象类,将kafka事务机制与checkpoint结合,如下图:

kafka的事务机制基本流程是先开启一个事务,然后发送数据,最后提交,将开启事务过程放在initializeState与snapshotState中,发送数据放在invoke中,flush 将所有缓存数据刷新到kafka ,相当于预提交操作,在snapshotState中执行,commitTransaction 提交操作放在notifyCheckpointComplete中执行。上面任何一个流程都有可能出现异常导致任务失败,对于kafka事务提交机制也是使用两阶段提交的模式,根据上一篇的分析,那么可能出现的问题就是在第二阶段,可能会出现部分提交成功部分提交失败导致数据不一致,如果能获取之前提交失败kafka 的transactionId、producerId、epoch这三个元素那么就可以在任务重启继续提交之前失败的事务,在flink 正好可以使用状态将这个三个元素进行容错,使重启之后可恢复。 在FlinkKafkaProducer011中使用KafkaTransactionState对象作为事务的句柄,保存着transactionId、producerId、epoch容错元素与FlinkKafkaProducer对象,FlinkKafkaProducer是transient类型的,不需要进行持久化,通过t-p-e就可以确定一个FlinkKafkaProducer。

理解以上流程就很好理解代码实现了,下面看几个重要的方法:

1. 开始事务,获得一个新的事务句柄

2. 预提交,执行flush操作

3. 提交,执行commitTransaction操作

4. 出现异常,任务重启放弃事务

三、两阶段提交实现总结

1. 外部存储需要满足事务特性

2. 外部存储需提供事务句柄,可持久化、可重新提交

3. 由于这种两阶段提交模式与checkpoint绑定在一起,checkpoint是周期性的执行,那么checkpoint周期的长短则会影响下游数据的延时性,需要根据实际使用情况来调整。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档