00:00
前面我们使用了rabbit tempt测试了消息的发送,那接下来你再来测试一下接收消息,首先我们想要接收来自于这个队列里边的消息,那么就需要来监听我们这个队列里边的内容,只要有内容,我们就应该可以来收到消息。那想要监听队列这件事情,Spring呢也会做的非常简单,专门为我们抽取了一个注解叫rabbit listener,我们来看一下有一个叫rabbit listener,那这个注解呢,翻译过来它就叫rabbit的监听器,它的作用就是来监听我们指定的队列,比如它下边有一个属性叫QS,它可以指定我们想要监听的队列,只要这个队列里边有内容,我们就可以收到内容,而且呢,我们这个要监听的队列必须存在,所以以后我们想要监听我们某一个队列的消息,只需要一个注解就行了,好,我把这个注解复制过来,然后我们接下来第五步,我们想要来监听,监听我们这个消息。
01:00
监听消息,那监听消息我们来使用这个注解,使用我们这个注解,而且呢,这个注解大家一定注意使用这个注解,这个注解呢必须要开启enable rabbit,相当开启rabbit的功能才能使用,所以呢必须有我们这个,因为我们这个注解这一块呢,也提示的很清楚了,Rabbit listener我们必须呢是有我们这个enable rabbit相关的功能支持我们才可以,然后我们想要监听的话,我们这个队列呢,还必须存在,我们想要监听的也都必须存在,而且如果是我们以前测试,我们只是来测试收发消息,我们来发消息之类,不要监听消息,我们其实都可以不用标注这个enable rabbit注解,我们只想给消息队列里边创建一些组件等等,我们可以不标注解,但是一旦想要监听消息,我们一定得来写上这个注解,好,那我们就来测试一下来监听消息,我们现在就来监听来自于我们这个hello Java q这个队列里边的消息。
02:00
那么现在就来随便来进一个业务逻辑,比如我们某个业务,我们这个order item service这个业务他想要监听我们这个消息,那怎么监听呢?就来写一个public word来写一个空方法,这个方法呢,我们就叫receive来接收消息的,那我们想要接收消息,只要收到消息,我们在这就打印能接收到的消息,接收到消息,那消息的内容是什么,我们来打印一下内容,内容应该是什么呢?我们先放在这,那我们想要监听消息,就必须用一个注解叫rabbit listener,那我们这个注解标注在我们业务逻辑组件上,而且我们这个组件必须在容器中,我们这个注解才能起作用,在这个注解里边呢,有一个属性叫Q,这个Q呢,来点进去,我们发现呢,它是一个数组,相当于它可以来指定监听多个队列,所以我们在这来写一下QS,既然是一个数组,我们数组的写法就是大括号,第一个队列的名字,第二个队列的名字等等。
03:00
那么现在呢,只来监听一个队列,那就是hello Java q,我们把这个队列呢传进来,好,我们来说明一下,我们把这个QS,这个QS呢就是来声明,声明我们要监听的队列,声明需要监听的所有队列,只要这些队列里边有内容,我们都可以收到,那真正的内容是什么?我们可以在这方法上随便来写一个参数,就叫object,我们比如呢,就叫我们监听到的这个消息,们把这个消息呢,内容一打印,并且这个消息的类型是什么,我们也可以来打印一下,比如这个类型,看一下我们真正的这个object对应的类型是什么,我们的message,好,那们现在来做一个测试,我们来启动我们的这个订单服务,我们在订单的这一块啊,我们来直接来启动这个订单服务,走启动起来好,我们发现我们服务这一块呢,启动失败了,他说tomcat的这个9000端口,9000端口呢,他说可能已经被占用了。
04:00
但我们实际上这个订单服务一个都没起,应该没有被占用,来检查一下Windows的端口占用情况,CMD在Windows里边呢,我们可以来输入这个命令,我们叫net state,相当于网络的状况来杠a no,所有的端口情况,我们所有的端口,我们在查出的这个结果里边,这能查出所有的这个端口占用,我们想来找9000端口,我们就叫放的str来,我们这个杠就跟我们Linux里边管道服务一样,在已查询到的结果里边来找9000端口,走9000端口呢,我发现确实有人占用,那是这个进程19796,那这个进程是一个什么程序,来可以看一下,有一个叫task list,来列出我们当前系统里边的所有进程,来看一下task list这个进程里边,我们继续放的str来看我们刚才的那个叫19796这个进程号是哪个来点过来,诶发现呢,它是一个。
05:00
阿idea尔这个控制台,这相当于是我们这个阿idea尔占用了这个9000端口,那实际上我们这个阿idea尔还要启动,还要使用,那么现在呢,没办法,我们只能把我们这个订单服务,把这个端口号呢来改成九千一十,好,9010,我们就以它吧,好,我们现在重新来启动我们的订单服务,咱们这个订单服务里边呢,专门有一个人来监听我们hello Java q这个队列,只要这个队列里边有消息,我们就可以来收到这个消息内容。好,那现在来让我们这个订单服务启动,好,现在呢,订单服务已经启动成功了,把控制台清空,我专门给消息队列里边来发一个消息们以我们以前的这个单元测试类为准,好,现在还是把这个消息再来发一个,走,使用rabbit temp来发消息,只要消息发送成功,我就应该能监听到我们这个消息内容,好,我们发现呢,我们这一块控制台打印,我们这个消息发送完成一号,哈哈,我们来到我们的订单的这个控制台。
06:01
他也打印了,接收到了消息,这个内容呢非常详息,有body,我们整个消息体内容就是我们这个杰森字符串发的对象转成了杰森,然后还有我们的消息属性,属性里边比如有我们的消息头,这个消息的类型的ID,我们指定是什么类型,我们之前在获取到消息里边我们都看到,而消息的内容类型是一个杰森等等等等,咱将消息的体内容,而消息的头信息全部都拿到了,而且我们现在拿到的这个消息,我们是标注了object类型,那么这个object这个类型来看一下它的这个类型呢?应该是一个什么类型的,我们有没有打印它的这个类型,我们在这没有打印类型,我们可以来get class,我们拿到的这个object确实能获取到我们消息的完整信息,但是呢,它是一个什么类型,我们来重新来启动,好,我们来清空控制台,我们来重新来发送一个消息。
07:00
我们在这来重新来发一个消息走好,我们现在呢,还是消息发送成功,来看我们的控制台先呢,收到消息没问题,我们这个消息的详细信息有打印,而我们这个object的类型,我发现呢,它是这个AMQP核心包的这个message这个类型,所以呢,我们想要接收消息,我们拿到的这个东西,它其实呢是这个类型的,所以我们说在我们这个监听消息的这个方法里边,参数可以来写以下类型,参数可以写以下类型,第一个我们就来直接写成他的这个message,这个message是AMQP核心包里边的,但这个message里边的消息内容非常的详细,它里边呢有get body,这是消息体,消息体就是我们之前打印的这个杰森数据,我们来可以看一下走,这是我们body消息体的内容,就是我们的这个杰森数据,如果我们get body的话,我们可能拿到的就是这个。
08:00
内容,而且呢,我们在这个消息里边还有我们的get message properties,我消息的属性,这其实就是我们消息头的这些信息,哎,我们当时这个发消息的时候定义的一些消息头的一些属性信息,这一块呢,也都有,所以呢,这一块就是我们拿到的消息头属性信息。所以我们说AMQP,我们这个消息呢,有消息头和消息体,消息体我们拿到了,但是我们这个消息体我们当时发送的时候发的其实是一个对象,叫order return entity这个实体类,如果我们想要真正来使用这个对象还是挺麻烦的,那可能需要自己来转化,比如Ja.pass object,那将这个对象,那将这个字节类型的对象转成我们指定的类型,我们可以用我们以前的转换方法把这个数据拿来转成我们指定的类型等等,那这样呢很麻烦,其实在spring加我们要写这个方法的时候,我们第一个可以来写它的原声消息类型,Message,好,这个是原声的消息,原声消息类型,原声消息详细信息里边有消息头和消息体,它都有头加体,那么这个呢都有,那我们现在呢?其实最终只想要获取到消。
09:23
A题里边的这个对象,那当时发的是这个对象叫order return entity,所以我们想要在这获取消息的时候,那还可以给这个位置来写一下我们想发的消息是什么类型,我们就直接写什么类型,我们就可以来接到这个消息,所以我们直接写一个content,好,我们当时接收到这个消息,这个消息呢,我们详细的打印在这,这是我们的message,但是我们的内容我们可以再次这样来打印,我们就叫内容,那内容的真正数据我们可以打印一下我们的这个content,这个content呢,我们来加上了to string方法,所以相当于消息体的真正内容,我们当时发的是什么类型,我们也可以自己来编写这个类型。接下来我们第二个参数就是object来写一个应该是一个泛型类型的,我们来写一个T,我们任意类型的,任意类型的这个消息内容这一块呢,就是我们这个发送的消息的内容发送的。
10:23
咱们这个消息的类型,那当时发的什么类型就可以写什么类型,比如我们这个例子,好,我们就不用我们手动转化spring呢,就会给我们自动转化,我来测试一下这种行不行,我来重新来启动一下订单服务,启动成功以后呢,再来发送一个消息,还是我们这个内容类型的消息,好,我们来测试一下,我来先来清空控制台,我们现在来发送一下这个消息,走,我们还是发了一个order return的这个实体类消息,好,我们单元测试这一块提示我们的消息发送成功,来看我们控制台,我们接收到这个消息,Body,诶,我发现呢,接收到的这个消息,我们当时在打印的时候,这一块的详细信息,Message我们可以打印到,我们主要来看它的内容,我们一直来往下翻它的这块内容,诶直接是我们这个实体类型,我们可以直接打印过来,所以呢,我们当时发的是什么类型的消息,我们在这儿直接写这个消息类型也能自动转化。
11:23
加第三个它还可以写一个参数,这个参数呢叫这个channel,好我们来写一下叫channel通道,这个通道注意我们用的是这个amqp client这个客户端这个接口,所以我们用的是这个通道,那这个通道的意思是什么?我们都知道,我们收发消息呢,是我们这个客户端,客户端就是我们订单服务,订单服务只跟我们消息队列建立一条连接,那建立好连接以后,所有的数据传输都在通道里边,包括我们来看我们的控制台,订单服务我们现在是在启动过程中,好订单服务那在控制台里边看连接,诶我发现呢,现在连接只有一条链接,所以呢,我们一个客户端只会建立一条连接,所有的数据都在通道里边传输,所以我们这儿呢有一个通道,那接下来我们就可以获取到我们的这个通道,至于通道后来有什么用,我们后来再说,所以呢,这就是当前传输。
12:23
数据的通道,传输数据的咱们这个通道好,那我们在这个参输位置,我们就可以写这几种,当然这几种也是非常常用的,这是我们来接收消息,使用rabbit listener一个注解就行了,那接下来再来测试另外一个场景,我们以前说呢,我们消息队列最大的特点就是我们的这个队列QQ呢可以有很多人同时监听,相当于有很多的接收者,可以有很多好。可以很多。人都来监听,那都来监听,但是呢,最终我们消息队列里边的消息只能有一个人收到,而且收到我们消息队列里边就没消息了,就像我们之前我们一直来发送消息,而且我们控制台打印收到了这一块呢,就没消息了,所以可以很多人来监听,只要。
13:16
收到消息,收到消息,咱们这个队列就会删除消息,队列删除消息,而且只能有一个人收到这个消息,而且只能有一个人。只能有一个收到此消息,收到此消息,那现在呢,我们来演示这么一种场景,如果是这种场景应该是怎么办?假设我们这个订单服务启动了多个,因为在分布式下,一个服务肯定会启动多个,订单服务启动多个,如果我们启动多个,那就应该是这样子的,我订单服务一号机器,二号机器,三号机器,当然订单服务呢,都拥有这一段的代码,相当于我们这三个订单客户端同时都来监听我们消息队列MQ里边的内容,那接下来我们MQ里边有消息是这三个都收到,还是只能有一个人收到?
14:11
我们接下来做一个测试,好我们把这个消息呢放在这儿,我们来模拟我们这个订单服务启动多个,现在呢,这有一个是9010端口们来把它的配置复制一份,好我们就将现在变成我们的9011订单端口,好我们想要变端口,我们就使用杠杠,我们的启动参数叫server.port所有配置文件里边都可以在这来点一个9011,好,这是另外一个订单服务来启动起来,那下来启动9011这个订单服务。行,我们把这两个呢都来启动一个这个订单,还有我们这一个订单,好我们两个同时启动,只要订单服务我们现在同时启动起来,那接下来我们再来测试一下我们消息的接收,我们看是谁能收到消息,我们来等待他们两个都来启动成功,好两个服务呢都启动成功,我们来清空控制台,只要有人接到消息,我们相应的控制台就会有打印,那么现在来发消息,我们把这个消息呢多发几份,比如这有一个消息来写一个for循环来进行发送,好for我们从in特I从零开始到I小于十,我们就来发上十个消息,诶加价,然后呢,我们来发送这十个消息,那为了区别我们这十个消息,先把这一块呢提出来放在这儿,然后我们来这个消息,我们把这一块呢都拿过来,想要发送十个消息,那这十个消息,每一个消息的名字都是,哈哈,来加上当前的A数字啊。
15:46
那现在来看一下我们这个消息发送,我们来启动消息发送走,那现在会发十条消息,来看这十条消息都是哪些人收到。是都能收到呢,还只有一个人能收到这些消息,好我们来看控制台,我控制台呢,打印消息呢,都已经发送成功了,零到九号消息们来看我们收到的内容,第一个9010他收到了147号消息,9011他收到了0369号消息,但是虽然两个客户端都收到消息,但是同一个消息只能被一个人收到,确定没问题,但是我们现在这好像还有一个小问题,启动多个,但是呢,同一个消息,同一个消息只能有一个收到,只能有一个客户端收到,所以他们呢,相当于是一个竞争关系,谁手快谁收到的就多,但是我们虽然是竞争关系,我们发现这是第一个是147收到三个,第二个呢收到四个,其实收到了七个消息,好像漏了三个消息,这三个消息是丢失了吗?其实不是的,是我们单元测试这一块的问题。我们单元测试。
16:57
因为我们只要一测试,相当于STEM boot这些服务呢,又启动一遍,所以我们只要单元测试运行期间,相当于我们还启动了一个客户端,但是我们来看整个我们这一块单元测试的这个启动,他这里边有没有收到一些消息,诶我们发现258消息确实被我们单元测试给收到了,而其他人收到的这个147和0369是被其他人收到了,所以呢,消息是没有丢的,只是我们单元测试的问题,这是我们来测试来接收消息,我们第一个场景,也就说我们只能一个消息,只能有一个客户端收到,第二个场景我们再来测试一下,因为我们收到消息以后呢,我们可能要进行一段的业务逻辑处理,不是我们非常简单的c out,控制台打印,假设我们来模拟我们这个业务处理很耗时,我们在耗时期间能不能接收其他消息,好,我来停上一个客户端,我们就留上这个90101个客户端来模拟。
17:57
这块打印我们来模拟时间很长,我们system点一个sleep,好,我们使用thread sleep thread sleep我们当线程呢来模拟要很长时间,Thread sleep,我们来给它睡上三秒,我们来写一个3000,这是一个毫秒值。好我们来看一下我们在业务处理期间是不是还能收到别的消息,我们来启动一下我们的这个订单,这个我们把这个消息的这一块接收,我们打印到前边,然后呢,我们在后边就打印消息处理完成,这个就叫消息处理完成,那哪一个消息,我们把这个消息内容打印过来,好我们就直接用content里边的get name,我们这个name呢标志不一样,接收到消息我们也不打印这么多了,就直接打印它的content,来看一下我们的最终效果,来启动订单服务,我们呢,还是来发上十个消息,来看这十个消息,第一个消息在处理期间有没有再来接收,其他。
18:57
发消息,好,我们来到我们的单人测试,等待他控制台来重新启动,来到我们的单人测试,这是我们单人测试这个控制台,我们来等待它启动成功,好,我们来清空控制台,那现在再来给他发上我们十个消息,这十个消息呢,先来看它们是怎么样进行结束处理的。
19:16
走好,我们这个消息呢,都发送完成,来看一下我们这一块,诶我们发现呢,只有一个消息处理完,第二个消息呢,就会到达,一个消息处理完第二个消息就会到达,所以呢,我们这个接消息,那就是只有当我们当前的所有这个接消息,我们的这个方法处理完了,释放了,我们才可以接下一个消息,所以我们整个控制台的打印是非常有序的,一个消息处理完成,那下一个消息再接收,再处理完成再接收,不是说一次批量接收到很多消息。所以接下来我们在这一块第二个场景就是呢,只有只有咱们这个一个消息,一个消息完全处理完,完全处理完,我们这个处理完,相当于我们这个方法运行完,我们才可以来接下一个消息,方法运行结束,我们就可以来接下一个消息,那就可以接收到下一个消息。
20:15
下一个消息,好,这就是我们使用rabbit listener来接收消息,而且呢在spring加还有一个注解叫handler来可以给大家看一下,还有一个注解呢,叫rabbit handler,这个呢大家也来了解一下rabbit handler,那么翻译过来就叫rabbit处理器来看,它呢可以标在我们这个方法位置,但是rabbit listener我们来看它能标在什么位置,它可以标在我们类位置,所以呢,我们发现这两个关系应该是这样子的,我们以后呢,Rabbit handle用的也非常多,来说一下,我们来接消息呢,两个注解我们直接到主类里边说rabbit listener可以标在我们的类加我们这个方法上,我们都可以标,而我们还有一个注解叫rabbit handler,它呢,只能标在方法上,它们的区别是什么?每一个例子呢,大家就会非常清楚的rabbit handler。它可以标在。
21:16
标在方法上。来给大家举这么一个例子,既然能标在类上,所以呢我们就可以在order service这个类上,我们将这一块呢来标注过来,我们说这个类呢,它可以来监听我们hello Java这个消息队列的内容,好,我们把之前的这个位置我们挪到这儿了,我们把这个注掉,然后呢,接下来接消息到底是哪个方法接消息我们可以给这个方法上标一个rabbit handle,到了那些同学说这么标不麻烦吗?直接给上它上面标一个rabbit listener不就可以了吗?但是接下来的场景就在这,如果说我们可以接收到多种类型的消息,比如我们可以发一个order return reason的这个实体类,我们可能在发消息的时候,比如我们来判断if,我们I对二,我们这个求余求与等等,零偶数的话呢,我们给你发的是这个消息,好,我们把这个消息拿过来,如果奇数呢,我们发的是另外一个类型的消息,相当于我们相当于有两种类型的。
22:21
的消息,可能呢,会在这个队列里边来存在,好else,或者呢,我们监听了两个队列,这两个队列里边他们的消息类型不一样,这都好,我接下来写,让两种消息比,比如第二种消息就是你有一个order创建了一个订单,好,这个订单呢是我们这一款order en来把它也发出去,把这个订单呢拿过来们给它里边保存上一些东西就行了,来set一个叫order SN来写一个UU id.random u ID点一个to string来模拟一个订单号,好来把它也发出去,我们还是来调用这个方法,CTRLC,但是呢,我们发的是不同对象,那接下来我们想要接不同对象,按照以前我们挺麻烦的,我们来到这个service里边,如果我们在这儿来写这个对象类型,它又不能匹配,所以很麻烦,我们只能写成message,写成message以后拿到内容,然后进行判断,再来自己转换,现在呢就没有这么麻烦了。把这个方法。
23:21
我再来复制一个,好,我们就叫receive message2,然后呢,我们还给它标上一个叫rabbit handler,它呢也是一个接消息的方法,但是呢,我们在这个方法上相当于我们使用重载的方式,好我们比如拿过来,那它上一个方法,它说它只接的是order return reason这个类型的消息,而下一个我们说我们接的是order entity这个类型的消息,所以我们接下来把这个消息拿过来,大家看一下,接收到消息给大家看一下就行,那我们把这一块的休眠,我们也给它停掉来测试,所以相当于我们使用rabbit handleler标注了各种不同的接消息方法,我们在这儿呢,可以监听多个队列,每一个队列可能消息不一样,或者一个队列里边他们的消息也不一样,我们接下来就使用rabbit handleler来进行重载处理,我们说这个呢是来接受这种类型消息,这个呢接受这种类型消息来测试一下,好来同时来启动我们这个订单服务。
24:21
那服务启起来以后呢,接下来再来发上十个消息,来看一下每一个是不是都能打印过来,好,我们还是来到我们的单元测试来准备发消息,那先来看我们这个控制台,等待它启动成功,清空掉我们这个控制台,我们现在来做一个测试来发消息。我们调用一下发消息的这个测试方法,来发十条消息,它们的类型各自还不一样,好,我们这一块单元测试打印消息呢,发送完成,当然这十条消息有我们这个类型的,也有我们这个类型的,来我们控制台看一下,好,那这个order,这个控制台我们现在能看到,我们现在接收到了order reason这个试题,Entity没问题,那剩下这个类型的呢,来看一下,我们把这个代码得重新启动一下,走,你们重新改了代码,我们这个。
25:09
控制台,我们来重新启动看一下,或者呢,我们剩下的这个东西是不是被我们这个控制台收到了,我们来看一下,来往下翻,那单元测试的这一块控制台,哎,我们发现单元测试的这个控制台都收到了order entity,我们之前的这个控制台收到了order reason entity,相当于我们这两个方法呢,都起作用了,那为了明显期间我就不调用单元测试了,我们来专门来写一个controller来测试,我们发消息,那么就叫rabbit controller。好,我们这个controller它是一个rest controller,我们来写一个controller来进行一个完整测试,Public string就叫send MQ来发送MQ的这个消息,我们要发几个消息,我们在这来写一个数量in的number,你给我指定数量,我把消发消息的方法,想要发消息只需要注入rabbit tempilet就行了。然我们把rabbit tempilet拿过来at一个owa。
26:08
然后我们把发消息的方法我们复制过来,我们在这儿来进行调用control c好,们来到我们的这个controller里边。走,那么这个消息只要发送完成,那么这一块呢,它叫rabbit tempt消息呢,只要发送完成,我们就给这来return一个OK,那么这个OK呢。直接返回出去,它是一个rest ctrler,好,我们来发一个get map get map,我们就叫send MQ,你给我发送这个请求,传入我们的这个参数,我就给你来发消息,到底发几个消息,哎呢,就小于我们指定的这个number,好给我传一个参数,Request和参数呢,我们需要一个叫number,如果没有传,那默认值,比如我们就给一个十,好现在我们来启动一下我们这个服务,我们不要单元测试来走了,单元测试它还会收一些消息,我们不方便看。
27:07
那现在呢,就以请求的方式,我们就直接来访问我们这个订单这一块的send MQ请求,我们来顶,等待订单服务启动完成,我们来清空控制台,来发送这个请求,来测试一下好9010端口,那接下来要调一个我们的这个请求send MQ。他呢帮我们来模拟发十个消息走好,我们消息发送成功了,然后我们来看控制台接收到的,我发现呢,接收到的有enity,也有reason entity,所以呢我们直接可以使用这个rabbit handler来区分各种不同的消息,所以我们最终的使用场景就是rabbit listener加上rabbit handle了,标在方法上可以来重载不同的消息,重载区分不同的这个消息,而我们它标在类上来说明我们要监听哪些队列,监听哪些队列即可,好。
28:03
这就是我们boot来整合rabbit MQ收发消息。
我来说两句