前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring和RabbitMQ消息队列(AMQP)整合详解

Spring和RabbitMQ消息队列(AMQP)整合详解

作者头像
品茗IT
发布2019-09-12 10:35:14
1.7K0
发布2019-09-12 10:35:14
举报
文章被收录于专栏:品茗IT品茗IT

Spring和RabbitMQ消息队列(AMQP)整合详解

官方主页

Spring AMQP

一、概述

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)

常见的消息中间件产品:

(1)ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。这里介绍的是ActiveMQ的使用。

(2)RabbitMQ

AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。

(3)ZeroMQ

史上最快的消息队列系统

(4)Kafka

Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。

Jms

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

AMQP

AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。意味着我们可以使用Java的AMQP provider,同时使用一个python的producer加一个rubby的consumer。从这一点看,AQMP可以用http来进行类比,不关心实现的语言,只要大家都按照相应的数据格式去发送报文请求,不同语言的client均可以和不同语言的server链接。

上一篇《Spring和ActiveMq消息队列整合详解》介绍了ActiveMq的整合

本篇通过介绍下RabbitMQ的整合过程。

建议访问首发地址查看,自动生成目录树方便查看章节。

代码可以在Spring组件化构建https://www.pomit.cn/java/spring/spring.html中的RabbitMQ组件中查看,并下载。

**如果大家正在寻找一个java的学习环境,或者在开发中遇到困难,可以<a

href="https://jq.qq.com/?_wv=1027&k=52sgH1J"

target="_blank">

加入我们的java学习圈,点击即可加入

</a>

,共同学习,节约学习时间,减少很多在学习中遇到的难题。**

二、环境配置

2.1 RabbitMQ的安装部署

RabbitMQ可以在RabbitMQ官网 下载并安装。

这里只说windows的安装过程:

  • 1.下载rabbitmq-server-3.7.15.exe,下载完成后,双击安装,会提示先安装erlang,erlang的官方地址是:https://www.erlang.org/downloads
  • 2.安装完erlang,然后安装RabbitMQ,安装过程没有任何需要注意的,就是一路往下。
  • 3.RabbitMQ安装完成后。找到RabbitMQ的路径:

比如我的windows10会出现这个:

在这里插入图片描述

然后我点击RabbitMQ Service - start:

在这里插入图片描述

为了启动管理服务,查看RabbitMQ的界面,我们需要打开RabbitMQ Command Prompt命令行,然后切换到RabbitMQ安装目录下的sbin目录,输入命令rabbitmq-plugins.bat enable rabbitmq_management:

在这里插入图片描述

这里不建议自己启动命令提示符,或者使用powershell,因为会出现ERLANG_HOME not set correctly.错误。

浏览器输入http://127.0.0.1:15672/进行访问:

在这里插入图片描述

使用默认账号登录:guest/guest

在这里插入图片描述

到这里,安装工作搞一段落了。

2.2 Web项目建立

本文假设你已经引入Spring必备的一切了,已经是个Spring项目了,如果不会搭建,可以打开这篇文章看一看《Spring和Spring Mvc 5整合详解》

本项目将RabbitMQ的exchange三种模式的生产者和消费者都放在一个项目中,通过调用web接口发送消息,并监听每个队列的消息。

2.2.1 maven依赖
代码语言:javascript
复制
<?xml version="1.0"?>
<project
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>cn.pomit</groupId>
		<artifactId>SpringWork</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</parent>
	<artifactId>RabbitMQ</artifactId>
	<packaging>jar</packaging>
	<name>RabbitMQ</name>
	<url>http://maven.apache.org</url>
	<properties>
		<spring-rabbit.version>2.1.7.RELEASE</spring-rabbit.version>
		<jsonlib.version>2.4</jsonlib.version>
		<spring.security.version>5.1.0.RELEASE</spring.security.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-webmvc</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit</artifactId>
			<version>${spring-rabbit.version}</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
		</dependency>
	</dependencies>
</project>

父模块可以在https://www.pomit.cn/spring/SpringWork/pom.xml获取。

三、Direct模式

3.1 配置

