前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot入门建站全系列(十八)整合RabbitMQ(AMQP类消息队列)

SpringBoot入门建站全系列(十八)整合RabbitMQ(AMQP类消息队列)

原创
作者头像
品茗IT
修改2019-08-22 10:10:06
4870
修改2019-08-22 10:10:06
举报
文章被收录于专栏:品茗IT品茗IT

SpringBoot入门建站全系列(十八)整合RabbitMQ(AMQP类消息队列)

一、概述
1.1 简介

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)

常见的消息中间件产品:

(1)ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。这里介绍的是ActiveMQ的使用。

(2)RabbitMQ

AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。

(3)ZeroMQ

史上最快的消息队列系统

(4)Kafka

Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。

Jms

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

AMQP

AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。意味着我们可以使用Java的AMQP provider,同时使用一个python的producer加一个rubby的consumer。从这一点看,AQMP可以用http来进行类比,不关心实现的语言,只要大家都按照相应的数据格式去发送报文请求,不同语言的client均可以和不同语言的server链接。

代码可以在SpringBoot组件化构建https://www.pomit.cn/java/spring/springboot.html中的RabbitMQ组件中查看,并下载。

本篇将通过介绍RabbitMQ的三种模式的配合使用来介绍下Springboot如何整合RabbitMQ。

1.2 RabbitMQ基础概念

说基础概念,这里就不多废话了,想必大家都已经百度过了,我这里只说下关于本篇的用到的一些概念。

几个概念:queue、exchange、Binding、routingKey:

queue:队列,和用什么模式无关。

exchange:交换机,就是个代理,决定你用哪种模式,每个模式需要定义一个exchange。

Binding:队列绑定,需要将queue与exchange绑定,可以交叉绑定,一个Binding,就代表一个路径,比如queue绑定到direct模式的exchange,这个queue就可以接收direct模式的消息;queue绑定到topic模式的exchange,这个queue就可以接收topic模式的消息。

routingKey:路由key,等于一条消息的归类。通过这个匹配到与exchange绑定的queue。因为绑定的时候是指定了routingKey的类型。

三种模式:direct、fanout、topic。不是说只有这三种,是这里只说这三种常见的。

direct模式: 一对一模式,发送路径是固定的,一个routingKey只会转发到绑定到指定exchange并绑定到exchange的queue。

fanout模式: 一对多,没有routingKey,消息发送的时候发送到exchange,因为没有routingKey,exchange会转发到绑定到该exchange的所有queue。

topic模式:一对多,可以绑定多个key,也可以绑定模式匹配的key。消息发送的时候发送到exchange,exchange会根据绑定的所有key和模式转发到相应的queue。所以这个模式叫做订阅。

二、配置

本文假设你已经引入spring-boot-starter-web。已经是个SpringBoot项目了,如果不会搭建,可以打开这篇文章看一看《SpringBoot入门建站全系列(一)项目建立》

2.1 Maven依赖

使用RabbitMQ可以使用spring-boot-starter-amqp,方便快捷,一般springboot对大多数开源项目都做了整合,提供了专用的stater。

代码语言:txt
复制
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 配置文件

在application.properties 中需要配置RabbitMQ的信息,也可以配置自定义的配置,如:

代码语言:txt
复制
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

rabbit.direct.queue=spring.queue.direct
rabbit.direct.exchange=spring.exchange.direct
rabbit.direct.key=spring.key.direct

rabbit.common.queue=spring.queue.common
rabbit.common.queue.pattern=spring.key.*

rabbit.common.queue2=spring.queue.common2
rabbit.common.queue2.pattern=*.key.topic

rabbit.fanout.exchange=spring.exchange.fanout

rabbit.topic.exchange=spring.exchange.topic

rabbit.topic.key=spring.key.topic

这里面,

  • spring.rabbitmq.*是springboot自动装配的配置,rabbitmq的相关信息。
  • rabbit.direct.*是我自定义的配置,是direct模式下需要的配置。
  • rabbit.fanout.*是我自定义的配置,是fanout模式下需要的配置。
  • rabbit.topic.*是我自定义的配置,是topic模式下需要的配置。
  • rabbit.common.*是我自定义的配置,是topic和fanout模式下都可以使用的配置。

所以,这里定义了3个queue,2个key(fanout模式下不能使用key),2个key模式匹配(用于匹配topic模式下的key),三个exchange。概念可以会看1.1小节。

三、RabbitMQ的初始化配置

RabbitMQ需要将所有的queue、exchange、binding声明为bean。同时可以定义一个MessageConverter,用于消息的转换。

RabbitConfiguration :

