00:00
好,上节课我们是把卡法生产者对应的初始化操作已经做完了,那这里面各种组件的一个初始化创建都已经完毕,那接下来我们要做什么事情呢?接下来我们要做的事情就是把外部数据通过这个散的方法发送到对应的这个缓存队列里面,哎,先到这儿之后呢再来研究,呃,这个三往集群里面发送,好,那么先把数据发送到这儿,那发送到这儿的话呢,那我们来看一下对应的源码啊。那首先呢,这是系统的源码对吧?上列克的模式这个地方还能记得吗?这里面是producer,嗯,Producer里面有一个创建了卡法producer,那创建完毕之后,接下来它就开始工作了,那他干什么活呢?哎,其实呢,它是调用这个S的方法进行发送啊,因为这个run方法呀,它是一个线程嘛。对吧,这是个线程啊,所以说他要执行,那看这个呢,其实不利于你这个啊进行一个理解啊,最好的方式,你看咱们写代码,咱们之前写代码呢,就是这里面用了一个卡va producer之后调用的就是这个卡va producer send发送是吧?哎直接看这这个方便呢,比如说你点这点开之后,诶发送调用send。
01:07
那好,那这个地方我给你定出来。是到这儿吧,那我们再来看一下系统源码。有样板呢,你点击这个散你看。是不是跟这是一模一样的啊,一模一样的,只不过呢,我们自己写的这个代码呢,虽然说看起来比较容易,但是呢,不容易进行一个注释,那好,那我们还是用这个系统的源码啊,方便大家去注释啊,大家调用这个散的方法之后走到这儿,走到这儿之后大家看一下。这里面有一个intercept,然后。啊,也就是说这个intercepts都可以调用这个你看到哪了数据。那数据呢,其实调这个散之后,诶走的是这啊in特三克斯,比如说拦截器链,那好往下点进去。那进来之后你会发现啊,如果你有多个这个拦截器的话,那每一个拦截器都要调一遍对应的这个的方法。啊也说都是截道的是吧,哎,谁来的给你截下来哎,添加点东西对吧,补补还有人参丸那行,那数据呢,就到这儿,那这样看到啊,就是这块呢是对。
02:10
拦截器。对数据进行加工。那如果你没有拦截器的话,那这块他就不走了,是这样吗?哎,好,那我退回来。好,那这块呢,都是拦截器相关操作。行,那走过这个拦截器之后,那数据继续往下传,调用这个do散的方法走进来,那do散的方法进来之后往下看啊看一下这块首先叫wait on meet。那这里面它首先要获取原数据,那这块大家理解一下,他为什么要获取原数据呢?就这张图里面没写啊,因为原数据相关的我都没大写啊,因为在我们开发的时候,其实用的不是特别多啊,其实呢,你想这个分区啊,未来说要往这个队列里面发。往队列里面放的时候,都是按照这个主题的分区进行放数,那他怎么知道主题分区是几个呢?
03:06
是不是,哎,那其实这个呢,是从这儿诶获取到这个对应的分区信息之后,然后再往这里面进行发送的啊,所以说这里面有一个获取原数据的过程。这块。啊,获取原数据。那获取原数据这里面你看啊,他开始记录当前的一个时间,然后这边开始获取等待,那不是说你发送请求直接就能把原数据获取过来的是吧?啊也有可能需要等,那你看,那就是当前时间加上这个等待时间,然后继续判看看什么呢?看这里面有没有超过说我还剩多少时间。比如说我设置一个请求时间,是不是得有一个超时时间呢?你不能无限的等,比如说有十秒,十秒钟还没来,那是不是我就获取不到原数据了,哎,那我就得返回了啊,那既然那这里面呢,这是原数据相关的啊处理的啊,那这里呢,我们不用啊做过多的一个研究啊,那再往下走,下面是我们重点,比如说序列化对吧,那这块呢,是序列化情况操作。
04:09
啊,序列化循环操作,那这里面有一个K的序列化,那还有一个对应的是Y6的序列化,那你看数据找到哪了,哎,也就说经过拦截器之后,下面就对这个数据进行一个K和Y6序列化的一个操作啊,那再往下是不是就是分区器了,你看是不是这样呢。到这儿。那这里面就是有分区相关的一个操作了,来吧。那叫分区。好,那么分区,然后这里面所有的一些细节啊,我先不给大家啊,去详细看一会儿大家看,我先给总体流程先给大家演示完啊呃,分区操作完毕之后继续往下走,走哪呢?走到这个位置吧。这里面有一个叫ins啊,Value的啊,Record set,比如说保证数据的大小啊,正常。保证数据大小,能够传输。
05:05
啊,其实这里面你可以简单看一下,它为什么要保证数据大小能够传输呢?还记得这个max request size吗?这参数大家还记得吗?它的值啊是对应的叫默认是一兆,是我单条信息的最大值啊,叫单条。最大值默认的是一兆,那么你传过来这个消息,如果大于了这一兆怎么办?就直接就给你抛异常了,哎,数据根本就不会往下走啊,直接抛异常,那还有一个。如果你传过来的消息大于谁叫to memory size,那这值还有多少?还记得吗?缓冲器大小啊,叫缓存大小。就默认是32兆啊,32兆,比如说你这条数据啊,不能大于32兆,大于32兆直接也抛异常了啊,这是保证发送过来的数据能够正常的这个啊处理啊。
06:01
啊,那这一块是这儿,那再往下走。那这里面往上稍微看一下啊,稍微再看一下,说这个数据是这序列化来的,那这个数据一定是经过序列化,还经过什么吗?看到了吗?哎,这个呢是还经过压缩比,就说不管你是序列化还是压缩,那是序列化和压缩之后的数据大小不能超过,比如说单条日志一兆,或者呢是总的缓冲机大小三12兆啊这两个呢,都不能超过啊。那这里面是叫。序列化后。和这个压缩后的。哎,这个大小。那我们再往下看,那下面有一个叫intercept啊,Call back其实就是拦截器的一个回调方法对吧,比如说你拦截器呃,处理完之后啊的一个回调啊。那再往下下边往上走走。这里面有一个accumulator,还记得accumul是什么了吗?哎,这个accumulator它是那个缓存区大小啊,那它默认是不是32兆。
07:00
那么这个accumul。缓存。那么accumulator.apaa是不是追加的意思,那就说像这个缓存队列里面追加数据,那所谓的追加数据,那不就是往这里面发送数据嘛,对吧?安道你看一下啊,一会儿呢,研究一下他怎么往这里面追加数据的。对面是飞家。数据的。那再往下,如果说追加完毕数据之后,看好往下看,往里面添,添加到这儿。这不是追加完毕之后返回了一个结果吗?那返回的结果说这个batch size啊,这个BA is for,比如说批次大小已经满了。完了或者什么呢,或者说有了一个新的批次创建。
08:00
P创建,那要干什么事?哎,他要调用这个this standard with。啊,Wait这个wait up呢,就是唤醒对应的这个发送线程。哎,唤醒这个发送线程,那还记得这个发送线程在我们讲初始化的时候,是不是它已经正常启动了,但是启动的时候它有没有说要达到这个发送条件呢?啊,没有啊,直到什么条件它才可以进行发送呢?那这里面就是其中一个参数就是by size,这个值到了,还有一个值就是这个link格MS,那link格MS在后面我们会看到啊好,那总之看到一个,哎,说这个条件满足之后,他会把这个三根线程进行换起,说诶哥们,我这边已经满足条件了,你可以从我这里面拉取数据发送给好吧,继续。那在这个总体流程当中,我们见到了对应的拦截器,见到了序列化器,见到了分区器,还见到了一个保证发送数据的大小必须得小于一兆,或者呢是小于32兆啊之后呢,把数据。
09:02
添加到对应的这个缓存对列里面是这样一些事儿吧,那些宏观上已经了解完了,那下边我们来研究一下对应的细节,对吧,一个一个研究。那往下看,往再回来啊,往上走。那元数据这块不用看了啊,序列化操作,那这也比较简单,就是P和Y6戏的话,那你是这个默认的基本类型,他都已经给你处理好了,那分区操作要看一下分区操作,那还记得它的分区规则吗。稍微回忆一下,说这里面分区规则是,如果指定了分区,是不是按照分区进行发送数据,那你看这块说啊,我这块要获取分区,那么说这个分区啊,如果说它不等于空返回什么,直接就返回这个分区,你说这块表示什么含义?如果指定分区。那么是不是就是按照指定的分区进行配置?那就完事了,那好,那如果说它等于空,也就是说没有指定分区,那什么呢?那那按照这个partan点。
10:07
Partan。哎,那按照你分区器进行一个分区控制,那好,那按住CTRL键点,点到这之后,你发现这是一个接口,诶,那我要想看到它的时间类怎么办?反过来CTRL2的加B点。点击之后,这里面就有一个default part,哎,那他走的就是这个进来,进来之后那继续点它part。进来之后,那这里面就是默认的分区规则,那好。那这里面你看这个叫K等于空,比如说没有指定P,那这叫没有指定P。那没有指令可以按照什么呢?你看这个单词叫粘性分区缓存,然后分区处理,也就这块呢,就按照粘性分区处理。好,那下面呢,那下面这块就是如果说你看你上面这呢,是说没有指定K是不是按这处理了,那么如果是指定K呢,那指定K是不是走这个。
11:04
如果指定P。那按照什么呢?按照K的哈西扣的值对应这个分区修模啊,这是K的哈希扣的值的一个求法啊。按照。K的哈西扣的。值哎,对分区数求模。那这样就OK了,那这里面至于粘性分区,那你可以详细看一下,那这里面呢,点击这里有一个nice part,那这块的处理其实就是粘性分区处理,那它的规则就是某一个分区,哎,它要么批次大小到了,要么就是这个令MS时间到了,哎,那这时候你可以切换分区,那么切换分区的时候你要稍微注意一下,就是它不会跟上一个分区一致,只要跟上一个分区一致,它就会干什么,你看这块new分区和ES,如果等于这个上一分区能干嘛?哎,又重新进行随机生成一个分区啊,就这里面一些细节啊,有兴趣的同学呢,可以详细的研究一下啊。
12:10
让他这块呢,往回走啊往回走。这是分区的一个处理,那分区完事之后呢,这个保证这个数据传输大小,这个已经看完了啊,不看了,那往下看哪个呢?看这个追加的操作。看这个。这里面呢,就是accumulator a向这个缓存队列里面追加数据,那追加数据的时候,咱说是按照这个每一个分区是不是往里面进行追加呀,那是不是这样呢。那看一下呗。好了,这里面有一个TP,看这个TP啥啊,说往里面追加。哎,这个T呢叫topic part,其实就是每一个主题下边对应的一个分区。好,那那有它往下看,你点选中它,选中它之后,首先就到这里面看这啥。这块。叫获取。
13:00
或者。或者创建一个队列。知道吗?哎,那按照按照什么时候呢,按照分区。知道吧,哎,说按照每一个主题的分区去创建一个队列,好啊队列。那拿到这个队列之后,他就尝试着向这个队列里面添加数据,那么大家这时候他能不能添加成功?叫尝试。像。队列里面。添加数据,那这时候呢,他添加不成功啊。因为他现在所有的内存资源还没有准备好,批次大小啊也没有设定好,所以说你这里面尝试是失败的,那继续往下走,那往下走啊走什么呢?就是说啊,往这一堆里面添加,那下面这有一个这个地方。你这个队列里面每一个批次得多大,比如说你这是一个总队列是有了,那我得往这里面一个批次一个批次放,那好,那这一批次大小多大呢?你看这里面有这个值。
14:11
嗯。叫max啊,叫max max size。还有后面呢,叫啊获取跟它进行比较,让这两个值取一个最大的。啊,取得最大的,那好,那by size大家都知道哈,它的大小是多大了,比如说by size大小默认是16K。那大家思考一个问题啊,如果我传过来一条日志,比如说数据大小是多大呢?是17K。那这时候你怎么帮我来传这条日志?那你是掰成两半吗?把这个数据放到这个,哎,一个批次里面放两个是这样吗?这边16K,这边1K。那很显然它不是这样的,看什么啊,那这个批次呢,它就放17K不就完了吗。嗯,那所以说这里面涉及到一个BA size和你真实数据传输一个大小啊的一个比较,那这里面真实数据大小,你看这里面是不是有一个压缩呀,哎,其实可以算你压缩之后的数据的大小,跟我这个BA3的大小啊进行一个比较啊,到底谁大啊,然后我取谁,那先来看一下,简单看一下啊进去。
15:18
进去之后呢,在里面继续点。点完之后有这个值,这个值呢说啊,如果大于等于它,那么就走这个default末了啊,其实它呢是跟这个值是相等的啊相等的,那进来之后看这。这里面就这个值啊,这里面有涉及到key value head,比如说你传输数据的时候,有它的key,有它的value,是不是还有它对应的hi头啊,以及一些其他校验信息啊啊对吧,那这些值都加在一起,看看跟你这个16K比较是谁的,那其实这个后面算的就是总体的这个消息大小啊消息大小。比如说传输数据大小,那这个是多少个字节呢?看一下啊。啊,是这么多。啊,太多了啊,不看了啊回来。
16:02
像那这块呢,总之啊,你要知道啊,这里面呢,一个是16K,一个是呃,数据真实大小,那你看它的哪一个大就取哪一个,那这里面就得到这个塞大小,那得到这个S大小之后,那接下来他就开始申请内存了。那申请内存你看就是你申请这个大小传进来,那这个是谁呀。点看看了吗?Buffer熟悉吧,之前我们在创建那个accumulator的时候,也就是32兆缓存的时候,是不是有一个buffer pro,哎,就是那个缓存池啊,其实就是内存池。这内存,那这里面就是内存池。分配。内存,那这里面要记住啊,稍微回忆一下你这里面啊,这是队列,这是内存池。内存池,那你这里面,哎,需要一个批次大小,比如说我需要一个16K,那从内存池里面看到没?哎,申请对应的16K的批次啊,数据放在这,那然后之后你就可以往这里面写了,那这边往外发,发完之后是不是还有一个释放内存的,哎,释放内存之后再回归到这个内存池啊,是这样一套流程啊,这叫双端队列吗?还记得吗?
17:15
回忆一下叫双端。比那将拿到这个内存之后,那继续往下走,这个内存呢,它又对它进行了一个封装到这。对它进行封装。叫封装。内存。把他进行封装,封装完毕之后得到了是一个他那得到它之后呢,他又通过这个producer Bach对它又进行了一个封装,啊,再次封装。再次封装完之后,就得到了这个批次大小啊,真正的批次大小。哎,比如说你这里面对吧,哎,这是一个队列,那每一个批次的大小其实是不一样的,有可能这个是16K,那有可能这个17K,有可能这个呢是呃18K,但是有没有可能说小于这个16K,不可能,因为那里面是不是算数max这个批次大小和你数据的大小的一个比较啊啊所以这里面要注意啊,也是最小是16K,但是呢,这枚批次呢,可以大于16K。
18:21
那大于它之后,那就是这个P次大小了,那P大小你看这个DQ是不是一个队列,然后at特last是往这个队列里面添加数据,它是不添加到这个队列的末尾呀,看添加到队列末尾那头给谁了,头是不是用来这个三条往外发送数据啊啊这样一个过程啊啊,那这就是向队列的末尾。添加。出去。添加批次吧,啊,批次啊,那你就往这个批次里面可以正常添加数据了啊,这就是整个过程,然后再往下呢,这里是你有一个这个record,呃,APA result,其实就是判断你这次有没有正常添加成功,那好,那如果说by size for,比如说哎,这个base已经添加完了,或者呢,这个DQ的一个size已经大于一,那就说明这里面已经添加好数据了啊,那添加好数据,那就返回结果呗。
19:14
结果,结果。往回走啊,往回走。到这你看这呢,是这个PA返回的值,那就是添加是否成功的一个结果,好,那获取到这个结果就说result。是否?添加。成功的结果啊,那么这个result拿到之后,你看啥呢?看这看到没啊,这个result BA is for,比如说如果满了,或者这个result已经是一个新创建的这个BA啊,那创建那怎么办?那我就唤起这个发送线程,那你说这个时候其实数据已经发送到对应的这个缓存地列里面。啊,已经进到这里面了。那进到这里面之后,那接下来谁闪亮登场了,那接下来就是由这个三德线程来闪亮登场,来帮我们拉取电的数据好。
20:08
那这个过程呢,就是这个,呃,把外部数据发送到这个缓存队列里面哈。
我来说两句