spring-rabbitmq.xml:

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd">

	<bean id="annotationPropertyConfigurerAmqp"
		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="order" value="1" />
		<property name="ignoreUnresolvablePlaceholders" value="true" />
		<property name="locations">
			<list>
				<value>classpath:rabbit.properties</value>
			</list>
		</property>
	</bean>

	<bean id="jsonMessageConverter"
		class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

	<rabbit:connection-factory id="connectionFactory"
		host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.user}"
		password="${rabbit.password}" />

	<rabbit:admin connection-factory="connectionFactory" />

	<!-- queue 队列声明 -->
	<rabbit:queue durable="true" auto-delete="false"
		exclusive="false" name="${rabbit.direct.queue}" />
	<!-- exchange queue binging key 绑定 -->
	<rabbit:direct-exchange name="${rabbit.direct.exchange}"
		durable="true" auto-delete="false">
		<rabbit:bindings>
			<rabbit:binding queue="${rabbit.direct.queue}" key="${rabbit.direct.key}" />
		</rabbit:bindings>
	</rabbit:direct-exchange>
	<!--fanout spring template声明 -->
	<rabbit:template id="amqpDirectTemplate" exchange="${rabbit.direct.exchange}"
		routing-key="${rabbit.direct.key}" connection-factory="connectionFactory"
		message-converter="jsonMessageConverter" />

	<rabbit:annotation-driven />

	<bean id="rabbitListenerContainerFactory"
		class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="concurrentConsumers" value="3" />
		<property name="maxConcurrentConsumers" value="10" />
		<property name="messageConverter" ref="jsonMessageConverter" />

	</bean>
</beans>

这里面的配置可以分为连接、管理、队列、exchange、发送、容器、数据转换这几类:

  • 连接:connectionFactory定义了连接工厂。
  • 管理:rabbit:admin 是rabbit对所有队列、exchange的管理,这个过程是由Spring自动完成的。
  • 队列:rabbit:queue定义了一个队列,队列只是负责接收消息。
  • exchange:交换器,rabbit:direct-exchange定义了一个点对点的传输过程。这个exchange要和队列进行绑定,routing-key是路由标识,如果使用rabbit:fanout-exchange,则不能有routing-key。rabbit:topic-exchange模式下可以有routing-key,而且routing-key可以使用模糊匹配。
  • 发送:rabbit:template负责发送消息,和exchange和routing-key(fanout-exchange不必须)绑定。
  • 容器:jmsContainer是将消息队列和监听bean整合起来,这样就保证用的时候能找到对应的bean。
  • 数据转换:jsonMessageConverter的bean使用Jackson2JsonMessageConverter,可以将发送和接收消息自动转为实体。

<rabbit:annotation-driven /> 以开启注解,我们把监听者用注解来启动,用配置文件不够灵活。

rabbit.properties配置文件:

代码语言:javascript
复制
rabbit.host=127.0.0.1
rabbit.port=5672
rabbit.user=guest
rabbit.password=guest

rabbit.direct.queue=spring.queue.direct
rabbit.direct.exchange=spring.exchange.direct
rabbit.direct.key=spring.key.direct

3.2 发送方(生产者)

我们使用web服务进行调用发送消息。

RabbitWeb :

代码语言:javascript
复制
package cn.pomit.springwork.rabbitmq.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import cn.pomit.springwork.rabbitmq.dto.ParamReq;
import cn.pomit.springwork.rabbitmq.service.PomitDirectMessageService;

@RestController
@RequestMapping("/rabbit")
public class RabbitWeb {
	@Autowired
	PomitDirectMessageService pomitDirectMessageService;

	@RequestMapping(value = "/direct", method = { RequestMethod.GET })
	public String direct() {
		ParamReq paramReq = new ParamReq();
		paramReq.setContent("asdasdxbcvbcb");
		paramReq.setMessageType("direct");
		pomitDirectMessageService.sendMessage(paramReq);
		return "0000";
	}
}

PomitDirectMessageService :

代码语言:javascript
复制
package cn.pomit.springwork.rabbitmq.service;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import cn.pomit.springwork.rabbitmq.dto.ParamReq;

@Service
public class PomitDirectMessageService {
	@Autowired
	@Qualifier("amqpDirectTemplate")
	AmqpTemplate amqpTemplate;
	