代码语言:txt
复制
package com.cff.springbootwork.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitConfiguration {
	// 队列 分类配置
	@Value("${rabbit.direct.queue}")
	private String directQueueName;

	@Value("${rabbit.common.queue}")
	private String commonQueueName;

	@Value("${rabbit.common.queue2}")
	private String commonQueueName2;

	// key 分类配置
	@Value("${rabbit.direct.key}")
	private String rabbitDirectKey;

	@Value("${rabbit.common.queue.pattern}")
	private String rabbitCommonKeyPattern;

	@Value("${rabbit.common.queue2.pattern}")
	private String rabbitCommonKeyPattern2;

	// exchange 分类配置
	@Value("${rabbit.direct.exchange}")
	private String rabbitDirectExchange;

	@Value("${rabbit.fanout.exchange}")
	private String rabbitFanoutExchange;

	@Value("${rabbit.topic.exchange}")
	private String rabbitTopicExchange;

	// 队列声明
	@Bean(name = "directQueue")
	public Queue directQueue() {
		return new Queue(directQueueName);
	}

	@Bean(name = "commonQueueName")
	public Queue commonQueueName() {
		return new Queue(commonQueueName);
	}

	@Bean(name = "commonQueueName2")
	public Queue commonQueueName2() {
		return new Queue(commonQueueName2);
	}

	// exchange声明
	@Bean(name = "directExchange")
	public DirectExchange directExchange() {
		return new DirectExchange(rabbitDirectExchange);
	}

	@Bean(name = "fanoutExchange")
	public FanoutExchange fanoutExchange() {
		return new FanoutExchange(rabbitFanoutExchange);
	}

	@Bean(name = "topicExchange")
	public TopicExchange topicExchange() {
		return new TopicExchange(rabbitTopicExchange);
	}

	// 数据绑定声明
	@Bean(name = "directBinding")
	public Binding directBinding() {
		return BindingBuilder.bind(directQueue()).to(directExchange()).with(rabbitDirectKey);
	}

	@Bean(name = "fanoutBinding")
	public Binding fanoutBinding() {
		return BindingBuilder.bind(commonQueueName()).to(fanoutExchange());
	}

	@Bean(name = "fanoutBinding2")
	public Binding fanoutBinding2() {
		return BindingBuilder.bind(commonQueueName2()).to(fanoutExchange());
	}

	@Bean(name = "topicBinding1")
	public Binding topicBinding1() {
		return BindingBuilder.bind(commonQueueName()).to(topicExchange()).with(rabbitCommonKeyPattern);
	}

	@Bean(name = "topicBinding2")
	public Binding topicBinding2() {
		return BindingBuilder.bind(commonQueueName2()).to(topicExchange()).with(rabbitCommonKeyPattern2);
	}

	@Bean(name = "topicBinding3")
	public Binding topicBinding3() {
		return BindingBuilder.bind(directQueue()).to(topicExchange()).with(rabbitDirectKey);
	}

	// 数据转换声明
	@Bean
	public MessageConverter myMessageConverter() {
		return new Jackson2JsonMessageConverter();
	}

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
			ConnectionFactory connectionFactory) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(myMessageConverter());
		return factory;
	}
}

这里面:

  • Queue是声明队列,上面声明了三个队列。
  • DirectExchange是Direct模式下的Exchange;
  • FanoutExchange是Fanout模式下的Exchange;
  • TopicExchange是Topic模式下的Exchange;
  • Binding,这里定义了6个绑定,一个绑定到DirectExchange;两个绑定到FanoutExchange;三个绑定到TopicExchange;
  • 声明了一个MessageConverter,用于将实体转换为json,或者将json转换为实体。过程是spring控制自动转换的。
  • 声明一个SimpleRabbitListenerContainerFactory,将MessageConverter传递给SimpleRabbitListenerContainerFactory。

四、配置发送方(生产者)

下面介绍下三种模式是怎么发送数据的。

4.1 Direct模式

PomitDirectMessageService :

代码语言:txt
复制
package com.cff.springbootwork.rabbitmq.service;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.cff.springbootwork.rabbitmq.dto.ParamReq;

@Service
public class PomitDirectMessageService {
	@Value("${rabbit.direct.key}")
	private String rabbitDirectKey;	
	@Value("${rabbit.direct.exchange}")
	private String rabbitDirectExchange;
	
	@Autowired
	AmqpTemplate amqpTemplate;
	
	public void sendMessage(ParamReq paramReq){
		amqpTemplate.convertAndSend(rabbitDirectExchange,rabbitDirectKey,paramReq);
	}
}

这里的amqpTemplate是Spring自动生成的bean。消息的发送指定了rabbitDirectExchange、指定了rabbitDirectKey。

4.2 Fanout模式

PomitFanoutMessageService :

代码语言:txt
复制
package com.cff.springbootwork.rabbitmq.service;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.cff.springbootwork.rabbitmq.dto.ParamReq;

@Service
public class PomitFanoutMessageService {
	@Value("${rabbit.fanout.exchange}")
	private String rabbitFanoutExchange;
	@Autowired
	AmqpTemplate amqpTemplate;

	public void sendMessage(ParamReq paramReq) {
		amqpTemplate.convertAndSend(rabbitFanoutExchange, null, paramReq);
	}
}

这里的amqpTemplate是Spring自动生成的bean。消息的发送指定了rabbitFanoutExchange,并没有routingKey.

4.3 Topic模式

PomitTopicMessageService :

代码语言:txt
复制
package com.cff.springbootwork.rabbitmq.service;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.cff.springbootwork.rabbitmq.dto.ParamReq;

@Service
public class PomitTopicMessageService {
	@Value("${rabbit.topic.exchange}")
	private String rabbitTopicExchange;
	
