00:01
行,那我们终于呢,就把这个写好了啊,这个花了很长时间,呃,但是我觉得这个时间是值得的啊,就是给大家把这个解决的方案呢,给你们聊一聊,然后顺带着呢,把一些这个之前我们学过的东西,但是大家可能已经忘了东西再给你啊,相当于什么变相的这个回顾一遍,对吧?好,接下来我们就直接诶回到我们这个主程序中,然后呢,把它这个用进来啊,来回到主程序中,那你这个回到主程序中以后大家看了啊。我们现在。呃。注意啊,我们现在要干嘛呀,是不是我每次再去消费你数据的时候,就是从你的卡不卡消费数据的时候来看我们这个图啊,诶我的图呢。我的图啊。完了诶还可以哈,来看我们这个图啊,就现在我们每次从你的这个卡不卡消费数据的时候,我就不能什么直接去消费了吧,那我是不是得去干嘛呀。啊。要干嘛呀,同学们,那我是不是得从你的RA中先把我的opposite读出来,读出来以后再指定你的opposite去做消费了吧?所以说你看一下我们这个地方还能直接这么去写吗?同学们,不行了,我们要去补代码了,补什么代码了来写到这不代码啊,就是什么指定,就是从RA中啊读取。
01:27
Opposite,然后呢,指定opposite进行这个消费,好,所以说你一上来以后呢,应该这么去写的,就是my opposite yous点叫什么叫read,然后呢,你把你的topic的名字传进去,把你的group的名字传进去,你给它读回来,读回来以后这就是我们的这个opposite。能听懂吧,那我读到以后呢,接下来我就要拿上这个东西呢,到你的卡不卡里面消费,但是现在我的卡不卡这个消费的过程呢,呃,他好像还不支持这个指定奥消费,对不对,所以说我需要回到这个卡不卡有条里面的去补充一个方法来,这个方法很简单,我就把它复制一下。
02:06
注意啊,这个方法是什么,是指定。使用什么指定的opposite OK,那我就需要什么,在这个基础之上,我再给你传过来一个什么呀,Opposite好,那我传的类型是什么呢?传的类型是这个类型啊呃,我的主程序呢。诶打开太多了啊,传的类型什么,就是你的这个类型。拿过来,拿过来以后呢,放到呃,卡不卡呢,又又跑了是吧,来卡卡的这个油条啊行,那我就放到这。然后呢吧,像这样的话,你看一下我就可以,什么在你去做订阅的时候呢,我再把什么把我的opposite给它传进去,这不就相当于什么指定你的opposite去消费了吗?对吧,现在我有这个方法了啊,那么就相当于我有两个方法,一个是不指定opposite的,一个是什么指定opposite的。好,接下来我们就。回到我们这个主程序中来,主程序过来过来以后呢,注意了啊,那我现在读到你的opposite以后呢,我就可以怎么去消费了,那就相当于什么,我要在这个位置把这个offset是不是给他传进去了呀。
03:11
对吧,这不是指定的消费吗。但是大家想想这个代码有没有问题啊?这个代码有问题,你就一定敢保证我能从你的中读到OBS吗?如果说我现在是刚刚启动了这个实时任务,对吧,我之前都没有做过任何的消费,那我问你,你的中有这个数据吗?同学们没有。那没有的情况下就说白了,我现在手里面我根本就没有ET,那我是不是只能是依靠卡夫卡,他自己帮我消费一次,后续我是不是就可以按照我自己维护的来去做了呀。能明白我的意思吧,同学们,所以说我们要对它做判断啊来,如果说你的opposite是不等于空的,并且呢,它里面得有东西哈。它的size是什么?大于零的,就是你里面的的确确是有东西的啊,他让我优化,优化成now empty箱,那就优化一下。
04:08
好,你的的确确是有东西的,OK,这样我就可以什么呀,指定oppositeet进行消费,那else就是你里面是没东西的,那我就什么呀,默认赛进行消费对吧?诶进行这个消费,OK,好,来,那我们就写一下了啊呃,那我就把这个代码呢拿过来。好,注意啊,这个是我指定进行消费。对吧,然后下面是我默认的进行消费,那就不传它。对不对,但你这么写的话有问题,你看下面很明显你这个东西用不着了吧,所以说呢,我们需要什么,需要把它提取出来啊提取出来。提出来提到外面啊,提到外面这个改成VR,因为你要我要去改值啊,等于空好,那你甭管是指定还是不指定,你最后呢,都是给他复值对不对,下面也是一个道理啊,把这个干掉,是不是都给它复制就可以了呀,反正这两种情况,你只有可能是满足一种嘛,对吧,你只有可能满足一种啊,那你不不管不管是指定也好,还是不指定也好,那你总归是要什么做一次消费的,对吧,那接下来我的代码就会什么一次往后去走了。
05:23
能听懂了吧,啊就可以一直往后走,好,那走到走走走一直往下走,然后呢,一直处理处理处理处理处理处理,那我们是不是要等到什么呀,等到你写出去数据以后,我就要去提交你的opposite了吧。对吧,那我问你啊,写出数据以后在什么地方写出的呀。就相当于什么,一直到这以后,我的数据是不是完全写完了呀。对吧,写完以后,那我就要去提交opposite了,OK,那我问你,你提交opposite要写到什么地方。对吧。我把代码给你折一下啊,折一下我来去分析一下这个过程啊。
06:06
好。折一下啊。折起来,你来去做分析啊。行,来看一下吧。注意啊,现在你看我处理的数据过程是这样子的,这个是我分流什么呀,分流我的错误数据。对不对啊,这个可以展开啊,这是分流我的错误数据,这个是下面else,这里面就是分流我的什么页面数据啊,分流我的启动数据。难道还明白?啊,那就相当于我们从你的这个分流开始,就从这个位置开始,这就开始做分流了,对不对,那你不管你怎么分流的,那我是不是到这个位置就分流结束了呀,你看啊,是不是从这个位置开始,一直到这个地方,那就说白了,我的这些代码执行结束以后,执行结束以后,我是不是就表示我的分流结束了呀。
07:04
对不对,是不是就表示我的这个数据是不是写出去了呀,好,那那你说我能够在这个地方去提交我的opposite吗?就在这个位置提交opposite能不能。对吧,这个里面又会考验一个什么点的啊同学们,这里面又会考验一个什么点呀。你想想你的ET提交我是消费一次,然后呢,我要去提取一次oppositeet,那你说我是不是要想的是我整个这一批次的数据,我全部都写到你的卡夫卡中以后,我是不是再最后统一提交一次呀。是这样吗,同学们?对吧,那我问你现在我在这个位置提交合适不合适,就是我刚才跟你说的这个位置提交合适吗。啊,那就想想这个位置是在哪里啊。这个位置在什么地方,是不是在你的这个位置啊,这个位置是哪里呀。
08:02
这个位置是你的for each里面。你RDD的for里面,你RDD的for里面,你就要明白for的执行频率是什么。对吧。这个东西你都要想得到好吧,那的执行频率是什么呀。知道吗?同学们赶紧想想。啊行,呃,这个地方是一个地方。啊,还有一个地方。什么地方呢?就是我可以在这地方去提交。对吧。这个是我在之前的。就在这个地方去提交,这个地方对应的哪里呢?对应的是这个地方。对应的是这个地方对不对,那当然了,还有一个地方,什么地方呢?我RDD的外面。能看到RDD的外面。
09:00
这个地方提交。对不对,来我给你说一下啊,这个是哪里呢?这个是。叫里面对吧,这个是哪里呢?这个是呃,For r DD里面,然后呢,For each外面。对吧,你看了啊,是不是我的for这个RDD的里面不就是这个位置嘛,然后呢,For的外面不就是这个位置吗?对不对啊行,这个是哪里呢?这个是for r DD的外面。RDD的外面。好,我问你来选一下吧。选一下啊,在哪提交,这个是A,这个是B,这个C。选一个答案啊。
10:08
B。这个都选B是吧,同学们啊,我看有这个1123456啊,这个差不多五六位同学选的是B,都选B对吧?好,那谁能告诉我为什么A不行C不行啊。啊,来说一下,为什么你不选A,为什么你不选C呢?都选B了。这个的点在哪呀,就是这个考验的点在哪,这个考验点是不是就是你这个所调的这个方法,或者说我所说的这个位置,它的一个执行的。执行的地方以及执行的频率吧。对不对啊,来这个有同学说的对啊,这个俊鹏说的对,说for a是什么excuse执行,诶对的呢。
11:05
它是在exq端执行的,能理解,那这个就一定不能选了,为啥呢,你来看。我们的这个opposite,我要提交的opposite我是什么呀?这个是在我的driver端我去提取出来的,那如果说你将来说来我在A这个位置提交吧。对吧,你在A这位置提交你在excu上执行,那就说白了,你要把driver端的这个东西呢,拽到你的excu上去执行。而且你的ex是什么呀?是有很多个的,对吧,因为我们有并行度嘛,那就说白了,你提交一次,我也提交一次,我也提交一次,你就什么相当于提交了很多次,你需要提交这么多次嘛,而且你还会涉及到一个什么数据的一个传输。不需要吧?我们是不是只要是你的一个批次,你消费到一个批次的数据,我提交一次,你消费到一个批次数据我提交一次不就完事了吗?
12:00
对吧,所以说啊,你要明白它里面是A是什么,CU端执行。能不能听明白它是Q段执行,然后呢,还有一个什么问题啊。它是每条数据执行一次,能听得吧,每条数据要执行一次的。你要提交多少次呀,你提交无数次。这不行啊,C为什么不行呢?注意啊,C是你的for r DD的外面,来我们看这个位置啊。是在这个位置,你在for RD外面就说白了,我的整个主程序里面,我要去写一个提交的位置,对不对。那你说我的主程序,我是不是只有在我的程序启动的时候执行一次啊同学们。那你后面还执行吗?你不执行了,那你说你提交offset,你就就提交一次就完事了吗。那我每次消费,消费过来的数据你就不提交了,不行吧,对吧,说这个分析啊来不行,这个什么是他是在driver站执行对吧,但是呢,它是什么呀,每次启动程序执行一次。
13:07
理解吧,说这个也不行。好,那B为什么就可以呢?因为注意啊,B是写到了这个位置。它是写到你的for r DD里面就这个位置,然后呢,写到了for的外面,我们都知道算子内的代码是在你的exq站执行的。对吧,但是呢,好在我们这个forit r DD啊,还有就是我们上面用的那个什么transform啊,这两个很特殊,你能听懂吧,这两个很特殊。这两个都可以做到在driver端执行,而且是周期性执行,就是写到你的这个位置,就写到它的里面,写到算子的外面。明白了吧,它是可以什么周期性执行的,而且它执行的位置是啥呀,注意啊,这个是driver端执行,而且是什么呀,周期性就是一个批次一批次数据一批次执行一次。
14:01
对吧,就是什么周期性,那理解,那你看,那我们希望的效果不就是你来一个批次的数据,你处理完以后,我给你提交一次,你来一个批次数据处理完以后,我给你提交一次,是不是我们就希望得到这个周期性的这个效果呀。对吧,所以说B是OK的,那我们就要把写到这注意了啊,看好这个位置啊,就是在它里面。就在这个位置啊,这是大括号,来,大括号往下走,就在大括号里面,就在这个位置去提交my outside you.save save的时候把你的topic的名字传过来,把你的group的名字传过来,再把你的opposite的。传过来,这是你提取出来的东西,你要把它传过去。对吧,这就把它提交了。好,那把这个写完以后,我们做一个整体的测试啊,那为了好看到这个效果,我们可以这么来去做哈,怎么做呢?就是我在这里面要打印点东西哈,你看啊,我在读取的时候呢,那我就呃,这是save是吧,来我读取的时候,那我这么写吧,就是等我把它读取到了以后,呃。
15:08
读取到了以后,这里面是那个叫做。我想想啊。这个是分区,然后呢,这个是那个啥哈,那我这么来去写啊,我就打一下就是print LN啊读取到opposite,然后呢,我就加上一个叫做opposite对吧,这是我读取到的,然后呢,我在提交的时候注意啊,我提交的时候,我是不是把它处理成了一个,是不是也是一个哈,Map了呀,对吧,那我就什么在提交之前吧。或者什么提交之后都可以啊,来我把它这个处理好以后,我在这个地方吧,对吧,那我打印一下啊,这个是提交,哎,提交opposite,然后加上这个。也是upset对吧,就是我在读取跟提交之前,我都打印一下,我们就可以看到它这个效果了啊同学们来,接下来呢,我们就。
16:03
干嘛呀,是不是要启动了呀,把这启动起来啊,程序启动起来启动。诶。啊,这个还没启动起来哈,我是看到一个负一啊。好,第一次你肯定读不到。对吧,第一次肯定读不到啊,然后接下来你看啊,我要提交,提交的时候你看我是不是都读到了呀,对吧,而且你看他是不是一直在打印这个提交啊。虽然说这一次我没有数据啊,但是呢,因为我们之前注意啊,这就是你的第一种情况,你要想明白了啊,我第一次读不到,读不到的话,那你说我现在到了卡不卡中,我是不是算一个什么呀。就是我没有给你指定这个奥,但是呢,我们之前已经测试过N多次了,我们一直都是用的同一个消费者主,那其实在你的卡夫卡中,它是不是也自己维护了一下了呀,那就会拿你这个之前维护的来让你去用,那后续的话就是我们自己来去控制了啊,你看一下现在都是什么181618161816对不对?来我们清理一下,清理一下以后呢,接下来我们去跑数据了啊,同学们跑数据。
17:13
跑数据啊,怎么跑呢?Lg.SH我们跑一下2022杠零。三杠二幺。好,跑一下数据。看这个效果就可以了啊,诶你看变了吧。诶,这个怎么都一样啊,呃,它这个可能会都一样啊,但是也可能不一样,你看不一样的吧,1965196419651964是不是不一样的呀,对吧?20742073 20732073,就是他们之间的应该是差别不是很大的啊,但是呢,如果说都一样的话,我们就可能会怀疑是不是我写错了是吧,但是也有可能会都一样啊,但是现在我看到他这个,呃,诶又一样了是吧?212821282181中间是有不一样的对不对。
18:01
对吧,中间是有不一样的啊,那说明这个应该是没啥问题的,就是它正常情况下就是呃,几个分区之间的差距应该不是很大。明白吧,啊,只要你保证就是我们写的代码没啥问题就行啊,来我们再测一次吧,你看啊,这一次注意了啊,看一下。再打一次啊,我把这个截图截出来。放到这,然后呢,我把它停掉,停掉了以后,接下来我重新启啊,你看我重启的时候呢,它就能够读到了,因为我的ready已经维护好了啊。是不是看能看能不能读到这个东西啊。读到了吧。对吧,2182218121812181是不是都读到了呀。对不对啊,这个都读到了啊,读到以后来我们再跑一次数据啊,我跑一个3月20号的吧。好,那我再跑数据的话,你看一下它就又会发生变化了啊。OK,变了吧?
19:02
变了吧,对不对啊,都会发生变化的啊,变了吧。是不是啊?这。还没完啊,数据还没生成完吗。好,这个已经不动了,那不动说明什么,说明我们这一批次的数据,就是我们刚刚生成的那一波数据,他已经什么全部都帮你啊处理完成了。理解吧,啊,这就是我们这个最后的一个测试啊,行,那说明呃,我们刚刚写的这一套方案啊,应该是没有任何问题的,行,那么到此我们就解决了这个就是呃,自己管理偏移量的诶这个过程了。好吧,啊,但是呢,还没结束哈,还没结束,因为我们现在只能保证你的数据不丢了,就是我一定是在你把数据全部都写走以后。最后再去提交了什么,提交了这个偏移量,就是我暂时呢,能够保证你的数据不丢了啊,就按照我们的分析来讲啊,我的数据可以不丢了,但是呢,它会有重复,那这个重复的事情的话呢,我们就只能是等到后面再去解决了。
20:02
OK吧,行,我先停一下啊。
我来说两句