00:02
好,下边我们来看这个代码举例啊。呃,这个代码举例是这样,呃,我这里面举了两个例子,一个是批量。消息生产者,一个是批量消息消费者,这两个没啥关系啊,呃,这个生产者啊,就是我们批量生产啊,批量发送啊,给大家举个例子,呃,这是批量消费,给大家举个例子啊。那么首先我们直接打开代码。呃,这是批量生产啊,大家看。这前面这都没啥啊,然后这这是什么,这就是设置啊。这什么指定对吧,指定要发送的消息。最大。大小是吧,啊默认是。
01:01
四兆。这就。当然我这写的就是默认值啊,你比如说我要给它改成八兆,你这样光这样改是不行的啊。你这个改了以后还需要再改谁?还需要再改它。还是?他需要再再改,再改它就是我们这个,呃,那个配置文件是吧,所以。Again。修改该属。属性是。不行的。不行的,还需要。呃呃,同时修改。Broke加载配置文件中的什么?Max就跟这个属性没有一模一样啊,Max。
02:07
属性啊,需要再修改这个属性啊。呃,这个我们就给它注释掉。再往下。嗯,再往下我这儿。你看这是什么,一一百条消息啊。100条消息,我把这100条消息给它放到message里边是吧,这是个集合啊,消息集合,所以这个呢,就是。嗯,定义啊。呃,定义要发送的消息。这几何搞定好这个几何。呃。然后。然后大家看啊,我这里边儿。这里边儿你看我们这个代码,这发送的时候,我这是什么,该批量发送的需求是。需求是啊。
03:01
必就是。不修改最大发送四兆的默认值,不修改这个值。但还要防止发送的批量消息超出四兆的限制,防止它出现限制,所以我就要判断你这个如果超出了四兆这个限制,或者给你分割。我这给你分割。啊,分成小于四兆的再给你发送,所以我这里边定义了一个什么消息列表分割器啊。变为。嗯。这个消息列表。煤气啊,将。呃,消息列表。三。歌。呃。Worker。呃,不。超出。
04:00
四兆大小的。小,嗯。列表。所以它能拆出来一个小列表,但所以这个小列表怎么怎么搞出来的,我我让他啊,呃,一会儿我们再来分析它。呃,这个我让他,呃,就是可迭代的。啊,分出来这个这个小的列表啊,它是可迭代的,所以你看可迭代的它这个范型是什么,是个消息列表。这是个消息列表啊。哎,我这分出来,分出来以后,紧接着只要它里边有。有元素诶干嘛呢,我获取到这个元素给发送,那这个肯定就是我们消息列表啊,发这个肯定是不超自照的。对吧。这这这个这个获取到这个啊,肯定是不会超四兆的,给他发送就行了。对不对,这就可以了啊。呃,那么这个,呃,下面我们重点就要研究它了啊,这是我们的。
05:05
这是我们的。消息列表起。对吧。呃,首先我这儿我的极限值是多少,40。呃,指令。呃,这个几几几。极限。42对吧,啊,你这个极限值是40啊,那么。呃。再往下啊,这个是什么,这个里边放的啊,将来这个里边存放的就是我们所有的消息。所有的消息你看我通过它传进来的呀。对不对,就这个。通过它传进来的是吧?啊,它里边存放的是所有的消息啊。存放所有要发送的。消息对吧,然后这个cant值。
06:02
比较重要。它是什么?他想要进行批量。发送。消息的,我不是把它很多集合了吗。小集和小。集合。其实。所以。小集合的汽水。然后构造系是吧,传进来这个message再往下。因为我这个是实现了什么。啊,实现it是吧,迭代。啊,所以我最终那个结果啊,实际上是在这个就是在这个迭代器里边的是吧。Next next这边。你只要。什么叫有什么叫还有下一个,只要你这个。就是你要你要发送这个小集合啊,你当前的啊,这个这个小集合它的起始,所以小于谁。
07:03
小于五这个大小。我这是所有的嘛,它里面放的是所有的消息,只有小于他说明后边还有。对吧。所以呃,这个这个这个。嗯。判断是吧,呃,当前开始便利的消息。所以。要小于。消息。是不是总数。你当前便利的这个,所以在起始吗。是吧,你也想集合在起始,就是你当前要变列它是吧,下一套好,Next,我们看N。为什么?这就是我当前要变临这个起始的缩影是吧,我现在要找这个小集合了啊,我获取这个小集合呢,所以我从这开始啊,从这开始。塞是零是吧,然后。这个这个我开始便利对吧,便利整个这个这个这个这个messages是吧,我所有的所有的,呃,这个这个。
08:06
呃,消息列表是吧,这总三是他们啊,然后呢,Message,我获取到这个当天的message。会掉它以后就再往下啊,你注意这在干嘛。你看这在干嘛?这是me get total点,这是不是你那个total,呃,那个他,呃,Topic跟你主题的长度,这是body的长度。这俩长度在加上这什么。Get所有的属性。是吧,所有的属性。我,我把所有的首相都拿出来,拿出来绘取它们的长度,那都是字符串。会长度都给它加到哪,加到这个临时是吧,这个临临时啊S里边,然后再给它加上20,这这这什么东西啊。
09:01
这不就是他吗?他的。长度加上它的长度,加上它的所有的key value的长度,再加上20。是不是一个当前你这个消息的大小。对吧,所以整个这个是是在。做什么?在做什么,是不是在这个,呃,统计。呃,当前。电力的。Message的。呃。是吧,我统计这个大小,当然这个是获取啊,写上获取当前呃。便利的。消息是吧,然后统计的大小统计出来以后。哦,这样啊,呃,这个这个我们先不用管它这个啊,呃,我们先不管它,先不用管啊,然后我们看这。
10:06
看这啊,这个是不是我们当前。呃,这个这个消息的大小是吧,现在消息大小,这假设这第一条消息啊,这大小。呃。这个大小。他又是谁呢?我现在也不用管他,我这个大纲有了,我就给他放到他里边。这个是用于记录。呃,我们超不超四兆,我拿一个消息的大小加里边,第二个消息的大小也加里边儿,第三个加里边是不是也加里边,所以这个就是用于统计我当前要发送的这小批数据的大小。是吧,所以它里边记录的。记录。呃,当前要。发送的这角P。是。
11:00
呃。消息列表的。大小。是吧,啊,这这个东西啊,它从零开始的,诶我就往里边加加加加加是吧,然后呢。看一下啊。哎。然后我再再来下一条消息啊,大小是吧,我A统计统计完了,然后我就看你的大小加上它以后。你当前这个大小,再加上你原来之前的所有的消息。是不是比我这极限值大,如果比极限值大了。那我就要结束了。对不对,Break结束。结束,然后呢,我就开始取取谁。取这个,这是我从这开始的呀,List啊,这个list注意。To south east。他是做什么的,呃,这个可以完成什么功能,这个啊是或。
12:07
关键就边边界值的问题啊,当前。Messages列表的。集合。谁呀?这是半壁。半开。就是半闭半开的,就是包含它,不包含它。明白吧,不包含这个,为什么不包含这个东西啊,当前这个,因为你加上当前的,大家看你加上当前的这个是不是已经超了,那你就不能加他呀。不能像当前的这个,所以我不包含它。是不是这个意思,我娶了他?第二,他以后就返回了。那我下一次,因为这次就没有统计这个,所以下一次从哪开始,是不是从他开始。
13:03
对不对,下一次。来继续,呃。下次啊开始,呃,这个这个这个这个。呃,开始便利啊。开始,所以是吧,下次便利。的开始,所以哎,就从这儿开始是吧,又又又又开始了啊。这边是这这次再再来啊,再来N的时候,是不是就这个就记录下来了啊,从从这开始,然后就这这要小于它是吧,获取或者统一大小,统一大小,然后再。是不是在往里边加,你这次来的时候,这个是不是又变成零了,再一个一个加加加加。是吧,整个这个过程应该没啥问题。对不对,应该没啥问题啊,那么这里边儿有这样一个问题。这我们需要判断一下,它是什么呢?它是这样啊,大家看啊,我们还还说刚才那个啊,这个啊,写一个子集合啊,已经已经啊已经找到了,给他返回了,下次又来啊,然后我就把这个给它记到它里边了啊,记到它里边了,下一次又来了。
14:12
又来了,又来了,哎呀,我这个从这开始了是吧,从这开始,哎,开始以后紧接着。拿到这条消息,统计了它的大小。然后这。最大小大吗?不大。假设啊,这个不大不大,然后再往下走,那走是不是肯定就给它加里边了,对吧,加里边然后再走,是不是又又又开始便利下一个了,对吧?便利下一个消息,便利下一个啊走。呃,下一个消息,然后大小来了,注意。这次夸下来了这个东西。这个消息比较大。这个消息啊,一统计这个消息,它本身消息本身大于四兆了。他这是判断。干嘛?
15:00
判断。当前消息。本身。大于。四角。看看当前消息本身是否吧。是否大于四兆?是不是就判断它是不是大于四兆啊,呃,如果说现在大于四兆了,这个消息大于四兆,前面是不是已经,呃,有一个前面已经有一条消息了啊,你注意有一条消息,呃。这个加加到这个total size里边了是吧?啊然后呢,再判断一下啊,他说这个n index减去current index。Dance的是谁?Can index。他原本ne恩ex斯是从那开始的,但那一次是不是已经执行了,已经拿过一次消息了,那个消息不会大S找,所以这个奈斯in代是不是加了一了。
16:00
所以它减它等于几等于一,它不等于零。是吧,它不等于零,不等于零。那是没有做,没有做这个。然后这就结束了。直接结束,结束怎么样,现在走哪要走这呢。所以看这谁还是你刚才那看,但这个是不是比这个看大一啊,所以它是不是就。获取了一个,这次这个子集合是不是就获取了这一个。他要没有要那个超过四兆的。超过四兆的那个那个。消息啊,没有。然后他没有要那个消息,紧接着又从他开始了,你注意啊,这一次他看变成他了,这个是谁,这是那个超过四兆的那个消息啊。它那个索引对吧,好过来了啊,直接走到这儿。下次又来next。然后回答他走。
17:02
这个直接是不是第一次获取到这个消息就是超过四兆的,那统计这个是不是它大于它成立了吧,然后它减它是不是等于零也没问题吧。因为你最开始走的时候,肯定他俩是负的值啊是吧。所以它减它等于零乘以吧,直接让它加加。他加价的目的是啥?往下走呗,加过一个break break又走到这。走到这大家看一下啊。这个。是谁?这个是谁?这个。就是。我们是不是这这这个就这个还是我们刚才那个记录下来的,就是那个超过四兆的那个那个缩引,然后他加加就到下一个了。然后。这里边儿。是不是取了这一个。这次是不是就取了这一个,这一个是谁,就那个超过四兆的那个。
18:00
超过四兆的这个消息。我给他返回了。那哥们说,那你你返回这个消息,单独这一个消息,他已经查出四兆了,你给他返回,他能给你发送了吗?这个就已经不再是我的这个分割器处理的能力范围了。你注意啊,我这个消息列表分割器。七只会处理什么?呃,这个。每条消息的大小。超。四兆的。情况。他只会处理这种。是吧?就说,呃。若,呃。呦。呃,存在是吧,某条消息。呃,七。
19:01
本身大小。大于。自证。这个。分割。无法处理。七。嗯,直接将这条消息。呃,这个。呃,构成呃一个。是列表。大会。是吧,啊,并没有在进行。他不会对消息本身进行分割。明白吧,不过对消息本身,他只能说这个集合,哎,我给你若干个消息给你构成啊,这个这个不超四兆的。但一条消息超死掉,它处理不了,那这个怎么处理,这种情况怎么处理,只能说明你那个消息分割的不好。
20:01
只能说明你生产的这个消息的时候啊,你这。就是这个东西,你在生产它的时候,构建这个消息的时候。你这个消息分割的不好,太大了。你得从根源上来改。明白吧,超过了它的范围了,所以这个代码稍微有些复杂,但你要理解它是干嘛的,这是干嘛的。是吧,啊理解这个啊。OK,这是我们说的哪个东西啊。这个还是给大家拷贝一下。拷贝。生产者是吧?然后。
21:05
这是分割器。呃,这个运行啊,它不会有啥问题啊,咱们。因为我这里边是肯定不超四兆啊,所以它分割器其实也没起啥作用,就是发送了一下。就是那他因为这里边儿没有啥输出啊,我们这也没啥,没啥输出就结束了。呃,然后我们,呃。这这some topic是吧,Some topic里边。肯肯定我们我们看这买赛季。Some topic,我们来看这个时间。
22:00
嗯,他他肯定有最近的是吧,我们我们这个。嗯。还还是这个是吧,我们。让他。呃,17点是吧,就我们之前一直没有生产这个这个消息。是吧,这是没问题的是吧。是不是,呃,它存储的时间啊,是是当天这个时间是吧。当天这个时间啊。嗯,7月8号。十。18点。那不是就就我们当前这个时间啊,我这这会儿这个时间。就是时间就是就是18点啊呃,18:15是吧。
23:01
呃,18:15。我怎么觉得不对呀?我怎么觉得不对,我看看。100条消息。这样这这咱这这么麻烦干嘛呢,是吧,我再再换一个不就行了吗。他。PE。C,我们和C肯定是没有的B。是吧,这肯定是没有的啊。这么麻烦干嘛呢?啊。嗯,看这。Topic AB是不是D有了吧,没问题吧。哎,呃,这里边啊,别忘了,呃,我们之前讲这儿的时候,我们之前讲这个是延迟消息的时候,其实就我这想看到了啊,想起来了,讲这个延迟消息的时候,你看。
24:09
这里边儿记不记得,我们当时说,呃,这里边儿过程里边有一个。是吧?你看topic叉叉叉是不是现在有了这个嘛,我们就看到它了啊。你看它里边就有二,为什么,因为我们当时指定的。这会儿在说他呢。呃,就是。延迟效性。我们当时指定的。它的延迟。等级是三级。3G是不是对的,就是012。他他他对的是不是那个QID就是二。对吧,那这会儿就是想起来了啊,就是想起来了。呃,OKOK。
25:00
那就我们看到他了啊就行了。好。呃,这是我们说到的什么?生这个生产者,然我们再这个批量消息消费者消费。看一眼就行了,这个简单哈。这大码里边唯独变化的就这大家看。就这样。就这变化了,其他没有动啊,这这个是什么呀。这个就是,呃,这个指令啊。每次。所以。消费。呃,十条。消息。末。啊,默认嗯一啊,这个默认值是一。这个呢,指定。呃,每次什么,所以呃从。
26:04
Broker。拉取。40条消息。默认是多少的?不是32。OK吧?呃。其他的没有啥啊,其他没没啥变化。当然,呃,我们刚才给大家说到啊,就是这你你不是一次消费十条吗,这十条。他同一个线程在消费的时候啊,这十条它是相同的结果,你看这个都成功是吧,我们前面一直说的这个都是成功的啊。嗯。如果你消费出现异常失败。你就得返回谁。就是。啊,就是消费成功。的返回结果。
27:03
这个是。消费。异常使的。那这个,那你这里边肯定得做判断是吧?啊,发现消费异常了,然后你跑跑着车开去。肯定要这样做啊。OK,这就我们那个。批量消费啊,批量消费就就主要就在这儿。这个大家也拷贝一下吧。放哪呢?OK。那么这样的话,我们就把什么啊,批量消息是吧,啊批量消息啊啊,我们也就给大家说过。
我来说两句