00:00
前面我们已经介绍了,端到端精确一次的保证,我们知道这个要求其实还是非常高的啊,最好的实现方式诶,那就是首先。数据源方面我们可以去重置偏移量啊,那我们知道很多的这个流失处理的数据源其实都能做到这一点啊,然后另外呢,在think端写入外部系统的时候,最好是能实现一个事务性的提交,最理想的实现方式就是所说的两阶段提交to PC的方式,哎,那在实际应用的时候,什么样的架构能够实现这样的端到端精确一次呢?诶,那我们知道啊,那就是。Flink的最佳搭档卡夫卡啊,所以接下来呢,我们要介绍的就是flink和卡夫卡进行连接的时候构建的这一套流处理系统,它的精确一次状态一致性的保证好,那这个过程当中我们会发现啊,他俩既然天生一对嘛,所以我们完全就可以前面读取数据源的时候利用卡夫卡。
01:01
作为一个数据源,哎,消息队列啊,先放到这个卡夫卡里边,从卡夫卡读取数据,接下来呢,交给link。去进行一个数据的流处理转换,然后接下来得到的结果再重新写入到卡夫卡的另外一个。主题topic下面去,哎,那这就是我们所说的类似于一个数据管道,那前面的source任务和think任务其实连接到的都是卡夫卡,之前我们说到了啊,South这一端link底层其实用到的是一个flink卡夫卡consumer,一个消费者,那它本身呢,就可以把当前已经处理的数据的这个偏移量作为状态保存下来,发生故障之后呢?诶,那进行回滚,从检查点去进行回滚,读取状态之后,那就会。重置偏移量,从卡卡里边重放数据,重新去进行处理,那这样的话就至少保证数据不丢,然后接下来内部我们打开checkpoint,开启检查点,就可以保证处理的结果都是精确一次的,那最后呢,关键就在于写入到卡夫卡这里面的这个连接到底是怎么样去做的,那我们知道flink写入到卡夫卡这里,我们使用的这个连接器提供的一个function,就是所谓的flink卡夫卡producer啊,那这样一个think方式呢,其实之前我们在代码里边也简单的看过啊,我们可以打开之前所实现的s to卡夫卡这里。
02:31
这里传入了一个flink卡夫卡producer,我们会看到它本身其实就继承自to face commit s方式,这就是我们所说的flink底层给我们提供的两阶段提交think function对应的这样一个接口啊,当然了,它本身是一个抽象类,因为它还呃就是继承自rich think function这样一个抽象类。那么在这个类里边呢,如果我们简单的看一下它里边可以调用的方法就会看到啊,这里面的抽象方法其实非常非常多,除了每来一个数据都要去调用的这个invoke方法之外啊,另外啊,我们可以看到这里有一些初始化状态的方法initial state。
03:12
比如说有去构建事物,处理事物的一些方法啊,Begin transaction啊,去开启一个事物啊,那current transaction啊,去获取当前的事物,那这些方法都是有的啊,另外我们还可以看到两个非常明显的方法,一个叫做。Pre commit,很明显这就是预处理阶段我们所要去进行的操作啊,然后另外还有一个就是commit啊,就是正式提交,正式处理的阶段,这就是我们所说的两阶段提交,当然了这个具体的处理过程啊,可能会非常的复杂,所以这里边我们就不去详细的去进行介绍了,那感兴趣的同学也可以自己去看一看具体的处理逻辑,那一般在实际使用的过程当中呢,只要直接使用连接器帮我们提供的这个producer,它自己就给我们实现了底层的那些东西。
04:02
所以我们在这里呢,还是只要了解了弗link跟卡夫卡连接起来之后,端到端进行状态一致性啊,精确一次保证的原理就可以了啊,那接下来呢,我们还是举一个例子啊,考虑一个具体的流处理系统,考虑一下具体的过程到底是怎么样去处理的,好那首先我们看一下当前这个流处理系统里边所有的组件啊,诶,那首先呢,这里边涉及到有一个job manager,这是我们说进行。检查点中央调度的组件,然后另外呢,诶还涉及到一个状态后端,因为检查点的保存跟它是有关系的啊,另外呢,我们这里边还有就是数据源使用的当然就是卡夫卡了啊,这是连接到外部系统,然后呢,中间这一块,这就是flink处理系统,我们主要分成了三大块儿。Source transform和think,哎,这里的transform呢,我们举的例子是进行了一个窗口计算啊,是一个window算子啊,那整体来看的话,South这边连接到卡夫卡,那很显然这里就是弗link卡夫卡consumer,那最后的S呢,连接到外部系统写入的还是卡夫卡,所以这里当然就是弗Li卡夫卡producer。
05:12
接下来我们就来看一看端到端的exactly one啊,这样一个两阶段提交的过程到底是怎么实现的啊,那首先啊,我们在处理的过程当中啊,我们就以一个检查点的触发作为一个初始的阶段啊,因为关于这个检查点啊,数据处理和检查点的保存,前面我们都已经讲过了啊,这个流程已经非常熟悉了,这个就简化,我们就以draw manager,像task manager啊,所有的这个S算子注入一个检查点的分界线barrier作为当前我们考察的起始点。啊,那首先这里触发了一个检查点的保存啊,那么SS任务这里接收到这个信息之后,就会把一个分界线barrier插入到当前的数据流里边啊,接下来呢,这个barrier就随着所有的数据啊,在当前的流里边去进行传递啊,当然了,朝下游传输的时候是直接去广播出去的啊,那下游接收上游不同的算子任务发来的边的时候啊,那可能需要去执行这个分界线的对齐啊,那当然了,我们去进行配置的时候,可以去设置非对齐的分界线,这个前面我们都已经说过了。
06:18
那接下来我们就考察具体的每一步操作,那首先我们知道啊,对于每一个算子任务而言,它接收到数据的时候处理数据,那接收到bar分界线的时候呢,就要对自己当前的状态做一个快照保存好,那首先就是SS任务需要对自己的状态,也就是当前读取数据的偏移量做一个快照保存。保存呢,当然就要存到状态后端对应定义的那个存储空间了,一般默认情况下就是分布式的文件系统。然后将Barry朝下一个任务啊,下游任务window这个操作去进行传递啊,那同样window操作接收到Barry之后呢,也是把自己当前的状态做一个快照保存,保存到状态后端啊,所以这样一步一步去进行操作,这是我们之前所说过的检查点的保存,然后最后呢啊,这barri会传递到最后一步的think任务,那同样think任务接收到这一个。
07:17
Barry之后。也会将自己的状态去做一个快照的保存,写入状态后端,那它不一样的地方在于,诶,当前它是需要将处理的结果数据要写入到外部卡夫卡里面去的,那这个时候的写入怎么写入呢?注意这个时候我们是预提交,所以这个预提交它就需要去。开启一个事物,这就是我们所说的transaction,一对应检查点的一个事物,所以它开启事物的时间点其实就是接收到Barry去进行状态快照的时候。它就连接到外部系统开启了一个事物,然后呢,Barry之后再到来的数据,那就都会基于这个事物去提交到卡夫卡里面去了,哎,所以这个过程,这就是我们所说的预提交的阶段。
08:14
那这里需要注意的就是think任务接收到barrier代表着当前事物的开启,那事物的关闭是什么时候呢?要注意并不是下一个Barry来到的时候。每一个barrier来到的时候,啊,来到think任务这里的时候,只对应着一个事物的开启。哎,那什么时候之前的这个事故就要关闭了呢,那是要等到。Job manager通知,当前检查点已经保存完成的时候,哎,那是对应着checkpoint的完成,这个检查点保存完毕了,哎,那所有包含在这个检查点状态更改里边的这个所有的数据对应的输出结果就可以正式提交到外部系统里面了,所以这个时候呢,Think任务就正式提交之前的事务,把之前还没有被正式确认的数据标记为已确认,接下来呢,这些数据就可以正常被外部应用消费了。
09:13
那如果在这个过程当中啊,如果啊,在接收到drop manager发出这个检查点保存完成通知之前发生了故障挂掉的话,那很显然我们当前所有这个事物就没有正式被提交,那所有这里边啊,这个检查前里边涵盖了的所有数据,对应的这个输入写入就全部会被撤销回来,所以回滚之后啊,就是上一个检查点里边保存的所有数据的状态,而那些数据呢,都已经因为检查点确认了,所以那些数据都已经正式提交到了卡卡里面,所以这样的话就实现了真正意义上的两阶段提交,就可以保证端到端的exactly one精确一次状态一致性。
10:01
这就是flink跟卡夫卡进行连接的时候,它进行状态一致性保证的原理和过程啊,那在实际使用的过程当中呢,真正要实现端到端的exactly one啊,还要有一些额外的配置。啊,首先我们在flink内部啊,必须开启检查点的功能,这个就不用说了啊,要保证精确一次嘛,然后接下来呢,在flink卡夫卡producer它的构造方法当中,必须要传入一个参数semantic.it once,也就是指定当前的状态一致性语义是精确一次啊,那其实这个在我们源码这个当中啊,看这个弗link卡夫卡producer它的构造方法。我们也会看到有一种后边就可以传入一个flink卡夫卡producer.semantic啊,那这里边它的选项呢,就是exactly one at least one和那那这里我们既然保证精确一次啊,默认情况下它其实开启的是at least one,所以如果想要达到精确一次的话,需要把这个也打开啊,这也是一个选项。
11:10
那另外还有就是我们需要去配置卡夫卡读取数据的消费者的隔离级别,这里所说的卡夫卡是什么呢?是我们最终写入到的这个外部对应的卡夫卡的这个topic。因为我们说当前是使用了一个两阶段的提交,那预提交阶段提交上去的那些数据已经在卡夫卡这个主题下了,那他能不能被外部应用去进行访问消费呢?诶,那。默认情况下提交上去的那些数据啊,它都是uncommitted的状态啊,也就是说只是被标记成了未提交的数据,那如果说卡夫卡里边啊,它的隔离级别isolation.level,我们知道它默认情况下其实是read UN committed,也就是说如果是未提交的数据的话,外部应用也是可以去读取消费的。
12:01
那这样一来的话,我们这里边对于事物的这种保证就没有意义了吧,关键就在于其实不是这个事物要去撤回,要去回滚,关键是不能让外部应用直接消费这些数据,不能让用户,最终用户看到。啊,那所以这里面呢,我们就应该把这个隔离级别要单独的设置成committed read committed只有。真正提交了的数据才能够被读取消费,那遇到被提交的消息的时候呢,就不会被去读取消费了。啊,那最后呢,还有一个配置的细节点,就是事物的超时配置,之前我们就说过啊,对于外部系统而言,有可能就会出现我们在等待这个检查点保存完成的时候呢,事物已经超时了,诶直接外部系统把这个事物关闭了,那这个时候就会出现状况,就会出现错误。这里有一个很大的问题啊,其实就是flink卡夫卡连接器里边,它是有一个默认的事物超时时间的,那这个默认的时间配置呢,是一个小时啊,等待还是很长的啊,但是卡夫卡的集群里边。
13:09
默认的事物的最大超时时间配的不一样啊,默认是15分钟,所以有可能就会出现什么情况呢?那就是卡夫卡连接器这一边,这就是我们的thinkk任务吗?Thinkk任务这边事物还没有超时,那当然他就会继续等待检查点保存完成啊,那但是呢,如果已经达到15分钟的话,卡夫卡集群那边就已经把我们当前的事物关闭了,那当然就会直接丢弃预提交的数据。而如果之后我们的那个检查点又正常保存了的话,那这些数据它默认是已经写入到外部系统了,那最终这些数据相当于就没有写入,就会被丢掉。所以如果我们想做配置的话,应该。让前边的这一个超时时间,也就是卡夫卡连接器里边的超时时间,事物的超时时间要小于卡夫卡集群里边配置的事物最大超时时间啊,那这样的话就可以真正意义上的实现弗林跟卡夫卡连接的端到端的精确一次状态一致性。
14:09
这就是这一部分内容。
我来说两句