	public void sendMessage(ParamReq paramReq){
		amqpTemplate.convertAndSend(paramReq);
	}
}

这里面的amqpTemplate使用配置文件中的amqpDirectTemplate。点对点进行发送。

3.3 监听者(消费者)

为了灵活方便监听数据,我们使用注解来完成这一处理,而且配置文件中配置了messageConverter之后,Spring能够自动将数据转为实体。

PomitMessageListener:

代码语言:javascript
复制
package cn.pomit.springwork.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import cn.pomit.springwork.rabbitmq.dto.ParamReq;

@Component
public class PomitMessageListener{

	@RabbitListener(queues = "${rabbit.direct.queue}")
	public void handleMessage(ParamReq paramReq) {
		
		System.out.println(paramReq);
	}

}

四、Fanout模式

在上面的Direct模式中,稍微加点东西,就可以变为fanout模式。

4.1 配置

在spring-rabbit.xml中加入下面配置:

代码语言:javascript
复制
<!--广播 queue 队列声明 -->
	<rabbit:queue durable="true" auto-delete="false"
		exclusive="false" name="${rabbit.common.queue}" />
	<rabbit:queue durable="true" auto-delete="false"
		exclusive="false" name="${rabbit.common.queue2}" />


	<!--fanout exchange queue binging key 绑定 -->
	<rabbit:fanout-exchange name="${rabbit.fanout.exchange}"
		durable="true" auto-delete="false">
		<rabbit:bindings>
			<rabbit:binding queue="${rabbit.common.queue}" />
			<rabbit:binding queue="${rabbit.common.queue2}" />
		</rabbit:bindings>
	</rabbit:fanout-exchange>
	<!--fanout spring template声明 -->
	<rabbit:template id="amqpFanoutTemplate" exchange="${rabbit.fanout.exchange}"
		connection-factory="connectionFactory" message-converter="jsonMessageConverter" />

在rabbit.pproperities中配置文件中加入:

代码语言:javascript
复制
rabbit.common.queue=spring.queue.common
rabbit.common.queue.pattern=spring.key.*
rabbit.fanout.exchange=spring.exchange.fanout

4.2 发送方(生产者)

我们使用web服务进行调用发送消息。RabbitWeb中加入发送fanout模式消息的接口:

代码语言:javascript
复制
@Autowired
PomitTopicMessageService pomitTopicMessageService;
代码语言:javascript
复制
@RequestMapping(value = "/fanout", method = { RequestMethod.GET })
public String fanout() {
    ParamReq paramReq = new ParamReq();
    paramReq.setContent("asdasdxbcvbcb");
    paramReq.setMessageType("fanout");
    pomitFanoutMessageService.sendMessage(paramReq);
    return "0000";
}

PomitFanoutMessageService:

代码语言:javascript
复制
package cn.pomit.springwork.rabbitmq.service;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import cn.pomit.springwork.rabbitmq.dto.ParamReq;

@Service
public class PomitFanoutMessageService {
	@Autowired
	@Qualifier("amqpFanoutTemplate")
	AmqpTemplate amqpTemplate;
	
	public void sendMessage(ParamReq paramReq){
		amqpTemplate.convertAndSend(paramReq);
	}
}

这里面的amqpTemplate使用配置文件中的amqpFanoutTemplate。广播进行发送。

4.3 监听者(消费者)

为了灵活方便监听数据,我们使用注解来完成这一处理,而且配置文件中配置了messageConverter之后,Spring能够自动将数据转为实体。

在PomitMessageListener中加入监听即可:

代码语言:javascript
复制
   @RabbitListener(queues = "${rabbit.common.queue}")
	public void handleCommonMessage(ParamReq paramReq) {
		
		System.out.println("一、" + paramReq);
	}

	@RabbitListener(queues = "${rabbit.common.queue2}")
	public void handleCommonMessage2(ParamReq paramReq) {
		
		System.out.println("二、" + paramReq);
	}

五、Topic模式

在上面的Fanout模式中,稍微加点东西,就可以变为Topic模式。

5.1 配置

在spring-rabbit.xml中加入下面topic的配置:

在这里插入图片描述