	@Value("${rabbit.direct.key}")
	private String rabbitDirectKey;
	
	@Value("${rabbit.topic.key}")
	private String rabbitTopicKey;
	
	@Autowired
	AmqpTemplate amqpTemplateTopic;
	
	public void sendMessage(ParamReq paramReq){
		amqpTemplateTopic.convertAndSend(rabbitTopicExchange, rabbitDirectKey, paramReq);
	}
	
	public void sendMessage2(ParamReq paramReq){
		amqpTemplateTopic.convertAndSend(rabbitTopicExchange, rabbitTopicKey, paramReq);
	}
}

这里的amqpTemplate是Spring自动生成的bean。消息的发送指定了rabbitTopicExchange、指定了routingKey,这里发送了两个key,是为了分别测试指定key和模糊匹配key的接收情况。

五、消息接收方(消费者)

我么可以使用@RabbitListener来定义接收方,每一个queue定义一个@RabbitListener即可。因为queue是和模式无关的。参数可以直接传递实体过来,因为Spring根据我们上面定义的MessageConverter会自动将数据转换为实体。

PomitMessageListener:

代码语言:txt
复制
package com.cff.springbootwork.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.cff.springbootwork.rabbitmq.dto.ParamReq;

@Component
public class PomitMessageListener{

	@RabbitListener(queues = "${rabbit.direct.queue}")
	public void handleMessage(ParamReq paramReq) {
		
		System.out.println(paramReq);
	}
	@RabbitListener(queues = "${rabbit.common.queue}")
	public void handleCommonMessage(ParamReq paramReq) {
		
		System.out.println("一、" + paramReq);
	}

	@RabbitListener(queues = "${rabbit.common.queue2}")
	public void handleCommonMessage2(ParamReq paramReq) {
		
		System.out.println("二、" + paramReq);
	}
}

六、过程中用到的实体及测试类

ParamReq :

代码语言:txt
复制
package com.cff.springbootwork.rabbitmq.dto;

public class ParamReq {
	private String messageType;

	private String content;

	public String getMessageType() {
		return messageType;
	}

	public void setMessageType(String messageType) {
		this.messageType = messageType;
	}

	public String getContent() {
		return content;
	}

	public void setContent(String content) {
		this.content = content;
	}

	@Override
	public String toString() {
		return "ParamReq [messageType=" + messageType + ", content=" + content + "]";
	}

}

RabbitWeb:

代码语言:txt
复制
package com.cff.springbootwork.rabbitmq.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.cff.springbootwork.rabbitmq.dto.ParamReq;
import com.cff.springbootwork.rabbitmq.service.PomitDirectMessageService;
import com.cff.springbootwork.rabbitmq.service.PomitFanoutMessageService;
import com.cff.springbootwork.rabbitmq.service.PomitTopicMessageService;

@RestController
@RequestMapping("/rabbit")
public class RabbitWeb {
	@Autowired
	PomitDirectMessageService pomitDirectMessageService;
	
	@Autowired
	PomitFanoutMessageService pomitFanoutMessageService;
	
	@Autowired
	PomitTopicMessageService pomitTopicMessageService;

	@RequestMapping(value = "/direct", method = { RequestMethod.GET })
	public String direct() {
		ParamReq paramReq = new ParamReq();
		paramReq.setContent("asdasdxbcvbcb");
		paramReq.setMessageType("direct");
		pomitDirectMessageService.sendMessage(paramReq);
		return "0000";
	}
	
	@RequestMapping(value = "/fanout", method = { RequestMethod.GET })
	public String fanout() {
		ParamReq paramReq = new ParamReq();
		paramReq.setContent("asdasdxbcvbcb");
		paramReq.setMessageType("fanout");
		pomitFanoutMessageService.sendMessage(paramReq);
		return "0000";
	}
	
	@RequestMapping(value = "/topic1", method = { RequestMethod.GET })
	public String topic1() {
		ParamReq paramReq = new ParamReq();
		paramReq.setContent("asdasdxbcvbcb");
		paramReq.setMessageType("topic1");
		pomitTopicMessageService.sendMessage(paramReq);
		return "0000";
	}
	
	@RequestMapping(value = "/topic2", method = { RequestMethod.GET })
	public String topic2() {
		ParamReq paramReq = new ParamReq();
		paramReq.setContent("asdasdxbcvbcb");
		paramReq.setMessageType("topic2");
		pomitTopicMessageService.sendMessage2(paramReq);
		return "0000";
	}
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SpringBoot入门建站全系列(十八)整合RabbitMQ(AMQP类消息队列)
    • 一、概述
      • 1.1 简介
        • 1.2 RabbitMQ基础概念
          • 二、配置
            • 2.1 Maven依赖
            • 2.2 配置文件
          • 三、RabbitMQ的初始化配置
            • 四、配置发送方(生产者)
              • 4.1 Direct模式
              • 4.2 Fanout模式
              • 4.3 Topic模式
            • 五、消息接收方(消费者)
              • 六、过程中用到的实体及测试类
              相关产品与服务
              消息队列 TDMQ
              消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档