00:00
好,同学们,我们继续队列有了,队列里面也有消息了,那接下来如何消费呢?我们就通过消费组的相关指令来消息消化处理我们的message。开高第一个X group create。用于创建消费者组,我们现在呢要消费队列里面的message,那么他呢需要消费者,但这个消费者呢,他需要分组来创建。像一个同学必然属于某一个班级,那么这个班级就是我们的group消费组。好,那么同学们请看一下它的命令X。Grape。Create key group,诶,那么好,同学们,那么X group,那么下面你要干嘛?Create哪个my stream,那么你的分组名称group,假设X,那么现在我是按照Dollar。
01:01
OK,那么下面我又分了一个组,这是A,我就按照零减零,或者直接就写个零都可以,那么这两者的区别就是Dollar表示从尾,零表示从头,那么前面说过这个Dollar是不是比当前已有的最新的还要新下一条说白了就是对尾新来,OK,好,那么下面我们这儿至少可以获得我们创建了一个分组X,我们又创建了一个分组,哎,那么我们接下来我们按照零从头到尾去读,那么有分组了,那是不是要有消费者,那必然然1234567或者ABCDEG。某一个消费者必然属于某一个组,这个组里面的某一个具体的消费者去消费了我们stream里面的message。好,下面请看X RA group group,那么干嘛呢?请读一下。右箭头表示从第一条尚未被消费的消息开始读取,就是我们那个邮标往前走读一条往前走读一条往前走OK,那么假设我们消费组A里面的消费者consumer之从my stream里面消息队列当中要读取所有的消息,那么在这它是会有一些讲究和特殊的要求,同学们请看我案例演示哦,这是个重点,来同学们我们这啊。
02:18
现在我们现在是不是创建了一个A从头读啊,那么下面我们再创建一个B,好,同学们请看啊。没问题吧,那么下面我就要开始读了,很简单,那么X read group group分组来读,那么下面你是哪一个呢?Group a组里面的哪个消费者consumer。一号,那么大家请看啊,我这可没有写,看我没有写,默认就是让一号消费者A组里面的蓝框框里面的这个A组里面的一号消费者给我全部去读,明白吗?那么streams读流失信息哪一个my stream怎么个读取法游标?
03:10
分发往前一步步的过,那么现在一读,同学们请看读了几条,在这个K里面,你只要这么写,我是不是给你把当前我再买stream里面的六条消息一次性全部给你读出来,OK,好,那接下来我们继续。A组里面已经有一个group consumer11号消费者读完了全部六条信息,那么下面A组里面的二号消费者我如法炮制,还能不能读到呢?一会车。晕,是个空。这是为什么?哎,不对啊,A组里面二号消费者为什么读出来是空的,因为他这有个特殊的要求。请同学们露眼。
04:02
Stream当中的消息一旦被消费组里面的任意一个消费者读取了,假设你不加控制的话,默认它就全读,那么全读就说明本组,比如说就是我们这的A组已经有一个人全部便利了。尾读的消息,那么这个指针或者叫这个游标已经全部走到尾部了,读完了,那么本组中就不,本组中的其他消费者就不能再读了,听懂了吧?也就是说就不能再被该消费组内的其他消费者读取,因为这个游标已经从头走到尾了,你A组里面的第二个消费者来,对不起。后来的没有,也即同一个消费组里面的消费者不能消费同一条消息,刚才的X RA group命令再执行一次,此时多的消息就是空,能理解了吗?同学们,OK,那么假设啊,你即便是A组里面的同一个人,一号消费者还是空,因为你这个指针已经走完了,那么假设二号消费者还是空,但是。
05:05
如果是。不同消费组的其他消费者是可以消费到同一条消息的,我们是按组来读,兄弟们大家要搞清楚啊,你现在啊,A组里面的二号消费者,我读的是空,因为你A组碰过了,那不好意思,那我B组还没读啊,有没有,那么B组是不是也是给他全部撸了一遍,这么说能跟上,哎,那么同理,如果B组里面的二号消费者,那么大家觉得有戏吗?也没有,因为B组也把这个游标从头刷到尾了,OK,所以呢,同学们,这个就是我们的。第二种情况好,那么下面消费组的目的,大家猜一下是什么呢?哎,为什么你只设置一个组里面的一个消费者读呢?你别忘了我们这儿啊,兄弟们。是不是有个can,那么我们默认就是一组里面的第一个消费者就可以全部把它读完,那么如果这个can,我们能不能实现类似于负载均衡一样,大家匀着一点,掐头去尾,一个消费者读三条,我们是不是可以指定它读取的看头?
06:15
调速啊,哎,这样的话大家请看。我们的目的第一种,一组一个消费者全读,第二种,让组内的多个消费者共同分担读取信息。所以我们通常会让每一个消费者读取一部分消息,从而实现消息读取的负载。均衡分布,OK,你比如说同学们,我们这建了个C组,那么也是从头读到尾,那么C组里面的一号消费者读几条读一条,C组里面的二号消费者读几条一条,OK,好,我们知道全部读完,那么我们也可以来试一下啊,那么同学们请看啊,假设,哎,我们这个C组好像我见过了,对不对,那么。这个时候的话呢。我们这儿啊,看好,那不好意思啊,这个命令它的编写顺序啊,那么group,那么假设啊。
07:09
Group c,那么consumer,那么这个呢,就是一号消费者看读几条,假设我读两条嘛,好不好?那么stream is,那么这个是my stream依旧弄过来没有group my啊,我还没见过这个group c是吧?好的,X group,那么现在create哪个key my stream。Group c,那么现在我呢,就是让他从零开始来,这个时候没问题了吧?那么来,咱们请看,是不是我们C组里面的一号消费者给我读两条,好,C组里面的二号消费者再给我读两条,好,同学们再请看,那么C组里面的三号消费者再给我读两条,那么假设啊,C组里面的四号消费者,对不起,指针走完了,后来的没有了。
08:05
OK,所以呢,这样的话呢,就是实现了一个组里面的,你可以设定某一个消费者,你让他读几条,按照指针游标往前走着读,按下标来读,OK,好,那么同学们,这个就是我们的什么X RAID group,那么接下来我们就要回答一个重点问题,那么也就是消息的已读未签收和已读已签收ack机制,下面重点问题啊,同学们请看。我先把下面的呢给大家来遮挡一下啊,我们先看文字,再读图图。基于stream实现的消息队列,如何保证消费者在发生故障或宕机以后仍然可以读取到没有处理完的消息呢?注意,刚才我们只是读了我没有签收,听懂了吗?没有A,那么stream会自动使用内部的队列,就叫pending list,俗称悬而未决待定的那留存消费组里面每个消费者读取到的消息来进行保底,直到消费者使用XA以后,他才会把这个消息说已经读取且已经签收,他才会把它处理掉,否则都会给你留存备案,兜底消费者确认增加。这样呢,诶签签收机制呢,就可以保证消息的可靠性,一般在业务处理完成以后,就是说假设你已经确实读到,也用完了,那么需要执行XA命令,确认消息已被消费完成,消费者应该给队列stream一个回执,OK,好,那么同学们,上面是文字的说明,下面就是一图。
09:39
左边消息的生产者,右边客户端消息的消费者,消息来了,生产用XA把我们一条消息放到STEM消息队列里面,然后分组,每组里面有多个消费者来读X group,那么呢?Create,那么X RA group按照分组来读,读完以后大家请看前面我们从来没有用过X啊A对吧?也就是我们确实读了,那没什么好说,大家请看。
10:07
刚刚执行的命令,按照游表往前走,这个是不是C组里面的三号用户给我读了两条,读了这一条,K7和K8,但是我读完以后,我并没有告诉STEM你可以删掉,可以处理,我没有给你发A确认签收,所以我现在还缺少XA确认机制,你看所有的都是从左到右,唯独只有它是不是从。后面返回前面,OK,好,我们同学们pen点来查看一下啊,查询每个消费组内所有消费者已读取,比如说啊这种情况就是已读了,但是还没有确认,就是没有A的消息,那么大家请看假设expanding。My group a啊,那么这个时候就是我们的X pending my group,假设我是C。
11:00
消息OK,那么大家请看啥意思啊,总共有六条消息,开始啊结束时间,那么这个C组里面消费者一号,一号消费者读了两条,二号读了两条,三号读了两条,那么来这儿给大家写好了,A组里面啊,只不过我现在演示的是C都一样,这个就是时间戳,就是消息的范围,起始和结束,那比如说啊,咱们咱们就说再来一个AA组。怎么着,我是不是读了六条,就是A组里面只有一个用户吃独食,那把六条全给我干了,OK一回事,那么来这个呢,就是X pending,来看看某个组里面的消息,已读但是未确认的一个。清单列表是什么情况?请你罗列在这,好,那接下来查看。某个消费者读了哪些数据,那么大家请看,刚才呢,我们也给大家演示了,他这个意思就是。我们在这儿就是这个蓝色的这个啊,就是我们这呢,就可以非常清晰的看到C组里面一号读了哪一条哪一条哪一条,那么来吧,我们在这啊,也可以看出X extreme在罗列一下这个ID,这个ID就是它上面这些IDOK,好,那么接下来我们呢,要干的这个活就来了。
12:21
一旦消息啊,假设这条记录被卡尔处理了,卡二就可以使用XA来通知STEM,你可以删了啊,不用判定,不用悬而未决,我表示,我确认我确实读取,我确实收到了,OK,好,那么同学们。我们呢,搁到这儿。来吧,最后一个A命令。读取已确认,那么来expanding,那么假设my group c,现在我要看看啊,从小到大消费者二消费了多少?那么来吧,同学们,很简单啊,一个个来expanding,然后呢,MYSTEM,然后呢,Group c,然后。
13:07
十条。CONSUMER2号,那么二号记录他呢,就读了两条,分别那么C组里面的二号消费者就读了两条,分别是哪两条啊,这两条好,那么下面啊,我那就X a my stream下面告诉你。Group c组里面的哪一个,我把这个时间戳拷贝下来,C组里面的二号消费者我读过了,那么我把第一条告诉你,我签收了,OK,成高,那么此时我再来执行我们的判定,大家请看啥情况,那是不是就是第一次啊,没有签收之前就是已读未确认的有两条,那么ack确认签收以后是不是就已读?未签收的只有一条了,那么再执行一个,那么它是不是就是空了,OK,好,那么同学们,第二条留做家庭作业,自己来执行一下,很简单啊,那么在这儿下面实操讲过了,下面呢,就是给大家的一个最后的一个复习,红框框第一组没有A之前是两条代表读过但未确认,那么蓝框框的。
14:22
这个时候AC确认成功一条时间戳对应AA再来查,那么A科AC签收过一条以后,那么只剩下另外一条未签收的,好,那么同学们,这个就是我们什么?Stream相关的消费者对应的常用基本操作命令,那么最后一个命令我也就不敲了,很简单,X in four就是用于打印stream消费者分组的一些详细情况,那么X in four stream固定写死哪个t my stream,那么告诉你这个里面有多少条记录第一条到最后一条一些其他的基本信息,好,那么最后给大家呢做一下小总结和使用建议,那么大家思考一下,看完这个以后啊,你们觉得你们会不会用right stream流逝?
15:13
数据结构来替换你们实际生产项目当中的,买CQ呢?哦,抱歉,买MQ呢,也就是替换卡夫卡或者MQ呢,大家可以讨论一下,我暂停一下录屏,嗯,说什么的都有啊,好处嘛,就是说我只要有它了,假设我的消息中间键不是特别复杂。我不用引入一个新的技术,我还是在right范围以内就给大家解决,但是呢,也有部分同学呢,还是觉得呢,专业的事情交给专业的人来做,OK,那所以说呢,在这儿啊,那么这是杨哥仅代表本人愚见,不权威。我觉得呢,就是从我的工作经验,工程经验来说呢,STEM还是不能够100%的等价于卡夫卡和Q生产商,目前实际使用的案例呢,还比较少,慎用还是分开做好缓存,做好消息中间键的。
16:04
各司其职,各归其位,OK,好,那么同学们stream就给大家介绍到这儿。
我来说两句