前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ笔记(一)-基于SpringBoot使用RabbitMQ以及原理详解

RabbitMQ笔记(一)-基于SpringBoot使用RabbitMQ以及原理详解

作者头像
yingzi_code
发布2019-08-31 12:30:38
6320
发布2019-08-31 12:30:38
举报

RabbitMQ 使用与详解

RabbitMQ参考中文文档

1. RabbitMQ原理详解
  • Producer(生产者),产生消息并向RabbitMq发送消息
  • Consumer(消费者),等待RabbitMq消息到来并处理消息
  • Queue(队列), 依存于RabbitMQ内部, 虽然消息通过RabbitMQ在你的应用中传递,但是它们只能存储在queue中
  • message acknowledgment,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除
  • message durability,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。
  • Prefetch Count,如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
  • Exchange(交换器),生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)
  • routing key,生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效,RabbitMQ为routing key设定的长度限制为255 bytes
  • Binding,RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了
  • binding key,在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key,binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
  • Exchange Type,常见的有fanout、direct、topic、headers这四种 fanout fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中

direct 把消息路由到那些binding key与routing key完全匹配的Queue中

topic

​ 把消息路由到那些binding key与routing key模糊匹配的Queue中

匹配规则:

  1. routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“aa.bb.cc
  2. binding key与routing key一样也是句点号“. ”分隔的字符串
  3. binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

header

代码语言:javascript
复制
headers类型的Exchange不依赖于routingkey与binding key的匹配规则来路由消息,而是根据发送的消息内容中的					headers属性进行匹配。
2. 运行RabbitMQ

使用docker运行,要使用管理页面用management的版本

代码语言:javascript
复制
docker run -d --name rabbitmq --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 rabbitmq:management

管理页面默认用户名和密码都是guest

3. 创建QUEUE

点击Queues,Add a new queue

填入queue名称保存即可

4. 创建Exchange

点击Exchanges,Add a new exchange

输入Echange名称,选择type

保存即可

5. 绑定queue和exchange

点击刚才创建的exchange,Bindings下面填入queue的名称和Routing Key即可

6. 创建springboot程序来收发消息

pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.mt.demo</groupId>
	<artifactId>spring-boot-rabbitmq-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>spring-boot-rabbitmq-demo</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.6.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.16.14</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

application.yml

代码语言:javascript
复制
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true #生产者可以判断消息是否发送到了broker
    publisher-returns: true #生产者可以判断消息是否发送到了queue
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
server:
  port: 10001

先在RabbitMQ管理页面上创建hello的队列,并且使用绑定到topic交换器上

创建一个消费者

代码语言:javascript
复制
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener {

    @RabbitHandler
    public void process(String hello) {
        log.info("Receiver: {}", hello);
    }
}

创建一个生产者

代码语言:javascript
复制
@GetMapping("/send")
public void send(@RequestParam String topic, @RequestParam String route, @RequestParam String msg) {
    log.info("send topic[{}], msg: {}", topic, msg);

    rabbitTemplate.convertAndSend(topic, route, msg);
}

如果再创建一个消费者绑定同样的队列,则可以看到两个消费者交替收到消息

代码语言:javascript
复制
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener2 {

    @RabbitHandler
    public void process(String hello) {
        log.info("Receiver2: {}", hello);
    }
}

如果再创建一个queue和前一个使用一样的bindingkey,则发送的消息会同是发送进两个queue

配置RabbitTemplate,加入消息确认机制回调

代码语言:javascript
复制
@Autowired
private ReturnCallBackListener returnCallBackListener;

@Autowired
private ConfirmCallbackListener confirmCallbackListener;

@Bean
public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setConfirmCallback(confirmCallbackListener);
    /**
    * 当mandatory标志位设置为true时
    * 如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息
    * 那么broker会调用basic.return方法将消息返还给生产者
    * 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
    */
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnCallback(returnCallBackListener);

    return rabbitTemplate;
}

ConfirmCallback: ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调

ReturnCallback:ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调

完整代码参考GITHUB

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年11月01日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. RabbitMQ原理详解
  • 2. 运行RabbitMQ
  • 3. 创建QUEUE
  • 4. 创建Exchange
  • 5. 绑定queue和exchange
  • 6. 创建springboot程序来收发消息
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档