00:00
好,上节课我们已经讲到了,呃,不同south和不同think,他们对这个整个流程状态一致性的保证啊,那大家会看到最好的一个保证其实就是两阶段提交,然后再结合我们的SS,这里只要它偏移量可以重置,那其实就能够保证整个过程端到端的exactly one啊,那所以大家会想到我们说这个flink和卡夫卡是天生的一对,对吧,他们俩就就适合匹配起来,也就应该在一起,那他们俩匹配在一起,如果要是我们用这个卡夫卡作为S,然后又用卡夫卡作为SK的话,连接flink系统做处理,能不能实现端到端的it one语义呢?啊,当然是可以的,所以接下来就给大家介绍flink加卡加卡不卡啊,它俩结合起来怎么样去保证端到端的状态一致性好,那首先我们还是分开来看,那还是不同的组件,每个都得保证exactly ones嘛,所以我们首先先看内部,内部这个就比较简单了,那还是checkpoint,对不对,这个跟卡夫卡没什么关系吧,只要有checkpoint,那么状态该存的时候存,发生故障的时候能够恢复,就可以保证内部的状态一致性。
01:19
接下来我们继续看S。那如果是卡夫卡consumer作为一个S的话,大家知道我们是不是可以把偏移量作为状态保存下来,存到checkpoint里边对不对,然后等到出现故障恢复的时候,我们从checkpoint里边读出当时的偏移量,然后是不是就可以由我们的连接器直接去向S那边重置偏移量,重新消费数据啊啊,所以这一部分就是卡夫卡S直接可以保证。那另外最麻烦的是这个think。Think这边的话,我们知道如果要连接还是连卡夫卡的话,呃,那个think function,那就是一个卡夫卡producer了,它作为SK的话,他需要怎么样保证exciting one呢?显然要采用两阶段提交,那是不是就得实现一个所谓的to face commit s function啊啊,这个讲到这里可以先给大家在代码里边大概的看一眼啊,我们先找一下这个卡夫卡。
02:28
大家看这里边我们又了一个flink卡夫卡producer,呃,在这里面点进去。其实大家已经看到了,它实现的是一个什么。实现的就是一个to face commit s function对不对啊,它它就是在这里去实现的啊,那大家会想到在这个过程当中,它是不是还会有一些别的一些一些操作一些需要的东西呢?啊,其实是需要的啊,大家可以看到就是在下边我们给它里边传参数的时候,其实没有看它还可以传什么。
03:06
比方说大家如果拖到下边一点的话,这里可以看到大家看它的这个构,这个构造方法啊,这里边传一个topic对吧,后边是这个序列化的STEM,然后再往后producer config,这都是常规的,最后还跟了一个。跟了一个什么semantic对吧,一个所谓的语义,这是个什么语义呢?点进去看一眼。啊,大家到这儿一看是不是就明白了对吧?这就是要选择的达到的状态一致性的级别语义对不对啊,所以这里边exciting ones at least ones not,对吧?呃,这里大家要注意一下,这里默认的级别其实是。其实是at least once,所以大家如果要是想要让它达到exactly one的话,还得把这个东西选上,好,我们还是接下来看这个弗林格加卡夫卡端到端一致性的这个保证啊,呃呃,那大家会想到就是刚才大家其实也看到了,就是卡夫卡的producer,它其实是实现了一个这个to face commit s function的,对吧?啊,当然了,在里边我们还可以加一些这个呃,配置啊,就是指定它的这个状态一致性的语义啊,这些东西都是可以做到的。好,那接下来我们来看一眼具体的这个提交过程,那我们其实已经知道了,就是flink是通过checkpoint保证内部的exciting ones语义的,对吧?然后现在呢,我们。
04:41
本身它的这一个卡夫卡作为这个S这边连接器,可以去重置卡夫卡的偏移量,那保证了我们这个数据可有这个重放的能力,那么后边的这个think这边如果要是再实现了一个两阶段提交的话,那相当于就是我们整个端到端都能保证这样的exactly one语义了,对吧?啊,这就是我们已经知道的一个概念性的东西,然后。
05:11
大家会看到这个。具体来讲啊,在做这个两阶段提交是一个什么样的过程呢?首先我们看到这是卡普卡进,卡普卡出这样一套系统,对不对?然后具体我们的流程里边,Data source,然后中间啊,这是以window为代表,其实中间就是各种各样的转换操作了,最后是data s,然后最后还是到卡夫卡整个一个完整的数据管道,对吧?那么在checkpoint的相关的过程当中,还涉及到哪些组件的参与呢?还有job manager对吧?啊,另外还有end状态后端他们都要参与进来,那job manager主要是协调各个task manager进行这个checkpoint的存储,那另外这个checkpoint主要是要保存在状态后端里边,默认是内存级的,当然我们平常在实际生产环境当中,可能往往要把它改成文件级的,或者是这个rocks DB对吧,进行持久化的保存啊,这是首先我们系统的一个状态。
06:16
然后当checkpoint启动的时候,那他就会想到现在其实就应该是在我们的进入我们的这个预提交阶段了,对吧?啊,其实整个过程已经就是在这个预提交阶段了,那么job manager,首先他发起这样的一个job manager,呃,他发起这样的一个checkpoint操作,他会把一个所谓的checkpoint barrier。注入到我们的S算子里面去,对吧,S任务里面去,然后接下来这个barrier就会像普通的数据一样夹在这个数据中间,顺着这个流一起往下走,对不对啊,每一个任务都会接到这样的一个barrier。好,那接下来我们继续看每一个任务接到他的时候都会干什么事情呢?
07:05
呃,那大家看到这个对这个barrier。给到这个S这里来的时候,那么S收到这个信息,他就会把当前的状态做一个快照,他的状态其实就是。当前的偏移量对吧?是不是卡普卡的偏移量offset,它会把自己的偏移量存到状态后端去啊,当然这个过程当中它是暂停读取数据的对吧?啊,先存,然后存到状态后端,然后完成之后把这个barrier朝下传递,然后继续再再消费数据,读取数据,所以大家看到整个这个过程当中,Checkpoint是可以保证这个内部状态的一致的啊。那接下来Barry继续朝下游传递,传到window中间的各种转换操作的时候也是一样,每一个任务接到这个barrier的时候,就开始保存自己的状态,往这个back end里面去写,对吧?然后做完了之后继续朝下传递啊,所以大家可以看到在这个过程当中,其实还是我们做checkpoint的这个过程。
08:15
那这是这这就已经看到所有的内部的算子都已经完成自己对状态的一个保存了,对吧,Barry从这里边一直传,已经传到了thinkc任务,那么sinc任务这里碰到barrier的时候,他首先。会把自己遇到的数据都先写入到外部卡夫卡系统里边去,这些数据其实就都属于预提交的事物,而当他遇到一个barrier的时候呢,它会把这个状态保存到状态后端,当然就是如果有的话,对吧,然后接下来大家注意,它是要开启一个新的预提交事物。但是之前的那个预提交事务,预提交的事务没有没有关闭,没有正式提交,对吧?而是从这里开始了一个新的预提交事物,言下之意就是以barrier为界,之前的数据属于上一个事物,然后之后的数据是不是就属于下一个新的事物了?哎,所以大家看,这就是把事物和check完整的结合在一起了。
09:26
然后我们继续看,当所有的算子任务快照完成的时候。啊,那也就是说这一次的checkpoint整个都完成了,那么job manager就会通知所有的任务,告诉他们,好,我们这一次checkpoint胜利完成,你们该干嘛干嘛了,那大家想到别人的话,那基本上就是该干嘛干嘛,对吧,其实也不会受什么影响,但是think这里边就有很重要的任务要做了,因为现在checkpoint正式完成,他要做一部,是不是正式提交之前的那个事务啊,就checkpoint,就是收到Barry之前的那个事物,现在就可以关闭正式commit了,那么卡夫卡那里边没有确认的数据就可以改为已确认的状态,就可以正常被外部系统消费了,啊,这是整个这样的一个过程啊。那么在这个任务运行的过程当中,大家会想到任何一个阶段失败,它是不是都会从上一个checkpoint保存的那个状态里边去做恢复啊。
10:27
而且呢,大家发现checkpoint是跟这里的事物捆绑在一起的,所以是不是要恢复就都恢复,所以整个端到端就保持了这样的一个exciting ones的语义表达。那我们再把这个整个的过程再总结一遍,整个是一个什么样的过程呢?首先第一条数据来的时候,其实这个时候就应该开启卡夫卡的事物了,对吧?啊,当然是说那个在thinkk任务那里啊,收到第一条数据的时候,他就应该开启一个卡夫卡的事物,这个时候所有来的事物,所有来的数据都是写入,按照这种事物写入的方式写到卡夫卡里边去,那么这个时候他们是会正常进入卡夫卡的分区日志,但是会被标记为未提交对吧?哎,所以他其实是一条数据,一条数据都已经写进去了,但是这只是一个预提交,并没有真正的能够让卡卡消费。然后接下来draw manager在某个时刻触发checkpoint的操作的时候,Barrier就会沿着S对吧,沿着这个数据流从S开始向下传递,遇到barrier的算子,就把自己的状态存到状态后端里边。
11:43
然后通知招。那think连接器当他收到Barry的时候呢,也是一样,也是保存当前的状态,存入checkpoint的通知job manager,而且它还有另外的一个任务,他要开启下一阶段的新的事物,对吧?接下来要收到的数据,所有的就都通过这个新的事物去预提交了。
12:06
啊,但是呢,之前那个事故还没提交,那之前那个事故什么时候真正提交呢。当job manager收到所有任务的通知,发出确认信息,告诉大家checkpoint真正完成的时候,那think任务这个时候他就正式提交这段时间的数据。啊,那当然了,这个时候外部的卡夫卡就可以关闭事务,那么提交的数据就可以正常被外部消费了,对吧?啊,这就是整个flink和卡夫卡exactly one2阶段提交的一个步骤的总结啊,大家看这个过程是不是就会对两阶段提交有更深刻的一个理解啊,啊,就会梳理的更清晰一些,是吧?啊,所以这个是希望大家能够做到的一些东西需要给大家讲的,就是说呃,如果我们这里边要去在这个呃代码里边实现,就是真正的实现这个卡夫卡跟flink连接起来的exactly ones语义的话,除了我们前面说到的就在这里边啊,因为它默认的那个semantic是at least ones嘛,所以我们这里边要打开这个呃,Exactlytic ones的这个语义,除了这里需要配置之外,还需要注意什么呢?
13:25
啊,这里大家注需要注意啊,对于外部卡夫卡读取的那个,呃,消费者的隔离级别也得注意一下。因为大家知道像我们说的那个隔离级别,呃,isolation.level正常情况下应该是默,默认情况下应该是read uncommitted对吧?如果说你要是默认情况下可以读未提交的数据的话,那相当于我们是不是这个整个一致性还是没有得到保证啊,我这边预提交数据没提交,呃,没有最终确认结果那边已经可以读了,那相当于那边已经消费掉数据的话,那我们这个事物不是相当于是假的吗?啊,所以大家注意啊,就是还涉及到我们外部卡夫卡的这个隔离级别需要改一下,另外还有一个点。
14:13
还有一个什么点呢,就是大家需要注意,就是我们前面提到的timeout的这个超时的特性,这个主要就是说,呃,在flink卡夫卡它的这个SK这个过程当中,它默认的那个超时是一小时,而卡夫卡集群里边配置的transaction事物的。默认超时时间是15分钟,所以说大家想如果要是按照这样的一个默认时间去配置的话,可能会有什么问题,就是我们flink卡夫卡SK,就是连接器这边这个这个任务。他能开放的时间长,对吧,这边他还在等着一边做操作呢,但是卡夫卡那边是不是觉得这个就是有可能这边事物时间太长,我们等那边checkpoint等的时间很长,对不对,结果那边直接把这边关了,不等这边完成,那边就已经关了,那有可能会出现什么。
15:12
那就相当于那边的事务已经结束了,而这边我们认为还没结束,还在继续执行checkpoint,等他完成对吧,那最后就是这边checkpoint完成了,结果最后那边的数据没有没有正常写入进去,对不对啊,相当于丢掉了,所以这里大家需要注意啊,这个超时时间,两边事物的那个超时时间也得匹配起来,正常情况下肯定是啊,你前面得比后面那个小,对不对,得等这边就是说假如说。你要是让他挂的话,也得是checkpoint那边,就是他那边超时先挂,先挂的话,那就相当于那边也不要提交,直接就全回滚了,对吧?啊,所以这就是这个两阶段提交确实还是有一些坑的啊,就是还需要做很多很多的配置,大家如果想看详细的一些内容的话,呃,可以在这个官网上去看它的一些具体的讲解啊,就里边其实配置还是讲的很详细的。
我来说两句