00:00
了解了状态一致性和端到端状态一致性的概念啊,接下来我们就可以看一个具体的应用实例,那就是考察一下flink跟卡夫卡进行连接的时候,怎么样保证端到端的精确一次状态一致性。啊,我们知道在流处理当中,最好的流处理器目前当然就是flink,而最好的数据源和外部系统应该是什么呢?我们知道如果想要保证精确一次的状态一致性的话。首先数据源这边就得保证它能重放数据,那我们知道卡夫卡它是可以重置偏移量的,就符合这样的一个要求,而且它本身就是流式的处理数据的这种方式,它是消息队列嘛,诶那所以卡夫卡跟flink可以说是天生一对,往往我们在流处理应用当中都是以卡夫卡作为数据源。读取数据之后,然后进入flink进行处理计算得到的结果我们还可以写入到卡夫卡当中,所以接下来我们就考察一样一下这样的一套系统怎么样去实现真正意义上的端到端。
01:14
那首先我们先看一下这个整体我们想要做什么样的操作,那首先既然是端到端的exactlys,那我们就得保证三个环节,三个组件都能够实现精确一次的保证。首先flink内部这个就不用说了,我们直接开启检查点,那就可以保证内部是精确一次。然后在输入端,输入端这个也非常简单,我们知道卡夫卡本身是可以重新提交偏移量的,它可以对数据进行持久化的保存,我们只要重新提交偏移量,就可以再次去访问之前已经消费过的数据啊,那所以我们可以在当前的S任务,也就是。Flink跟卡夫卡的连接器里面有对应的那个consumer flink卡夫卡consumer里面可以去配置,把当前的读取的偏移量保存成一个算子状态,然后写入到检查点当中,发生故障的时候呢,只要从检查点里边读取出对应的偏移量,然后再重新连接卡夫卡的。
02:19
连接卡普卡去重新提交偏移量,重新读取数据,就可以保证我们所有的数据不会丢失了,这样的话至少就能保证至少一次。这个过程呢,不需要我们单独的去进行任何的处理,因为flink卡卡这个连接器里边其实已经帮我们全部处理完成了。那更加重要的呢,其实是输出这一段,因为之前我们知道输出这一端的话,最严格意义上的一个实现其实是两阶段提交啊,那对于flink卡卡的连接的话,官方连接器flink卡夫卡producer,它能不能实现两阶段提交呢?
03:02
完全是可以的,我们可以在源码当中去做一个考察,我们可以看一下之前写过的。到写入到卡夫卡的这个过程,看一眼当时添加的flink卡夫卡producer,我们会看到。本身弗卡夫卡producer。实现的就是一个face commit function,这就是之前我们提到过的Li给我们提供的两阶段提交的s function接口,啊,当然了,本身这是一个。抽象类啊,那这里我们的flink卡夫卡producer就是继承了这样一个抽象类的对应的这个接口,这个抽象类里边当然就有各种各样做两阶段提交的方法,比如说我们这里看到有commit,就是做正式提交的时候要做什么样的操作,那另外当然就还有pre commit预提交啊,这两阶段提交可以看的非常的明显。
04:00
当然了,也还会有跟事务相关的一些操作,Transaction相关的一些操作,那具体的这个流程我们就不在源码里面去看了,可能会比较复杂,我们直接梳理一下,通过图例的方式做一个讲解。那首先我们先看一下当前的这个系统应该是什么样子,那当前我们要的呢,整个中间的这一部分是flink系统,我们现在简单起见,就三个任务,三个算子,首先是source,我们知道当前算子是连接是卡夫卡,从卡夫卡消费数据,那就应该是flink卡夫卡consumer,然后接下来是一个window算子要开窗,然后经过开窗操作,经过开窗计算之后得到的呢,直接就通过think任务输出到外部的卡塔里面去了,所以这里面的thinkk当然就是一个。Flink卡夫卡啊,那所以前边是从卡夫卡去读取数据,后边又是写入到卡夫卡当中去,这就是我们整个的。
05:02
端到端的连接起来的一个完整的应用,而接下来还涉及到的其他组件呢,首先应该有manager manager当然是起到了一个协调调度的作用啊,在做这个检查点操作的时候。只需要装manager者出现的,另外呢,还有状态后端,状态后端这边是涉及到了,我们要把检查点做持久化的保存,是要写入到这里面去的。接下来我们就看。首先。应该要启动,第一步就是启动检查点的保存。启动检查点的操作呢,是由job manager发起的啊,它会给所有的task manager发出一个指令,那我们知道所有的task manager接到指令之后,就会在S任务里边去插入一个检查点的分界线。那这一时刻就标志着我们进入了两阶段的。预提交阶段啊,当然了,现在其实还没有具体提可以提交的数据,但是其实这个状态已经是预提交阶段了,所以首先我们这里边是要启动检查点的保存,那么S任务就会插入一个barrier,而且把自己的状态就要开始保存了。接下来barrier就要在这所有的任务之间顺着这个数据流向下游流动,每一个接收到barrier的任务都要保存自己当前的状态。
06:29
那所以接下来呢,我们就是每一个算子任务需要对状态进行快照,首先我们会看到SS任务它的快,它的状态当然就是当前读取的偏移量了啊,所以首先应该把自己的状态做一个快照保存,保存到状态后端里面去,保存完成之后呢,应该要通知job manager,然后把barrier朝下游传递。那接下来下游的每一个算子任务都做相同的操作,都是把自己的状态保存起来,做快照,保存到状态后端,然后通知job manager,然后继续朝下游保存,继续朝下游传递。
07:14
当然了,这个过程还是一个预提交的阶段,因为我们当前还没有数据到达think这里,还没有真正的朝外部系统提交呢。啊,那最终我们关注的就是要达到了thinkk任务,Thinkink任务同样也要把自己的状态做一个快照,保存到状态后端,然后接下来要通知job manager,另外还需要当前的数据,如果要是处理的话,那需要把它通过事物。预提交到外部系统当中,所以我们会发现当前的S任务,它对于数据的处理。那是当前我们考察的应该分成了两种情况,一种是遇到了正常的数据。
08:00
那就开启一个事物。通过事物把它预提交到外部系统,那如果说当前遇到的是barrier的话,那么就把当前的状态做一个快照保存,然后通知manager当前保存完成。诶,那我们可能会考虑到当前的这一个barrier,如果遇到的话,难道不应该是表示应该要提交当前事物了吗?怎么只是把当前的这个快照做一个保存呢?这里我们需要注意,当前的barrier只是一个分界线,以它为界,之前的所有数据是前一个检查点,所以也就应该是前一个。事物去进行预提交,而在它之后呢,再来的数据,那就应该是下一个检查点,同样也就应该开启一个新的事物去进行提交。它的区别在这里,而真正这个事物最终的提交并不是以当前的barrier作为分界的,而是要等job manager那边确认所有的任务都已经完成状态保存的时候,当前的checkpoint真正保存完毕的时候,这个时候才去提交。之前。
09:21
指代的对应的检查点,对应的那样一个事物,哎,所以这里这个开启事物和提交事务的节点我们一定要搞清楚。所以当前thinkin任务看到一个barrier的时候,他所做的操作其实是。把当前的状态做保存,然后通知job manager接下来要开启一个新的事物。在barri之后到来的所有数据的提交就要通过下一个事物去进行预提交了。当然有可能上一个事物还没有真正的提交,因为有可能还没有做完检查点的保存嘛。我们现在是一个分布式的系统,当前这一个think任务它已经保存完毕,不代表其他的任务都保存完毕,所以接下来我们还要等。
10:15
一直要等到job manager通知所有的任务当前的check的保存完成啊,那前提当然就是照manager这边收到了所有任务发来的确认消息,那这个时候就可以通知所有任务了,保存完成,这个时候think任务就可以把之前的。事务一去做一个真正意义上的提交了,所以这个阶段才是真正的正式提交。这就是我们所说的两阶段提交,在这之前都是预提交,在这一步才是真正的提交。那它的优势就在于,跟我们前面提到的预写日志wal这种方式相比的话,它不是一批批量的去写入数据,而是之前所有的数据其实通过这个事物都已经写入到卡夫卡里边了,当前只是发起一个消息告诉卡夫卡我当前要把这个事物做一个正式提交,这些数据正式可用,仅此而已。所以当然当前的效率是非常高的,而我们的写入也是流逝的,写入这个过程没有任何的性能上的影响,时间延迟也会非常的低。
11:31
这里还要需要注意的一点是,在卡夫卡里边之前。通过事务提交的数据,在卡夫卡当中其实是会标记为未确认,也就是我们所说的uncommitted啊,那这些未确认的数据其实正常来讲,我们是对于卡夫卡来讲是不能去消费的,因为你如果消费了未确认的数据的话,那相当于。在外部系统看来,对于外部的用户看来的话,就相当于我已经拿到了一次数据了啊,那如果要是发生故障,当前这个事物被撤销的话,回滚的话,相当于之后我还会再去写入一次,那外部用户可能会看到两次写入,所以我们应该保证uncommitted未确认的数据应该是不能被消费。
12:20
而之后如果我们已经接收到了manager发来的确认检查点保存完成的消息的时候,这个时候正式做提交,提交当前事务,卡夫卡才会把之前的所有数据,在事物当中提交的所有数据标记成已确认,那接下来就可以正常的去做消费。那我们知道在。整个的这个流程当中,任何一个环节其实都有可能发生故障,那发生只要发生故障,我们就直接从上一个已经保存好的checkpoint恢复所有的状态,而如果要是当前没有正式提交的事物就都会被回滚,因为我们知道事物跟checkpoint是完全一一对应的,所以只要是之前已经保存好的checkpoint。
13:13
对应的那些数据当然就已经正式提交了,而没有保存的那些的对应的数据,当然对应所有即使是已经做了预提交啊,那当然也会回滚,也会撤销啊,那接下来我们就真正能保证。写入一次,而且只写入一次,这样的话就实现了端到端的one状态一致性的保证。那这里我们会发现,其实真正在实际应用的时候呢,还要去做一些额外的配置啊,那首先我们知道肯定在代码当中首先是要启用检查点啊,如果不启用检查点的话,那中间flink处理的这个环节就直接没有效果了嘛啊,所以这是首先我们要保证的第一点前提。
14:03
主要是针对flink卡夫卡producer,也就是我们的S端,这里我们需要去做一些处理。在源码里面我们可以看到,对于Li卡法producer来说。它的构造方法里边。有一种构造方法的传入,是可以传入一个所谓的producer SE,一个语义的,这个语义就是所谓的。状态一致性的语义,我们可以看到,这里边可以选择ones at least one和none啊,也就是说这个none当然就是at most ones了,其实是一样的嘛,没有任何保证,如果说我们想让它真正意义上达到。精确一次的状态一致性级别的话,一定要把它设置成one,一定要在它的构造方法里边去传入这个参数,因为默认情况下其实使用的是at least one,这是我们需要去注意的一点。
15:06
除了代码当中我们需要传入one这样一个基本的参数之外,那其实对于端到端的状态一致性的话,我们还要去做一些额外的配置,比如说卡夫卡的消费者隔离级别啊,因为我们知道对于卡夫卡而言,默认的隔离级别isolation.level其实是read uncommitted。这里边所说的卡夫卡,我们要配置主要是配置,配置写入的那个外部系统卡夫卡,而那预提交阶段数据,我们写入的时候,它标记的是未提交嘛,Uncommted,那假如说我们默认直接就是read UN committed的话,相当于就可以直接去消费它了。这不符合我们对于端到端精确一次状态一致性的定义啊,那所以如果说我们真正要想做到精确一次的话,应该把隔离级别配置成read committed,这样的话,消费者如果想要去访问未提交的消息的时候,就不能直接访问啊,那我们必须要等到真正的checkpoint做完,当前事务已经正式提交了之后。
16:15
当前的数据标记成了committed的时候,才可以真正去做消费啊,那当然了,如果这样做的话,外部应用消费这个数据可能就会有一个延迟了,因为类似于我们又得一直等,等到这个事物真正提交啊,这个延迟会稍微的大一点。除此之外,我们还有一个地方需要去做配置,那就是事物的超时配置,这就是之前我们说的。如果说当前我们在做这一个checkpoint过程当中时间比较长,而本身的事物。超过了它的超时时间的话,那就相当于这个事物直接失效了,那甚至有可能会导致我们整个数据丢失啊,这里面有一个非常重要的点,就是弗Li卡夫卡的连接器,它本身是有一个事物的超时时间默认是一小时,这个是比较长的,因为考虑到我们要做很复杂的呃,检查点的保存,还有其他很复杂的一些计算,而卡夫卡集群里边配置的事物最大超时时间默认是。
17:20
15分钟,所以这样的话就有可能会出现卡夫卡连接器,这是我们的sink任务,Sink任务这边认为。事物还没有超时,还可以继续进行事物的,呃,当前检查点的保存继续等待。而卡夫卡集群那边可能已经超过了对应的时间,已经把它关闭了啊,那这样的话我们就会丢弃预提交的数据,所以说如果说我们想要不出现这种情况的话,那应该要配置。前边的这个超时时间应该要小于等于后边卡夫卡集群配置的集群超时时间,当然一般情况我们是把这个集群的超时时间调大就可以解决这个问题。
18:07
所以我们会发现端到端的。精确一次状态一致性的实现还是比较复杂的,我们要考虑很多非常实际的细节点。这就是关于状态一致性的所有的内容。
我来说两句