00:00
接下来我们来给大家再总结一下不同的S和think啊,呃,在这些特点下,我们能够保证的状态一致性,端到端的状态一致性能达到什么级别,那这里边我们区分就是要看,首先看S是什么样的,然后看think,那S这里边主要分两类,一一部分就是不可重置偏移量的,另外还有一类可重置,那如果不可重置偏移量的话,大家就想到了,这个就没什么好说了,整个任务就有可能就是数据就有可能丢掉啊,所以说它最多也就是达到at most once的状态一致性保证,这个就是没有什么好说的啊,不管什么样的think,不管我们flink内部checkpoint机制到底多么的完完善,这里边都有可能丢数据,因为它不能重置嘛啊,那所以接下来我们的讨论其实主要都是基于source可重置的这种状态下来说的,那这里边如果说它这这个S可重置偏移量的话,我们会发现它至少就能达到at。
01:00
办了对吧?呃,因为可重置,那至少我们这个SS任务提重新提交偏移量之后就可以保证数据不丢,所以至少是at least once,那么呃,这里边think任意情况下都是at least once,那接下来我们要考虑的是什么情况下能够达到exactly one呢?有几种情况都能达到,就是密等提交的时候能够达到exactly one,但是大家要注意在故障恢复的时候会出现暂时的不一致,对吧?就是我们说的好像是那个呃,要回回退到之前的某一个状态,好像历史会回滚一下,重演一下,然后之后追上现在,呃,就是故障恢复点的那个状态之后,后面就一样,所以说整体上来讲最终的结果是一个F,但是呢,呃,中间那个过程好像看起来有点奇怪,这是密等的提交,而这个预写日志wal的这种方式呢,诶我们说它这个是at least ones,为什么呢?因为在你假如说。
02:00
后续没有那个事务保证的话,呃,那你只是一个批量提交的话,这里边其实还是不能保证在提交过程当中,如果挂掉的话,是不是会出现重新写入的情况,对吧?哎,所以在这种情况下,我们这里边就是最终你保证的还是at least one,但是其实我们会想到就是你你从跟这个整个这个运行的集群挂掉的这种故障相比,在提交一批数据的过程当中,刚好挂掉这个概率其实比较小,对吧。所以说它只是在这种比较小的情况下,出现了这种比较极端的情况,他是at least once,但是在大多数情况下,大家可以认为啊,他是一个。就是exactly对吧?啊,所以在有些场景下也还是不错的,也是一个选择对吧,就是它实现起来比较简单嘛,啊那更加好的一种方式,那就是两阶段提交,你真正的开启一个事物绑定到checkpoint上,这就是最终完美的实现了,That one好,那接下来我们再来给大家看一看,就是弗link跟卡夫卡他们连接起来的,这样的系统是怎么保证端到端状态一致性的啊那我们说这个,呃,在内部的保证呢,当然就是checkpoint了,对吧?呃,这就是flink内部利用checkpoint保证了状态的一致性,一个大罐S,然后呢,呃,S端呢,我们知道卡用这个连接器里边给我们提供的卡夫卡consumer对吧?呃,这个弗link卡夫卡consumer它其实是可以把。
03:35
偏移量作为状态保存下来的,而且呢,出现故障的时候,它可以自动帮我们按照就读取出来的那个状态作为偏移量重新向连接的那个卡夫卡集群去做提交啊,那这个时候就相当于我们可以重重重放数据对吧,保证数据不丢啊,这就至少能达到这个要求了,然后接下来关键的一个点就在于think,那think是怎么做的呢?之前我们在代码里边也曾经看到过,对于这个卡夫卡当前的这个提交而言。
04:10
我们在看到这个里边,呃,是在做这个S的时候啊。卡夫卡的think里边看到引入的这个弗Li卡夫卡producer,它的底层实现其实是,呃,之前我们说它是一个这个to face commit think方式就实现了这个接口,对不对啊,所以说利用这样一个接口实现了两阶段的事务提交,把当前我们的这个checkpoint机制跟提交事物的过程绑定在一起,这样就完美的实现了端到端的状态一致性,Exactly one状态一致性啊,那具体来看的话,这个整个的这个过程又是什么样的呢?我们用几张图来给大家说明一下啊,呃,首先我们这里边这张图里边有几个重要的组件,我们看到有manager manager在这个过程当中起什么作用呢?它主要是就我们说的协调调度,对吧?Checkpoint主要是由它来做协调调度的,然后另外还有一个是stay back end stay,大家知道本身本地的状态是存在里边的,然后呢。
05:17
啊,呃,就是就是不同的stay back end,它会存到不同的地方,对吧?然后呢,做这个checkpoint的时候呢,它又会把对应的这个状态直接保存到不同的这个持久化存储空间里面去啊,前提是你是file system的,或者是rock DB的这种back啊,啊,所以它主要起的作用就是保存啊,这是另外重要的一个组件,那接下来还有很重要的就是外部的source系统卡不卡对吧?呃,外部的S系统卡不卡,卡不卡,进卡夫卡出,中间是flink内部的各各个任务,我们主要考虑的就是S任务,S任务还有中间的各种转换操作,Transform,这里边我们举的例子是一个window的计算啊,这里边接下来我们就来看到底是应该做什么样的,这样的一个数据来了之后,到底要做什么样的行为了啊,再给大家梳理一遍流程,首先啊,接下来大家看到我们这说的就是预体交接。
06:17
段啊,就是一开始都没有真正提交的时候,首先我们的这个考虑是前面的这个数据不停的来,这个我们就不说了啊,接下来数据来了之后就是一个来一个处理一个来一个处理一个,对吧,这个阶段都是预提交阶段,然后接下来关键一个点是drop manager这边发出了一个指令,告诉source任务,现在我要去触发一次checkpoint的保存操作,那接下来就相当于S这里S任务这里边就注入了一个barrier对吧?啊,就在当前的这个data flow里边,数据流里边接着就注入了一个barrier,然后barrier就按照之前我们说的,哎,要去做这个barrier对齐对吧?然后向下游广播,按照我们定义的那个规则,跟着数据流不停的在算子任务间朝下游传递,所以呃,就是首先我们SS任务接到了这个barrier之后啊,他是先要把自己的当前的状态做一个保存,那他的状态是什么。
07:17
当然就是当前偏一亮了,对吧?把自己的offset做一个保存,写到状态后端里面去,然后接下来把这个barrier朝下游传递,接下来就继续读数啊,这个就不管了,对吧?当然这里面还有一个呃操作就是他要做完了之后,我们说他要通知一下draw manager对吧?告诉draw manager,好,我已经搞定了啊,那draw manager这这边呢,他会确认说当前这个checkpoint搞定吗?当然不会,我们说checkpoint完成标志是所有任务都完成对吧?Drop manager去确认他完成这个才算完成,所以现在还没有,只是S任务搞定了,然后接下来呢,呃,这个就是window任务,收到了这个barrier,他把前面数据全处理完成之后,接到barrier的时候,同样也是保存自己的这个状态到状态后端,呃,然后做快照对吧,那做完了之后呢,我们就把这一个再通知通知给这个draw manager说好我做完了,然后再把Barry朝下游继续传递。每一。
08:17
任务都这么样去做,那我们说等到恢复的时候呢,Checkpoint从这个状态后端里面读取出来,恢复状态就可以保证内部的状态一致性,就是每一步操作都是这样的一个做法,对吧?直到这个think任务也是这样,他也是把自己的任务状态做一个保存,然后通知drop manager,我这边已经完成了,而且接下来大家看啊,这里边有一个这个操作就是pre到外部的这个系统,那这里大家要注意啊,这个过程其实并不是说这个Barry来了之后,他他要做这个操作,而是说什么呢?是之前所有的数据,每一条数据来了之后,是不是都是通过这个预提交pre commit。
09:01
直接来一个处理一个就输出到外部卡夫卡里面去了啊,就是这样的一个过程对吧?啊这个并没闲着,然后现在注意啊,现在我们是遇到一个barrier之后,现在应该要怎么样了呢?啊,现在遇到这个barrier之后,接下来是要真正的去commit这个transaction啊,但是这里边还需要等一个信息对吧?诶就是大家会觉得这个我我感觉应该是这里边这个think任务,遇到接收到这个barrier之后,就应该去提交这个,呃,当当当前的这个呃,这个事务了,但其实不是这样的,大家要把这个流程捋清楚啊,这个过程其实是什么呢?之前我们的所有的数据一个一个来,这当前是一号拆point的,对吧,之前的这个数据大家注意啊,就是都会保存在一个什么里边啊,就是我们说的一个事物里边,对吧,这个我们叫一不是保存在事物啊,就是通过这个一做预提交,提交到这里。
10:02
对吧,一个一个都提交到这里面来,然后大家注意啊,我这里边think任务接收到这个一号拆的barri的时候,我要做一件什么事呢?我直接把这个直接直接这个就呃事务就关闭吗?大家注意不是啊,因为当前这个这个事物你关闭的条件是什么?是要当前的checkpoint真正完成,对吧?当然你可以认为说啊,我已经是think任务在做这个保存了,我保存完了之后,我就可以认为是已经全保存完了,我可以去提交了,但不是的。那因为我们说这个各个任务之间,他们就是互相都不影响对吧,都不挨着,所以说你在做这个处理的时候,别人也在同时做处理啊,有可能会出现什么事儿呢?前面window操作,我做这个拆框里的保存的时候,他的那个任务状态特别多,状态特别多,就有可能占的时间长,对吧?那就有可能会出现什么什么事情啊,就是我这里边在就是假如说我开启了那个异物保存,那大家想我这里边是不是还在保存的过程当中,拆point没完成,但是呢?哎,当前的那个那个barrier和后面的数据都已经往下游传递了,对吧?所以就有可能会出现thinkin任务都已经保存完了,前面还没保存完,是有可能出现这种情况,或者说我们考虑这个并行任务的时候也是这样对吧,你并行自己这个已经完成的话,别人不一定完成啊,你现在这个拆呢,并没有完成完全搞定的,那这个怎么办呢?这里边并不能直接提交。
11:40
关闭上一个事物,直接提交我们的,呃,当前的这个任务,而是得怎么样呢?在这里我作为分界线重新开启下一个新的事物transaction to,大家注意啊,Barrier的标志是表示我现在要开启下一个新的事务,然后怎么样呢?哎,接下来来的这个数据照常来了之后,哎,就是上一个这个CTION1就不再去接收事事务了,对吧?不再去接收这个,呃,数据的这个写入就不再去提交到这个事物里了,而是后边的这个数据都通过transaction to这个事物去做提交了,预提交,这也是预提交对吧?啊,那到底是什么时候我这个transaction one真正的去提交呢?那就是我们说所有任务都已经处理完成之后,Job manager这里边在给他发出确认信息的时候,这个时候我再正式的提交。
12:40
当前的,呃,所有的这个就是当前真正的确认当前的这个transaction事务,把它关闭,然后正正式的提交所有的数据,这才是完整的一个过程啊,所以说就是真正关闭事务啊,确认提交的这个时间点并不是我们前面收到Barry的那个时间点,而是要等招班的通知。
13:05
啊,所以这里边你就会发现之前那个卡夫卡里边的数据,我们不是有一个那个呃,就是隔离级别吗?对吧?呃,我我们它本身的那个有一个呃,就是有一个comted的,有一个TED的对吧?我们可以去去把它这个做一个配置,那之前本来的所有的数据应该默认你提交之后应该是一个就是未提交未确认的一个状态,对吧?尽管这个数已经在卡夫卡里边了,那现在提正式提交关闭事务,那就相当于卡夫卡里边的数据就变成已确认的状态了,对吧?啊就是这样的一个过程啊,当然这里边我们会发现,其实对这个呃,外部的这个卡夫卡要求还是有的,为什么呢?啊,因为首先是一个我们想到的就是外部的那个隔离级别啊,就是呃所所谓的那个呃,Isol isolation level,你必须要设定成一个read committee的,对吧,就是外部卡不卡那个。
14:05
的消费数据的话,必须只能读那个已经确认的数据,你如果要是说我们这里边未确认的数据,尽管已经提交到唐卡了,结果外边的那个应用程序直接就要消费,直接就把它读出来的话,那不就又相当于是我们这里边的状态一致性没有保证了嘛,对吧,又有可能读到会被取消的那个数据嘛,对吧,就相当于你那个转账一样,你如果转到一半,中间那个挂了,你直接读这个数,那不是又会导致后续有可能回滚之后这个数据就是错的吗?啊,所以这里边必须隔离级别是读取这个啊,就是已提交的数据,Read committee,那另外还有一个要求,就是说我们前面提到的那个。超时时间对吧,就是外部的那个超时时间,呃,在这个卡夫卡的本身的设置配置里边,默认的这个超时时间其实是一个,呃,应该是一个一小,呃,就是应该是一个15分钟的一个超时时间,而我们在很尴尬啊,就是内部做这个S任务的过程当中,这边我们等等等那个checkpoint的这个过程当中啊,它默认的那个超时时间是15分钟,所以如果要是这样的话,那会。
15:16
呃,对,就是外边是15分钟,内部是一个小时,所以这会导致一个什么问题呢?就是里边我们等那个拆point等的时候,我可以等一个小时,一直等他在那边,呃,这个完成这个checkpoint,我这边这个任务才会去超时关闭,对吧?那所以这边checkpoint没完成,我可以一直等,但是卡夫卡那边呢,诶有可能就15分钟就到点就直接关闭事误了,那就会导致前面我们说的,哎就就有可能导致这个就是真的是呃,你这边拆你的最后认为他写成功了,但是这里面没有写进去,最后回滚了,导致数据丢失,所以为了满足这种条件啊,避免这种情况出现,当是一般情况你拆能做15分钟以上,说实话这个呃,那那就不要做了吧,对吧,一般情况我们那个拆的那个超时时间可以把这种情况搞定啊,呃,但是这个你就要注意一下,至少你这个配置不要比那个15分钟少,呃,就是这要至少要。
16:17
哦,比那个15分钟要能hold得住,对吧,要不就是checkpoint的超时时间要比那个要小,至少checkpoint就提前挂了,要不呢,就是这里边你还得配那个think任务里边开启,就是连接器里边开启事物的那个默认的超时时间啊,这个就是稍微还是有点麻烦啊。好,那再给大家总结一下的话,整体来讲两阶段提交啊,弗利格跟卡夫塔之间两个阶段提交是什么样的呢?那就是第一个数据来了之后,我们就应该先开启一个事,对吧,这是,然后接下来所有数据的提交,那就都是提交到里边,就是这样的一个过程,然后接下来怎么办呢?接下来是就触发那个那个操作,那么任务里边注入了一个barrier barrier就朝下传递,所有遇到barrier的任务就开始保存状态,保存完了同时找manage,那think任务收到这个barrier的时候干什么呢?它也是先保存状态对吧?呃,然后通知job manager,但是我们可能会想到你收到这个barri了,该关闭这个任务了,对吧,他不是,他是开启下一个。
17:32
传CTION2,然后这个我们叫做分界线了,分界线就是前边的保存到传CTION1里边,后边的就保存到传ACTION2里边去了,但它表示的是TRANSACTION1不再接收数据了,而不是说TRANSACTION1直接关闭,那它到底什么时候关闭呢?他要等job manager确认的通知对吧?啊,那个整体来讲,我们认为正常情况下应该是差不多的时间,我这边接收到我自己保存完成之后,那就应该招manager,那边就他是最后一个操作嘛,就应该是都完成了,对吧?啊,就应该是差不多时间这边就可以关闭了,所以接下来所有的数据就正式提交,就可以使用了,就可以正常消费了,这就是一个完整的两阶段提交的过程。
我来说两句