00:00
好,接下来呢,我们来看一下消费者源码的一个初始化,那么初始化呢,我们还是啊有两个入口啊,一个呢,我们可以通过自己的这个肯膜,那自己的consumer呢,这里面诶创建一个customer consumer类,然后之后在这里面初始化对应的配置信息,然后接下来创建一个consumer对象,之后呢,订阅对应的主题,之后循环来拉取对应的数据,是这样一套流程吧。啊,那看一下,如果说你通过这个系统的源码,那怎么看呢?系统源码这里面有一个一个藏胞斯里面有一个consumer,那consumer点开之后,这里面consumer,哎,它是继承了一个撒当啊right哈,还是一个线程。那么在这里面啊,这个有一个consumer构造,在构造里面它这样做。比如说构造里面啊,诶配置了各种这个配置信息之后呢,他用了一个卡夫卡很凶猛等,说跟我们之前那个卡法生产者是不是一样,也就说在构造里面,诶完成对应的配置信息以及对象的一个创建,创建完之后,哎,它仍然是调用这个线程当中的这个do work这个方法,那在这里面进行执行,那执行了什么呢?执行了这里面有这个呃,Subscribe啊,也就说订阅主题之后呢,是consumer po,比如说拉取数据啊,也是这么核心的三步啊,只播看起来稍微要这个别扭一点点啊。
01:17
那下面我们还是从那个呃,New这个卡法consumer,比如说创建卡法consumer对象啊,往里面去看啊,看他做哪些事。按住CTRL键点击进来,进来之后this,大家来继续点击这个this啊,进来之后呢,这样就来到这个卡夫卡肯修摸啊对应的这里面,那首先映入我们眼帘呢,是一个消费者主平衡的一个问题。这是消费者。猪。挺好啊。负载平衡相关的一个处理,那下面呢,再往下呢,就是消费者主ID的一个获取。怎里面获取消费者主ID对吧?哎,你得知道我们这个消费者啊,是未来要加入到哪一个消费者主招呢?哎,你消费者他也是一个客户端,那是客户端的话,他就得有一个客户端ID。
02:11
哎,有一个唯一标识嘛,比如说你这是集群,那这里面N多个消费者,那你到底是哪一个消费者呢?那是不是都有一个身份标识啊,这是唯一ID啊,那再往下看下面呢,这块呢,仍然是log相关的一个处理,我们不关心,那再往下走,走到哪呢?这里有一个request time Ms啊,什么意思呢?哎,你这里面有一个客户端。这边是肯希莫客户端,这边是卡法集群,你要跟他进行访问的时候,是不是进行一个请求啊,那请求对方如果一直不应答,你怎么办?那这里面是有一个请求等待时间的啊,那这里面这也就是请求等待时间。找客户端,请求服务端。等待时间,那等待时间是多少呢?那这里面进来进来进来之后就是它。
03:04
那他查官网呗,嗯,查官网的时候啊,这里面都是消费者相关参数,所以说你点击一下这个consumer con之后在这里面查,哎这样就好一点了,那你看是多少呢?30秒啊,也就说等待最多等待30秒。回来啊,在这。默认是。30秒,那再往下看,下面有一个谁呢?有一个try back of Ms,其实这个呢,就是重试时间。那看一下这个值是多少进来。冲刺时间呢,是100毫秒哈,百毫秒拿来。对吧,比如说你这个消费者跟这个卡卡机群进行通讯的时候,那我第一次没连上,没连上之后是不是得间隔一段时间啊,100毫秒之后我再进行一个重连呢?啊是这个参数的意思啊。
04:08
那接下来往下看,那这块什么?哎,是拦截器链啊啊,也就是说生产端有对应的拦截器,那么消费端也有对应的拦截器啊。拦截器哎,相关处理啊,它不是一个啊,那仍然如果这块呢,你想自定义拦截器的话,那仍然是这个intercept啊,Class就是这个参数啊,往它里面配就行了。那回来回来之后呢,这都是这个拦截器相关处理,那再往下呢。哎,下面呢,就是K和Y6的反序列化。反序的话,那么这块我可以把这个图啊给大家截出来哈,盯一下。哎,你对着照一下,呃,前的刚才呢,我们说到了这个一个拦截器是这块吧,哎,这是拦截器还涉及到涉及到了这个反序列化器,那是不是这个key和value溜的反序列化呀,都给你找到啊。
05:07
往下看。往下看之后,这有一个这个叫offset,这个reset这个策略,那这个offset reet这个策略,咱们之前也讲过啊,还有参数了,把它拿过来。还记得一个是从最开始消费,一个是按照增量进行消费。那这里面有的值就是latest啊,List more呢,就是latest啊,说白了就是从这个opposite这个哎,最后一个位置开始进行一个消费,那如果是这个瑞呢,就从这个最开始,就说杠杠from beginning啊,从这开始消费啊,就这个意思啊,是这个值。到这儿。这个参数的含义呢,就是off。从什么位置开始消费?
06:02
好,那我们从哪呢,把这个参数拿过来,从这个latest。真的。你是默认吧?那下面继续往下走,下面这块呢,是有一个对应的叫诶又一个consumer,其实就是原数据相关信息的处理啊。原数据。嗯,拿这个吧,拿这个。你这边呢,是这个消费者,这边是服务器端,那么我这个消费者要跟谁进行一个消费,我是不是得拿到这边相关的主题信息以及分局信息之后,才能进行相关的一个消费啊,这在这里面呢,要有一个消费者原数据,那原数据这里面涉及到几个参数,比如说第一个啊,Back of这个MS,其实这呢就是哎,重试时间。那再往下呢,有这个这个。
07:00
这个参数啊,不知道大家还记不记得啊,叫include啊,Internal topic,它这个参数呢,其实就是默认的是不允许你访问这个系统主题相关数据的,就是不允许你消费。那大家还记不记得这个,哎,我们之前获取系统主题奥赛的时候啊,如果你默认的是获取不到的,那你只有把这个参数诶给它修改一下,那就能够获取了,那这个参数是谁呢?拿到它。让他知道吧,等于说默认的是不允许你去访问系统主题的,那你可以把它呢修改一下啊。这叫四否。允许。访问系统主题。这是默认之处。不允许表示的是不允许啊,因为它前面加上一个这个啊,加上一个徐凡。
08:02
行,那再往下呢,有一个这个allow all to case啊叫topic,接下来就是是否允许自动的帮你去创建topic主题,允许吗?哎,默认是允许的啊。是否允许双击它啊,那这个的值你看一下。还记得咱们在调整手册当中说把它关闭掉啊,一般呢,我们生产环境当中啊,啊,不建议把它打开,那其实它默认的是这个处啊。好,那继续往下走,下边呢,这里面,哎,这呢就是这个boot server con啊这呢就是连接卡玛集群。
09:02
对吧,哎,连接上,那再往下看一下还有什么啊。这里面抓取数据啊,这个没什么好往下走,这里有一个,这里有一个这个network client看一下。现在啊,你这个消费者要跟这个服务端进行通讯,就必须得有一个client,但是现在你看说海哥不是啊,现在是net client,哎,一会儿呢,会把这个ne client封装完之后,再给这个consumer night work client啊是这样一个过程啊,我们先创建它。创建客户端对象。那么扣三对象里面涉及到一些参数了啊,来看一下。在哪呢?往下看,这里面有第一个叫recconnect back of Ms啊,其实呢,就是这个重新进行连接啊的一个重试时间。来看一下吧,那这个重时间是多少?
10:01
重试时间呢,是50毫秒。呃,连接。重试。时间。50毫秒。这是默认啊。那行,那50毫秒,这是第一次重试,那么它还有一个呢,叫最大的一个重试时间,那这值是多少呢?一会大家解释什么含义啊,最大的是一秒钟。叫最大连接,重试时间默认的是一秒钟。那这两个什么含义哈,你想啊,现在呢,假如说哈,我这边是客户端跟服务端进行连接,那连接第一次连接没连接上,没连接上之后是不是得进行一个重试啊哎,那重试比如说50秒之后我再进行重试,那么你这一这一直在重试,一直在重试,我一直要连不上怎么办呢?那是不是得有一个总的超时时间呢?那这个总的超时时间呢,是一秒钟啊是这个含义啊。
11:09
呃,那下面这有一个散buffer con啊,仍然是就是这个意思啊,你这个消费者往这个集群啊,服务端发送数据的时候,那是不是有一个发送缓存,那反过来这边回调啊,回来的数据的话,我还有一个接收缓存,还记得我们生产者往集群的时候,是不是有一个发送缓存和一个接收缓存啊,道理都是一样的啊。来看一下这个发送缓存大小多大。发送缓存。发送缓存是他啊点开。点开。点开之后,诶,这个发送缓存的是128KB啊。
12:00
好,那下面呢,还有一个是接收缓存,那接收缓存是多大呢。进来。那这块注意啊,一定要先点一下这个consumer con,比如说消费者相关的参数,然后再来查询,诶那这个呢,它是64KB,那还记得我们之前,呃,这个生产者往卡瓦集群的时候,发送缓存是不是也是128,但是这个接收缓存是不是32兆啊,是32K啊是吧?啊那这个呢,要注意啊,这个呢是呃,消费者端呢,是64KB啊。默认。64KB好,那在一块,那下面呢,这个是socket,这个呃,建立连接通讯啊,这个呢,我们就不关心了哈,那还有一个呢,是request time Ms,其实就是请求时间上面已经讲过了,在哪呢。
13:01
这个值。嗯,就是他啊。拿过来,那这个对应的就是啊,这个值哈,那下面这个底层稍微通信我们就不管了,那继续往下走,那有了这个这个底层的这个net client之后,你看我又把它封装到这个consumer network client啊,放在这里面了,你看是这个放里面,放里面去形成对应的一个新的client。这是消费者客户端。那就是我们这张图当中画的他了是吧?哎,最终由他跟我们这个集群进行相关的一个通讯啊。好。那这里面应该有这个request,这个time Ms con,那这个参数呢,就对应的是这个啊。比如说一次请求的时间默认的是30秒。那再往下,下面是A3啊,A3ER什么意思呢?是分区策略的一个配置分区策略。
14:00
叫消费者。分区。分配策略。那这个分区分为策略,大家还记不记得有哪几种啊?是不是有这个RS啊,Red Robin粘性,还有协助的粘性啊。来查一下。是吧,回忆一下是不是有这个ranger啊,然后run Robin啊,粘性,还有这个协作者粘性啊,对应的这几种分区评率策略啊,就是在这里面进行一个配置的。那这个配置完成之后呢,下面呢,还有一个coordinator,这个coordinator。对吧,哎,是我为我们消费者主准备的啊。准备的,其实这里面是这样一个过程。比如说你这个,那应该是另一张图了啊。这张头。这张图当中。这位子这里面是不是COD,其实这个消费者主里面的每一个消费者,他也有一个cator,未来是不是得跟他进行通讯呢?啊,也类似一个客户端啊,跟他进行一个通讯的这个C啊。
15:08
那这里面要创建一个coordinator啊对象,那这里面进行相关的一个配置,那配置里面有一个叫auto to commit interval Ms,那这个参数有什么意思呢?点开进来。还记得这是什么,这里面是不是有一个自动up的一个提交参数啊,哎,是不是五秒钟自动帮你去提交一次upet啊,这个参数要记一下。参数。自动。提交up。时间。默认五秒钟。好,那再往下看,那下面呢,有一个feature啊,Feature呢就是帮我们来抓取数据的。你看啊,现在呢,这里面这里面是不是有一个散FA啊,哎,其实你在往那个发送数据的时候,那是五要从这里面回调对应的数据啊,那你抓取数据的时候,那你怎么抓呀,一次抓多少啊,抓多大呀啊最多抓多少啊,哎,是不是对应的这个参数值,这个就是。
16:12
他来配置,它配置抓取。抓数据的参数,那来吧,第一个抓fe mini batch,比如说最少一次抓取多少个字节来吧。来到。这个啊,至少一次数抓取一个字节啊,这个参数要知道啊回来。默认。最少。一次。抓取。一个世界。那下面有最少是不是就有最大呀,那来看一下最大这个值。
17:00
最大看能抓多少,最大的话一次呢是抓取50兆啊,啊50兆,那这个地方呢,你要按一下这个consumer,你你再来一下啊这样去找。对吧,50兆。默认最多。一次。抓去。50兆。哎,数据。那再往下看,下面呢,还有这个,呃,抓取等待的一个时间,最大的等待时间看是多少啊。抓取一次啊,如果说没有达到条数,它要等待多久呢?哎,这里面呢,是500毫秒,500毫秒。500毫秒。啊,那再往下面是这个max分区啊抓取,就说每个分区最大抓取多少个之间啊。
18:10
那看看它是多少。那你抓的时候,你看我这里面这是topic a,它有分区零,有分区一,有分区二,那我是不是一个分区一个分区抓呀,那每一个分区我最大能抓抓取多少呢?哎,最大能抓取的是一兆啊,他受谁影响呢?受你单条日志的最大的一个上限,单条日志最大上限不是默默认不是一兆吗?那就用这个啊。行,那这个呢,就是默认是一兆。那下面还有一个这个是这个。那这样的master per record是谁啊?是这个值哈,是他master per record,比如说你这个数据放到这个缓存里面,那接下来我一次处理的条数,那默认一次处理多少呢?哎,一次呢,是处理500条啊500条这个值。
19:15
默认一次。处理。500条也是最多是500条啊。行,那这几个参数啊,大家大概了解一下就可以了啊,那这个呢,就是整个消费者的一个初始化流程啊。
我来说两句