00:00
来吧,那我们继续往后说啊,呃,刚才呢,我们提出了这个第一种解决方案啊,就是事物的方案啊,但这种方案的话呢,呃,它有很多这个问题啊,也有很多这个限制啊,所以说呢,我们目前的话就不考虑采用这种方案了啊,那我们需要这个,呃,另辟蹊径对吧,我们寻找这个其他的这个解决策略啊。嗯,大家也可以去想一想啊,就是如果说把这个问题呢,诶甩给你了,说这个你需要把这个问题给他这个解决一下啊,那你能不能想出来这个更好的方案。对吧,啊行,我们一起来看看吧。呃,现在我们还有另外一种策略啊,什么策略呢,就是使用这个后置提交偏移量加上这个密等的一个处理方案。啊,这个看起来很抽象是吧,那我们一起来这个分析一下,呃,就是如果说啊,我现在告诉你,你在我们当前这个环节,如果你想做到这个精确一次的话呢,很难。
01:02
对吧,因为你如果说现在你就算做到精确一次,那大概率情况下,你就得采用把他们两个绑定到一起做成15的方案,嗯,但是呢,这个我们已经把它pass掉了,所以说我们就想啊,如果说我现在不追求这个精确一次,但是呢,我能够从整体的处理。流程中啊,最终去保证这个精确一次是不是也可以呀,就在你这个数据的计算之前啊,我把它保证了精确一次是不是也可以的呀。对吧?啊,那如果说你这样能行的话,那我们的这个方案就比较多了,你比如说啊,现在我们有这种方案。来,我把它打开。呃,我们这样去做看好了啊,就是我们一定要先保证。后置提交偏移量。就相当于啊,你现在两个问题,一个是这个重复提交重复消费啊,一个是漏消费,如果说我们现在只能让你去卡一头的话。
02:02
那我们是不是首先要保证?不丢数据啊,同学们。对吧,如果你数据丢了,那后面你就没得聊了。是不是你从前面的这个环节处理过程中,你就已经把数据丢了,你到了后面你就甭想着说有这个数据。对吧,所以说我们就可以什么先考虑什么,先考虑你的数据呢,不丢。对吧,我现在把数据给他留下来,留下来以后呢,在后续的处理过程中呢,我再考虑你这个重复的问题。对不对,好,那怎么就保证数据不丢了呢?那就一定是先写出数据,然后呢后提交偏移量。对吧,这样的话我是能够保证你的数据不丢失的。能听懂我的意思吧,好,所以说你看了啊,我们的所谓的后置提交偏移量,就是一定要保证你把数据写出去以后呢,我们再次呢去做偏移量提交,这样的话呢,哪怕你提交失败了,那顶多是导致我的数据的重发一次,重发一次的话呢,我再去做一次处理,那可能就什么导致了这个重复,但重复问题我们是可以再去解决的吗。
03:00
对吧,但如果说你的数据都已经丢了,那你就没得挽回了。对吧,所以这就所谓的什么后置调偏音量来,它的思想就是手动的后置去提交你的opposite,然后呢,先保证这个at什么list,就是所谓的什么至少一次,对吧,我至少得先把数据呢,消费过一次,能够成功的写入到我的这个下一层,然后呢再把这个偏移那做一个提交。好,那么在这种方案中啊,我们就留下了一个什么隐患,什么隐患呢,就是一个重复的问题,好,那这个重复问题我们就什么在后续的处理过程中,我们看看能不能做一个幂等处理。啥叫幂等处理呢?就是你有相同的两条数据。对吧,那你最终在处理之前能不能把它解决成一条。如果说你在这个数据的计算之前,你能够把数据给它解决成一条了,那你看一下啊,我们再加上你前面,诶,我保证你的数据不丢,最后呢,又保证你这个数据不重复,那么它们两个加起来不就做到了我们的精确一次了吗?
04:02
对不对。好,那现在我们的问题就是你在下游能不能做到这个事情呢?诶我告诉大家我们是可以的。嗯,因为我们最终的数据呢,是要写入到我们的这个ES,对吧?啊,之前也提到过,那么ES呢,其实是支持一个幂等写入的,就说白了你的两条相同的数据,如果你要在ES中,你采用的是幂等写入,那么最后呢,它只会给你保留一条。听懂我的意思了吧,啊,这样我们就可以保证你的精确一次啊,这就是我们目前诶比较靠谱的一种方案。啊,那么这种方案如果你呃提出来以后,接下来我们要想的就是这个落地的实现哈,那我们看一下现在这个落地实现的话呢,我们有哪些这个问题需要解决,首先第一个就是你的后置提交,那怎么后置提交呢?你就一定要保证偏移量是在我写出数据以后提交的,那我问大家你还能让卡夫卡自动去提交这个偏移量吗?不行了吧,如果你是自动提交,你是没法保证它的偏移量提交,每次都在我写出数据之后的。
05:04
对不对,他的自动提交我们昨天也说过啊,在这个位置我还单独给你提了一下,来找到我们这个卡不卡的工具类,在这个位置,如果你是自动提交的情况下,那我们会有一个自动提交的时间间隔。对吧,它是按照你的这个时间间隔来进行提交的,比如说每隔五秒钟我提交一次,每隔五秒钟提交一次,那你就敢保证你的每次五秒钟就一定是在我写出数据之后吗?不一定吧,万一刚刚我把数据诶从你的卡夫卡拿出来了,我还没有写出去呢,那你的五秒钟到了,那我就要去提交,这个时候他就又会跑到前面了。所以说我们就不能再进行什么自动提交了,我们就必须得手动的去控制这个偏移量的提交。理解了吧,啊,所以说呢,这个后置提交偏移量,哎,里面的一个点就是你要手动控制好吧,手动的控制啊,OK行,那如果说我们要手动控制这个提交偏移量的话。怎么去控制呢?
06:01
对吧,这个难点在哪呢?怎么去控制啊。啊,同学们怎么控制?那你就要明白一下卡夫卡对偏移量管理的一个方案。对吧,卡不卡最偏移量管理的方案啊,来这个时我们要去分析的啊,首先我来分析一下,呃,目前我们使用的这个卡不卡呢,是我现在用的是2.4的这个版本啊,但是大家可能讲的时候,你们讲的应该是3.0的对不对。啊,这个虽然说这个3.0中有很多这个新的特性啊,但是呃,其实目前我们并没有用上哈,所以说我们这个还是基于这个2.4的来去讲解也是可以的。能理解吧,好,呃,这个卡夫卡在这个零点零点几0.9吧,应该是我记得。啊,这个应该是一个分水岭啊,0.9之前它的这个偏移量呢,应该是维护到那个。就是主K里面的。对吧,然后之后的话,它就是我们维护到了这个卡不卡的本地,这个大家都是知道的吧,维护到卡布本地,然后呢,呃,通过卡不卡中的一个内置的一个topic,然后呢,进行这个偏量的一个管管理。
07:07
这个topic大家应该都还有印象啊,我可以带着你简单去瞅一眼,来,来到OBT魔掉卡不卡下面,呃,我们就来到这个Z4里面吧,这里面你就会看得到啊,就这个玩意。对吧,啊,当然我们现在看到的是它的这个分区啊,就是这个就是那个topic的名字,它总共有50个区啊,对吧,现在你应该看到的是其中的1/3吧,因为我们是分了这个三个节点嘛,234里面是不是都有啊。对吧,他为什么平均的这个分到这个三个,呃,Block里面。对吧,目前是维护到这个里面的。好,这是你要知道的一个点啊,然后再一个点就是。我们怎么去手动提交。对吧,我们怎么去手动提交,这个是比较难的一个点,现在为啥呢。因为现在我们从卡不卡消费数据呢,其实并不是我们自己去做的,而是什么呀。
08:04
同学们,而是什么呀?而是Spark streaming在去做这个事。对吧。他再去做消费的这个事情,那就说白了,我们手里面其实是没有消费者的。这个都是由Spark stream帮我们做了封装的。如果说我们手里面是有消费者的,那我是可以通过你的消费者对象去调用你的什么commit,然后呢去做什么做这个opposite提交的,但现在问题是我们手里面没有这个消费者,那你就没有办法去做考密的这个事儿,你就控制不了这个事情,你就没法在你代码中去控制它。对不对。这是一个难点。好,那么大家想想哈,既然你会遇到这个问题,那你说人家这个Spark streaming再去封装这个过程的时候,他能不能想到这个问题呢?他一定是能够想到这个问题的,所以说他就会给你提供。手动提交偏移量的。方式对吧,或者什么手动提交偏移量的这个,呃,这个这个代码。
09:03
OK,那我们来看一下它怎么提供的啊在这。听完了在这。就是当我们从你的卡不卡中啊,基于这个Spark streaming消费出来数据以后呢,我们拿到的是这种结构。对吧,这种结构我们昨天都看到了,大家应该还记得吧,你看一下啊,就是我们从你的卡不卡中消费出来数据以后呢,我们拿到的是这种结构。能听懂吧,行,那你拿到这种结构以后,注意我们是可以通过如下的代码进行这个opposite提交的,就是你把你的这个stream,然后呢转成一个。什么呀?就是可以提交opposite的一个类型。然后呢,再去做一个提交。能不理解,这样我们就可以什么手动的去控制了。明白吧,就人家也帮你提供方案了,但是这种方案我们能不能用呢?
10:01
我们现在还用不了。为什么呢?因为他有一个要求啊,什么要求呢,就是如果你想去提交这个opposite,那么你的这个流啊,你必须是这个类型的。能听明白,同学们好,那我们来分析一下啊,我们整个的代码听好了啊,我们整个的代码来还是看这个。看这个流程啊,我还是把它截个图吧,要不这个翻来翻去的啊。把它定个图顶过来啊,我们整个这个代码的处理过程是这样子的,你看了啊,还是回到这儿。我们把数据呢,从你的卡发卡中消费出来以后,就是我刚刚消费出来以后,我敢保证我肯定是这个类型的,这个没毛病吧,同学们好,但是我们什么时候提交offet呢?我们是在你的数据都写入到卡不卡中以后,我再去做提交的,好那么你在这个环节中,你还敢保证你的流是这个类型吗?是这个格式吗?
11:00
对吧,你不敢保证了,为什么呢?因为我们的代码中,我拿到你的数据以后,我首先做的就是你的结构的转换,然后接下来做了分流操作,那就说白了,在你提交这个之前,我们是对你的流呢做了结构转换的。那么。他这么说的。只有你的这种结构的流才能够去做这个操作,如果说你做了结构转换了,对吧,你对它是进行了转换以后,它就没有办法再进行这种操作了,而我们目前的场景就是我确确实实对你的流呢做了结果转换的,那我把数据写到卡夫卡之后呢,我现在手里面那个流的格式呢,已经不能再去做这个事情了。听明白我的意思了吧,所以说虽然说人家这个Spark streaming呢,帮助我们提供了这种方案了,但是不好意思。你用不了。它也有限制,如果你想用,你就不能对你的流做结构的转换。但是呢,你不做流结结构的转换,你没法做我们后续的操作呀。
12:02
对吧,那我不能为了这个手动控制这个偏移量,我不做这些操作了吧。对吧,那你干脆整个实时处理,你就别做了。是吧,这个行不通。OK吧,所以说现在啊,摆在我面前的就是手动提交偏移量,这个事儿我们应该怎么去解决,怎么去处理啊,你使用人家这个Spark streaming默认提供的方案已经行不通了,那你又得去想办法解决这个事。这就比较头疼了。OK吧,大家想想有没有什么方案可以解决这个问题?啊,这个事儿,呃,你们之前都没有遇到过啊,来,我们就一起来去说他啊,应该怎么解决,行,我们需要这么解决,就是我们需要自己完成一套偏移量的管理方案,能听得吧,我们需要自己完成一套这个方案来去做。好,我来给大家去分析一下这个过程啊,现在我们的问题是这样子的,就是你从卡夫卡中消费出来数据以后,就是我刚消费出来的这个结构,我是可以去提交偏移量的,那么如果说你能够提交偏移量的话,那我是不是可以认为你在你的这一波结构中,我其实是有偏移量信息的?
13:18
能不能理解?要么你说你提交,你提交的话,你不得有偏移量吗?你没有偏移量你怎么提交啊,那么既然你能够提交,那我就可以认为你的这个结构中,就你当前这个stream中,你是有偏移量的信息的。对吧,只不过呢,诶我后续做了转换以后,你比如说你转成别的结构了,那我再去提交的时候呢,这个时候我这种结构里面是没有偏移量信息的,所以说你提交不了。对吧,那我能不能够做到。我在我最终提交的时候,我手里面有偏移量的信息,那我再去提交不就OK了吗。对吧,所以说现在我们就有这种方案啊,什么方案呢,我们不用你Spark streaming的这种方案,对吧,我不用你的这个模式来去提交了,因为你的限制太大了,那我怎么做呢?我消费出来数据以后呢,我在没有转换结构之前,我就先把这个偏移量信息呢,给它想办法提取出来。
14:13
我转到我的手里面。能明白我的意思吧,我转到我的手里面好,当然你不能说这个转到手里面,你要把它维护起来,对吧,你把它存储到某个位置好,那我存储起来了,接下来呢,你就放心大胆的去做你后续的操作,你该转换转换,该处理处理,那么最终我把数据写入到我的卡不卡中以后,好,接下来我把我之前存下来这个offset,我再去做一个提交,这样不就OK了吗?对吧,我再次去做一个提交,这样不就OK了吗。能明白我的意思吧,就是我先保证我手里面有这个东西,我才能去提交,如果说我手里面都没有这个东西,我就没法去提交。好。那现在这里面还有一个问题,就是你把这个偏移量转到手里面的,你最后提交的时候,你怎么提交呢?你是不是还得有消费者对象才可以啊。
15:06
但关键是我没有啊。对吧,所以你到了这个环节以后呢,你没有办法跟你的卡不卡去做联通啊,说来卡不卡我给你提交ET,他说对不起,我不认识你。对吧,因为你手里面没有消费者对象,你就没法做这个事儿。因此呢,我们就不能把这个偏移量啊,再往卡不卡提交了。我们需要自己去维护了。对吧,就相当于我们自己完成了一套这个偏移量的管理方案,我把偏移量呢,从你的这个流中呢,把它提取出来,提取出来以后呢,比如说我就把它维护到某个组件中啊,假如说我维护到这个red中,对吧,或者说呢,维护到my circle啊,这个就看你怎么选对吧,当然别的也可以啊,行,我维护到这个组件中。对吧,或者说呢,我先维护到转到我的手里面啊,转到我的手里面,然后呢,等我把数据呢,都处理完成以后呢,我再把这个偏移量呢,就是维护到你的这个red中,或者是维护到的ma so中。
16:00
对吧,就相当于我单独去把你的偏移量的做了一个记录啊,就是我自己知道我目前消费到了什么地方了,那我下一次我在进行消费的时候,我就从我的red中呢,把我上一次维护的偏移量拿出来,我告诉卡不卡我要从这个偏移量开始消费。这样不也行吗?对吧。好来,那我们整体把这个流程给大家去说一下啊,我们正常的环节是卡夫卡中,他帮我维护了便宜呢,比如说诶目前是100,好,那我每一次去消费数据的时候呢,卡夫卡会先会去看一下你的偏移量是100 OK,那你就从100的位置消费,比如说我消费了100条数据,那我消费到200。这个卡卡会帮你维护起来,你下一次消费呢,他会看一下你现在在200的位置,OK,那我就从200的位置开始给你消费。对吧,但是现在这种方案的话,我们行不通了。能明白,因为我们需要手动的去控制,那我们行不通了,那我就这样去做,我可不可以考虑把这个100呢,我自己去维护起来,对吧?那我每次到你卡不卡消费的时候,反正他说了你得给我一个便宜量,我才知道从哪个位置帮你去拿数据,那我就把100给他传过去,说来你从100的位置中帮我拿数据,那我拿出来一波数据以后呢,诶,我一看我拿到了200的位置了,OK,那我就把200呢,我也记录下来。
17:17
对吧,那我下一次我再次去做消费的时候呢,我把200给他带过去,说来你从200的位置开始给我消费。反正对于卡夫卡来讲,他不管这个偏移量到底在哪里,他只管你把这个东西告诉我就行了,我就能够帮助你去做消费了。听明白了吧,就是他只要知道偏移量,就是你告诉他偏移量了,那么他就能够从偏移量的位置呢开始帮你去拿数据,而他是不管这个偏移量到底是哪里的,是你给他的,还是他自己拿回来的,这个他不管。理解吧,啊,这个举一个简单的例子,就是比如说你有一把锁对吧。你有一把锁,那么这把锁你要开的话,你得有个钥匙,能明白吧?哎,你得有个钥匙,好,那我问你啊,这把锁他管不管这个钥匙是原装的钥匙,还是你后来配的钥匙他不管,只要你的这把锁能,只要你的这把钥匙能够打开我这把锁,诶,那就OK了。
18:15
对吧,至于是原装的钥匙,还是说你后来配的钥匙,他不管。对吧,那同理放到这个句话,就是他去做消费的时候呢,他其实是不管那个opposite到底是他自己维护的,还是说你给他传过去的。只要这个是合理的,OK,他就能够帮助我去做消费。能听明白我的意思吧,所以说啊,我们在这个实时的处理过程中,我们一般去解决这个。精确一次,我们都是采用后置提交偏移量加最后的密等处理,这个密等处理我们先不聊,我告诉你后面我一定能够实现,现在我们只要先保证后置提交就行了,那么后置提交的关键点就是你要手动提交,那如果你要手动提交的关键点就在于怎么去维护opposite。
19:01
对吧?啊,那我们用的是自己的一套管理方案。明白了吧,就实际生产环境中,我们通常会使用,比如说诶主keep呀,什么RA呀,什么MYSO呀等工具,然后呢,对这个偏移量呢,进行这个保存。就是我手动的去维护他了。好吧,那么这就是我们的,诶最后的一个解决方案,就说白了,目前我们其实还不能做到精确一次,我只能先做到一半,就先保证数据不丢了,能明白吧,先保证数据不丢了,然后呢,最后呢,再去保证数据不重复,前后加起来我做到精确一次,这是我们的方案。好吧,大家把这个自己去琢磨琢磨啊,你把这个思想琢磨通了以后,那么接下来就是我们具体的实现过程。好吧,来,我先停一下。
我来说两句