如图所示,可以个fanout模式和direct模式公用同一个队列,但是这里的topic绑定队列要求填写pattern。

pattern是routing-key的模糊匹配写法,你可以直接写成routing-key,这样发送的时候如果绑定了routing-key,就能直接接收到,也可以写成xxx.,.xxx,这样只要能模糊匹配到发送的routing-key,就能接收到,否则绑定的队列是接收不到的。

在rabbit.pproperities中配置文件中加入:

代码语言:javascript
复制
rabbit.common.queue2=spring.queue.common2
rabbit.common.queue2.pattern=*.key.topic
rabbit.topic.exchange=spring.exchange.topic
rabbit.topic.key=spring.key.topic

5.2 发送方(生产者)

我们使用web服务进行调用发送消息。RabbitWeb中加入发送fanout模式消息的接口:

代码语言:javascript
复制
@Autowired
PomitTopicMessageService pomitTopicMessageService;
代码语言:javascript
复制
@RequestMapping(value = "/topic1", method = { RequestMethod.GET })
public String topic1() {
	ParamReq paramReq = new ParamReq();
	paramReq.setContent("asdasdxbcvbcb");
	paramReq.setMessageType("topic1");
	pomitTopicMessageService.sendMessage(paramReq);
	return "0000";
}

@RequestMapping(value = "/topic2", method = { RequestMethod.GET })
public String topic2() {
	ParamReq paramReq = new ParamReq();
	paramReq.setContent("asdasdxbcvbcb");
	paramReq.setMessageType("topic2");
	pomitTopicMessageService.sendMessage2(paramReq);
	return "0000";
}

PomitTopicMessageService:

代码语言:javascript
复制
package cn.pomit.springwork.rabbitmq.service;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import cn.pomit.springwork.rabbitmq.dto.ParamReq;

@Service
public class PomitTopicMessageService {
	@Autowired
	@Qualifier("amqpTopicTemplate1")
	AmqpTemplate amqpTemplateTopic;
	
	@Autowired
	@Qualifier("amqpTopicTemplate2")
	AmqpTemplate amqpTemplateTopic2;
	
	public void sendMessage(ParamReq paramReq){
		amqpTemplateTopic.convertAndSend(paramReq);
	}
	
	public void sendMessage2(ParamReq paramReq){
		amqpTemplateTopic2.convertAndSend(paramReq);
	}
}

这里面的amqpTemplate使用配置文件中的两个amqpTemplateTopic、amqpTemplateTopic2。分别发送不同的routing-key进行测试。

5.3 监听者(消费者)

为了灵活方便监听数据,我们使用注解来完成这一处理,而且配置文件中配置了messageConverter之后,Spring能够自动将数据转为实体。

PomitMessageListener的监听可以不变,因为队列并没有新增,还是那三个。

六、过程中用到的实体

ParamReq:

代码语言:javascript
复制
package cn.pomit.springwork.rabbitmq.dto;

public class ParamReq {
	private String messageType;

	private String content;

	public String getMessageType() {
		return messageType;
	}

	public void setMessageType(String messageType) {
		this.messageType = messageType;
	}

	public String getContent() {
		return content;
	}

	public void setContent(String content) {
		this.content = content;
	}

	@Override
	public String toString() {
		return "ParamReq [messageType=" + messageType + ", content=" + content + "]";
	}

}

全部代码可以在Spring组件化构建https://www.pomit.cn/java/spring/spring.html中的RabbitMQ组件中查看,并下载。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-06-26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spring和RabbitMQ消息队列(AMQP)整合详解
  • 官方主页
  • 一、概述
  • 二、环境配置
    • 2.1 RabbitMQ的安装部署
      • 2.2 Web项目建立
        • 2.2.1 maven依赖
    • 三、Direct模式
      • 3.1 配置
        • 3.2 发送方(生产者)
          • 3.3 监听者(消费者)
          • 四、Fanout模式
            • 4.1 配置
              • 4.2 发送方(生产者)
                • 4.3 监听者(消费者)
                • 五、Topic模式
                  • 5.1 配置
                    • 5.2 发送方(生产者)
                      • 5.3 监听者(消费者)
                      • 六、过程中用到的实体
                      相关产品与服务
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档