温馨提示:文本由机器自动转译,部分词句存在误差,以视频为准
00:00
好,上节课我们是把消费者主的初始化流程已经详细讲完了,那么它初始化完毕之后是不是就开始准备工作了?那么它到底是怎么工作的呢?那下面我们来双击看看。首先呢,左侧呢,这边是卡卡对应的集群,右侧呢是对应的消费者主好,那么消费者主要想进行工作,首先呢,它需要创建一个消费者网络连接客户端,主要用来跟这个卡法集群进行一个交互的。那他前期啊,先做一些准备工作,首先呢,它调用了一个散的flash方法,哎,用来抓取数据的一个初始化,那在这里面他会准备第一个参数,就是每批次抓取的最小的字节数,那默认是多少呢?哎,默认是一个字节,你说这边只要一个字节,哎,我就可以把它抓取过来。好,那当然这个值可以进行一个调整,比如说我这边调整成十个字节之后,那么你这边有一个字节的时候,我还抓不抓呢?哎,那是不是就不抓了?好,那如果说我这边用户,哎,我就发送一个字节,那你就不消费了吗?不是这样的,它这里面还有一个超时时间,比如说虽然说啊,我这里面是十个字节,哎,我这里面实际只过来一个字节,那么如果说没有达到这个值没关系,诶只要这个时间到了500毫秒,仍然能将这一个字节的数据拉取过来啊,是这个含义哈。
01:23
跟我们讲生产者的时候,那个P大小16K和那个L格MS啊,有异曲同工之妙啊,也差不多任何一个条件满足都能把数据拉过来哈,那当然这里面还给你设置了一个每批次抓取的上限,那最大的上限呢,是50兆啊,也就说这边呢,哎,你最大是50兆的数据一批拉过来。好,那接下来呢,准备完毕之后,开始调用这个散的方法,我要发送请求,发送完请求之后,那么它会通过这个回调方法把对应的结果拉取过来,哎,这个回调啊,还记得我们生产者的时候,是不是也有一个待回调的一个发送啊,哎,跟那个道理呢,是一模一样的,那就把这批数据拉过来了,那么拉取过来这批数据会放在哪里呢?哎,会放在一个消息队列里面。
02:11
哎,一批一批的往上放,那你有可能拉过来的是一个字节,有可能拉过来的是50兆啊,这么一个范围的数据啊,一批批拉,那拉到这儿之后,那接下来消费者怎么工作呢。那么它会啊,哎,一批次呢,去拉取500条,也就说从这里面拉取500条,进行正常的一个处理好,那他会怎么处理呢?首先经过的就是反序列化,原因是因为生产端是已经把数据进行一个序列化了,那你前面序列化,那这边就得进行一个反序列化啊。那接下来再往下走,就是经过拦截器,在生产端是不是也有对应的这个拦截器啊,那这边呢也有拦截器,那这个拦截器有什么用呢?哎,大家想卡夫卡整个集群它是不处理数据的,哎,只用来存数据,那么它处理数据的部分是不是在生产端和对应的消费端,那么在生产端和消费端想处理出去,哎,最主要的这个地方就是这个拦截器啊。
03:06
是吧,嗯,而且这个拦截器呢,可以方便的监控我们卡巴卡集群的一个运行情况,比如说进来多少数据。是不是通过这个拦截器可以记录下来,那出去多少数据,比如说消费了多少,我也可以通过这个拦截器进行一个记录,非常非常方便好。那下面呢,之后呢,就是正常的一个处理数据了哈,啊,那在整个这个消费者主的工作流程过程当中,那几个参数要记住啊,第一个参数呢,也就是说最小的一批次抓取数据呢,是一个字节,那好,那如果说这个条件没有满足,比如说设计时没有满足,那怎么办呢?哎,超时时间,比如说500毫秒到了也能够抓取回来一波数据,还有一个呢,哎,就是最大一波抓取的数据呢,是50兆啊,以及呢,我每批次处理的数据量呢,大小呢,是默认是500条啊,这几个值要记住,后面呢,我们再进行生产条的时候有用啊,像那这个呢,就是整个消费者组的一个工作流程。
我来说两句