00:00
三节课我们把这个消费者组对应的初始化已经完毕了,其实这个消费者组初始化,哎,主要的目标呢,就是加入到对应的这个消费者组里面啊,然后呢,找到对应的C来进行后续的一个消费者组相关的操作啊就可以了。那下面这个做完之后,接下来他要做的事情就是。通过这个客户端对象给这个服务端发送请求,并通过回调函数把对应的结果数据带回来,带回来之后呢,这批数据放到一个消息队列里面,那以什么形式放呢?是以分区的形式,一个一个分区啊,进行一个放。放这里面之后,然后呢,调用这个fech的records这个方法,哎,对这里面的数据进行处理,那处理的每批次最大的条数上限呢,是500条啊,那最小呢啊,你可以有一条,我可以处理一条啊。那下面我们来看一看呗,啊,是不是这么操作的,那接下来就是调用这个啊pro啊,Feature就是抓取数据,那你看它返回的值啊,返回的值就是一个集合,那么这个集合呢,是以这个topic parts为key,那之后呢,这里面是一组一组的数据记录啊,也就说你相当于哎,抓回来的数据是按照分区进行存放的哈。
01:10
那继续往里面进。进来之后呢,首先啊,它判断这个coalin是否等于这个空啊,如果等于空的话,那直接是这个啊,如果不等于空,那就是这个啊,其实这是消费者主和消费者相关的一个操作啊,对我们来说不重要,往下看看这块说fe.fe record,那这个是什么意思?他上来就先调过来一次这个fight record,比如说从这个队列里面来拉取数据,那他第一次能不能拉到是拉不到的啊,这块要注意。第一次。拉取不到。哎,那如果说啊,能够拉取到,如果说第二次或者第三次,比如说这里面已经缓存了一部分数据的话,那我说这块就能够拉取到一部分数据了啊,就可以进行处理了啊,那先往下看,那假如说第一次没抓到,没抓到之后呢,就是F点三的F。
02:01
那这个就是这个了。二人们,哎,调这个赞,派去跟服务端请求,准备拉取数据往这里面放。那往这里放,那看他是不是这么干的啊,先记录一下。开始拉取。数据啊,这个比较核心进去。进去之后往下看。看这地方。这里面有一个这个叫fetch record啊,其实你看你抓取的话,不就发送网络请求吗?那record里面它传了哪些参数呢?啊,这几个值还记不记得。熟不熟悉?这个叫max wait等待时间,也就是说每批次抓取的最大等待时间是多少了?是不是默认是500毫秒啊?这些值,要记住哈。那下一个呢,是这个mini BAS,那这个mini BAS呢,其实就最少一次抓取多少个字节最少。一次。抓取一个质检。
03:04
那下面还有一个,这个叫markx mark best。再呢,是最多一次抓取多少数据?多少呢?哎,默认的是50兆啊,50兆行,那这几个是抓取的一个参数配置,你看我这块已经写了,哎,这里面对吧,哎,Mini以及超时时间还有最大啊抓取。那行,那往下走呗,他有了这个请求之后,他把这个请求发给谁的呢?放到了这个client send是不是里面去啊是吧?哎,通过这个客户端后端对象也就consumer network client来调用它进行一个发送。那发送之后是不返回来一个feature,那这个feature呢,它干了什么事呢?这不是返回的值吗?反回的值它添加了一个监听,咱们说这个发送之后是通过这个回调函数来获取值的,那这个监听器里面就有对应的这个on success和对应的这个失败操作。
04:05
下面这失败,那我们来看一下这个成功的。那这块呢,就是成功获取到数据。获取到这个response之后,那我们怎么对它处理呢?看往下走啊,往下走走到哪呢?走到这个地方,它是循环便利你这个response,也就说返回的这个数据来进行处理,那返回的处理首先获取的K,它的K什么呢?I对应的这个分区,之后呢,传进去对应的分区,取出来对应的这个Y流值,那么如果获取的值它等于空,相当于没获取,那这个呢,我们不关心对吧?哎,那就说异常操作啊,抛异常,那么如果获取的数据有数据。那有数据它又怎么操作的呢?哎,有数据操作,比如说这里面是获取对应的像啊对吧,也就说抓取数据的这个呃位置啊,那之后还有什么呢?看看啊,这里面下面有一个response。
05:00
这个地方看tch response是谁呀?看到没?哎,其实fetch response就是你获取的这个结果值是吧,结果值,那你fetch这个response。它里面有一个啊,Record of file啊,然后BY啊,这里面获取这应的数据,那其实返回的是这个BY斯,那这个BY斯它放到哪里了呢?包括这个be斯和这个ch of outside都给它放到了,这个叫complete fe啊,是这里面。那这还是谁吗?Complete其实就是这个消息队列,那这个队列长什么样,大家看一下。点开这个队列里面是吧,就是长这样,这里面放着一个一个的这个啊,对列吗?一个一个的complete的fe啊,就是它啊,你再点开啊,这是一个对应的类啊。加,那这个数据呢,就已经放到这里面去了,那放到这里面去了,接接下来是不是有这个fech的record闪亮登场啊,那它怎么登场的呢?你这地往回退啊,往回退回退。
06:00
回退。再退好。那好,我们回到这个位子啊,那这个地方呢,就是开始拉取数据,对吧,那拉取数据完事之后,它是不是把数据放到了这个complete的feature这个队列里面啊,那放到这里面之后,你看这个程序继续往下轴啊,继续外轴找到哪呢?找到这叫fe车,这个fech的record也是再次从这个里面也说再次调用这个fast record从这里面拉取数据,那这次不能拉到,这次是不是就能拉到了,哎,进去了。那进去的话他怎么处理的呢?那进去的话之后呢,他首先准备了两个这个集合啊。或者是队列吧,进来之后还记得这个值吗?叫max。叫max Co,其实这一值呢,就是条数呢,最大值呢,就是500条啊,比如说每次。处理的最大条数。是500条。
07:00
好,那这个值复制给它之后,那往下看,那这个值啊,是放到了这个while循环里面啊,While这个里面大于零,它只要大于零,那我就在这里面一直循环乘数,那它什么时候做减法呢?它这块。这个remaining啊,减去request也算你处理多少条,那我这块呢就减掉多少条,那知道是不是达到这个500条上限,那不就退出了吗?那有没有其他条件退出呢?当然有了啊,往下看。哎,往下看什么呢?看这块啊,说你这个是fe啊,这个complete factor片。其实就是他们从这里面来取数。那你从这里面取数。获取数据,那你获取数据,如果这里面已经被你取空了,等于空,是不是就可以结束循环啊。如果没有数据了。可以退出。循环啊,也就是说没有必要非得等到你这个是必须得是500条的时候,我再进行一个退出啊,所以说这里面要注意一下,那在里面开始处理呗,Record对这个record进行各种处理。
08:03
然后下面这个地方,哎,这地方哎这个地方其实就是这个哎拉取对应的数据啊,一波一波的数据的一个开始处理哈,这波。开始处理数据,那行,那这块呢,就是在这位置,我们从这里面一波一波的啊来处理,那最大的上限呢,就是500条,那处理完之后是不是就该往下走了,走对的拦截器了,那我们来看啊,往回推看一下拦截器。退回来,退回来。在这儿。你这不是fetch吗?然后返回。那你通过这面返回的值是什么呢?按照分区哎为K,然后对应的这个啊,后面是相应的数据,哎,获取完之后放到这里面,那放到这里面之后,那接下来它要进行什么操作呢?它放到了一个intercepts.consumer介就是进行后续的进行操作了啊这块就是拦截器进行操作。
09:03
那我在设INTERCEPT4,那你进去呗,那你进去之后啊,你发现这里面其实它是intercept,每一个intercept都会对它进行一个循环遍历这里面。每个。拦截器都会对。数据进行加工操作对吧?啊,也就组成了拦截器链,当然你要没有拦截器的话,那不用也是没关系的啊。啊,那这样就退出了啊,这就是整个数据的一个啊操作过程啊,稍微回忆一下,所以这里面我们做的事,第一个呢是初始化消费者主,哎主要呢是找这个C,找到之后呢,开始进行抓取数据。从这里面开始抓取数据,那抓取数据啊,其实他抓数据呢,是抓到了这个complete fe这个队列里面以分区的方式进行存放的,那存完之后呢,它会调用这个fat record来对每一个数据呢进行一个处理,但是最处理的这个最大的上限呢,是500条啊,之后呢,往下走调用所有的拦截信量,那之后呢,给最终的一个消费者,哎,这就是整个这个消费者主相关的一个处理流程啊。
我来说两句