00:00
好三节课,我们呢,已经将这个三的线程唤醒起来了啊,那唤醒起来之后,它要做什么事呢?来看一下这张图。你说现在啊,这里面数据已经发送到这儿了,他呢也被我们唤醒起来了,那接下来就由这个三特线程帮我们把这里的数据发送到可法集群,哎,就干这件事儿,那现在我们要做的就是再次找到对应的这个三线程啊,看它里面做了哪些事。那找到三程,按住CTRL键点这个。进来之后点这个三来到这个地方,来到它之后呢,找它对应的run方法,好,那就这个。那run方法里面进来是一个while循环,Running一直在这运行,运行的一个方法呢,就是这个run one好那点它呗,进来,进来之后首先看到的是这块的收入。说这呢是这个tra manager,其实就是事物manager,是不是等于空,如果不等于空,哎,那进行下面操作啊,那这块呢,是事物相关的操作。
01:05
那由于本次案例呢,我们现在还不研究事物对吧?哎,先看一些核心内容,那这个略过之后,那继续往下走,那往下走看这两个地方,第一行内码这个。这叫发送数据,你看send producer date嘛啊发送数据,那另一个呢是客户端啊,其实呢,这也就是拉取对应的发送结果。叫获取。发送。结果比如说你发送过去之后,那对方是不是得给你进行响应啊,那响应就是收数据啊,所谓的就是客户端往这边发数据,说哎,我把这边数据发过来,那发过来之后对方得进行一个应答呀,哎,就是一来一往哈,那下面我们来看一下这个发送过程。点击进去。进去之后,首先第一个。叫什么呢?叫meter data fetch,其实呢,还是抓取数据或者说获取。
02:02
原数据,那这块他为什么要获取原数据信息呢。大家想我这个三顿线程啊,要把这里的数据发送到卡法集群,那你是不是得知道我要发送到哪一分区,哎,哪一个节点有什么位置存储了什么样的数据,那你首先得把原数据拿过来哈,也就是说每次交互之前啊,哎,都必须获取对应的原数据哈,那获取完原数据之后,那下面看这个。这啥意思呢?说this accumulator,这个accumulator是谁了?还记得吗?是不是这块,哎,32道缓存说它是否已经准备好了,比如说他是否准备好,那什么情况下才会准备好呢?那还记得我们有一个by size,还有一个是不是ler Ms啊,那其实这里面就是判断这个L格MS相关时间有没有到啊。这里面。嗯,就是判断。32兆。缓存。是否?准备好。
03:01
那看他准备准备好进去呗,点击进来。进来之后啊,这里面的代码呢,还是相对说比较复杂的啊,一堆信息啊,那不要着急往下看,看哪个地方呢,首先往下找,说这里有一个free free是谁啊,记得吗?是不是这个buff for po,也就说你这个缓存区里面,哎,每批次的一个数据,那么这个队列呢,如果大于零,那就相当于这里面有数据,那有数据就往挨走呗,那好。那这呢是一个队列信息,有队列信息有数据的话,那我就获取这个队列里面信息,那这个队列信息往下看,那获取完之后,你看他每次啊都从这个队列的头先获取。还记得我们说它是一个双端队列吗?加数据的时候把它艾特到这个last等于说最末尾,但是发数据的时候呢,我们是把它最头的数据往外进行一个发送啊好。那么如果说它获取这个队列的头信息,这个bed size如果不等于空,那是往下走啊,走到这,走到这再往下走,说这有一个这个集群leader啊,For获取这个信息,比如说如果说这里面获取的这个leader啊,它等于空。
04:11
那就相当于哎,虽然说这批次有数据,但是呢,Leader为空,那其实呢是没有你要发生的目的地,那这时候呢,那这是要抛异常的啊,也就说这呢是不对的哈,不对的,那不对呢,往下走呗,那这块这块呢就是目的地啊,有对应的节点允许我们进行一个发送啊,那这才是真正的一个啊正常逻辑哈,比如说正常要走这个代码,那走这个代码,那什么条件下它能够发送数据呢?那往下看,往下看有一个这个说wait time是等待时间,每一批次数据的一个等待时间,那等待多久呢?继续往下看,说等待这个时间啊,如果说它小于一个。Back of Ms啊,啥意思?来看一下。说你这里面有数据,那有数据呢,这里面涉及到时间,一个是等待时间,其实数据放到这之后啊,如果这个等待时间小于一个back of,比如说重试时间,这重时间多长时间呢?100毫秒。
05:11
那前面还有一个说BA attempt尝试拉取的这个次数大于零。那这要注意所谓的尝试拉取是啊,比如说这里面,哎,把这个数据往这里面尝试拉取。那拉取的次数默认是多少呢?默认是零,那默认是零它就不可能大于零,那如果大于零的话,那数字说明拉取的一个重式啊,哎,是拉取的重试啊,记住如果说它大于零,那么切这个重试的这个等待时间还小于你这个重试时间,比如说没有到达重试时间,那怎么办呢?那它是不是就等于处啊。那如果说它等于出这样。记一下吧。说如果。是重试。比如说不是第一次吧,啊,如果不是。第一次拉去。
06:00
那不是第一次拉群,那且等待时间。小于。冲刺时间。默认是100毫秒啊。那小于这个值的话,那怎么办呢?那它是为处。那它等于处之后,那好,它是处的话,那这个三元运算符,那它就是处,他处的话是不是取这个重置时间啊。如果。这个值。吃醋。那么就取这个值。对吧,那如果不是呢,比如说如果不是第一次拉取。那取谁,哎,那就取对应的这个另格MS它的值,那么它的值默认是几啊,哎,默认是零,好,那取它的值之后,那就给了他,那给他那看一下他是谁。
07:02
他往下看,说这个队列啊,如果这个大小呢,大于一,那你说这里队列里面有数据,或者呢,这个BA已经满,比如说当前这个批次呢,已经满了,那是不是就是满了,那这个呢,就是。这个条件叫。批次。大小满足条件满足发送。条件,那还有一个,那下面这个呢。下面是时间判断,你看刚才那值啊,这个值如果小于等于这个它,比如说wait t time也说等待时间,这个等待时间大于等于它了。什么意思,超时了呗,哎,超时。也要发送。也要发送。那这个超时时间,那这个如果说是第一次发送的话,是不是就取这个link格MS的值啊,那0Ms默认是零,那当然你设置五毫秒十毫秒,那就是五或者十,好,那你设置完毕之后,那也就说它的等待时间如果超过了你这个link Ms,那什么,那这个也要发送,那好,那看下面这只。
08:07
这块说这个三的A包啥意思呢?就是你未来允许发送的这个条件有哪些呢?有这么几个条件,第一个条件。就是它呗,负等于说负的话是可以的。那谁呀,F的话呢,就是批次大小满足条件,那还有一个呢,就是它那它谁哎,它就是这个时间到零根MS时间到。这两个值是吧,这两个任意满足是不是都可以啊,那或者呢,是关闭了或者暂停了啊,像这个呃,队列里面筛选数据,还有呢,这个事物事务结束了,那都要进行一个发送数据,你看这么几个条件啊,那现在我们最关心的就是这个啊,一个呢是批次满了,或者呢,就是这个links啊,这个时间到了哈。行,那看到这儿之后呢,就是数据已经准备好了,准备好之后往后退。
09:00
上哪了?哎,这地方是不是,这是就是数据准备好了啊,这是我们第一个要干的核心事情,那继续往下走,下面往下走看什么呢。嗯,这块说no topic啊,这个是,呃,如果是这个leader topics啊,不知道,那不知道的话就直接就抛异常了啊,那往下走,那说remove any know we aren't啊,Ready to send,也就说如果说发送的对方这个节点啊,没有准备好。你看这边要往这个卡法集群发送数据,那如果对方这个note节点没有准备好怎么办呢?哎,那他会remove删除掉相关的一些数据的处理啊。那往下走呗,啊,这个都不是我们关心的啊,往往这走是吧,Create produce record啊这块呢,是我们关心的。这个呢,就是发网。每个节点。数据。进行一个合并。
10:01
进行封装吧。进行封装好,那这里面什么意思啊,Accumulator进来。进来之后,那这里面有一个BA size啊be size,那这个BA size呢,你看它是一个集合,什么集合呢?是map一个集合,哎,第一个呢,K呢是引T格,那value呢,是一个粒子集合,那集合里面是一个producer Bach,比如生产者的批次。啊,那他你看我们遍利谁呢?我们便利所有的节点,那节点就是对应的这个博节点啊,你这边有多个节点好。那比如说这是博和零,这是博荷一,那我变历之后,它要干一个什么事呢?看那块儿。哎,往这个BA里面添加数据,但是我添加数据的时候是怎么添加的,是以节点的方式进行添加的,比如说把我前面生产者对应的所有的请求。哎,我都按照节点进行一个封装,比如说这个节点A是放放这个博客零的,这些呢,数据呢是发博一的,哎这样的一个封装,那这样封装有什么好处。
11:09
哎,那其实你这批次往这发的话,是不是就是不用换链接了啊,直接一个请求啊,就把所有这一批次的数据都带过来,不需要建立两次连接,知道吗?好,那现在这个呢,就是以节点为单位来发送数据请求哈。啊,那回来之后那往下看,那这个再往下走是到哪儿了呢。这块呢,都不用看啊,不用看。往下看看这块。这块呢,有一个producer。下面处理完毕之后,那就开始发送请求。发生需求,那么看一下他发送请求,他发生了哪些事儿啊。点开,点开之后这里面呢,仍然调用的是发送请求,那继续点击。点击进来之后,那就来到这里面,那来到这里面看什么呢。
12:01
往下走。往下。在里面,在它下边这块有一个client.new一个client request,就说创建一个客户端请求啊,然后呢,返回这么一个对象,那返回这个对象呢,又由这个client点来发送这个请求给对应的这个服务端啊,那这呢是它的一个发送过程,那发送过程那看一下这个发送是怎么发的呗,来点点开之后你发现是一个接口,那你回来ctrl out在B。那这里面我们是一个network来啊是它啊点开之后读继续发送。继续发送的话,那看谁呢,这下往下看。这里面有一个do散对吧?啊,那点开它发送do散里面,哎,再往下这块。对吧,哎,这块呢,就是这个封装了一个in flat request啊request,然后呢,把这个in flat request放到了一个in flat request斯还记得这个吗?啊就是它。
13:07
那他谁啊,他在里面是不是就是类似于我们之前说的,它为每一个节点是不是封装了五个请求啊,啊,也就最多是五个,当然你可以配置多个哈,比如说这块啊,那它封装这么多请求之后呢,通过这个slide把数据呢,发送到对应的服务端,你看。底层走的话,是通过这个slide把数据呢,发送到对应的这个服务端啊。行,那这是这块啊,这是发送过去,那发送过去之后,服务端是不是得进行一个响应啊。那这个响应他怎么收呢?往回退往回退。再退再退。再退啊,别退晕了啊,再退一下。这退回到这了是吧,那我们进来进来之后呢,找到哪呢。这辈子。你刚才呢,是调用这个send producer date发送数据,那接下来我们就通过这个client点来拉取对应的结果,比如说服务端给他响应的结果,诶拉取,那你发现还是一个接口回退ctrl out加B。
14:12
进来之后。网络请求好,那在这里面其实就能够获取对应的这个,哎,服务端的一个结果了,比如说slide点啊,通过这个还是通过这个slide来发送数据啊,来等待对方的一个结果啊,那你看这下面有各种这个啊response相关的一些事情。好,那这里面我们通过这个slide点啊,发送对应的这个呃,数据到服务端,那就可以获取对的一个结果哈。
我来说两句