前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ与Spring的框架整合之Spring Boot实战

RabbitMQ与Spring的框架整合之Spring Boot实战

作者头像
别先生
发布2019-12-02 22:12:17
4770
发布2019-12-02 22:12:17
举报
文章被收录于专栏:别先生别先生

1、RabbitMQ与Spring的框架整合之Spring Boot实战。

首先创建maven项目的RabbitMQ的消息生产者rabbitmq-springboot-provider项目,配置pom.xml配置文件,如下所示:

代码语言:javascript
复制
 1 <project xmlns="http://maven.apache.org/POM/4.0.0"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 4     http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6     <groupId>com.bie</groupId>
 7     <artifactId>rabbitmq-springboot-provider</artifactId>
 8     <version>0.0.1-SNAPSHOT</version>
 9 
10     <parent>
11         <groupId>org.springframework.boot</groupId>
12         <artifactId>spring-boot-starter-parent</artifactId>
13         <version>2.0.2.RELEASE</version>
14         <relativePath /> <!-- lookup parent from repository -->
15     </parent>
16 
17     <properties>
18         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
19         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
20         <java.version>1.8</java.version>
21     </properties>
22 
23     <dependencies>
24         <dependency>
25             <groupId>org.springframework.boot</groupId>
26             <artifactId>spring-boot-starter</artifactId>
27         </dependency>
28 
29         <dependency>
30             <groupId>org.springframework.boot</groupId>
31             <artifactId>spring-boot-starter-test</artifactId>
32             <scope>test</scope>
33         </dependency>
34         <dependency>
35             <groupId>org.springframework.boot</groupId>
36             <artifactId>spring-boot-starter-amqp</artifactId>
37         </dependency>
38     </dependencies>
39 
40     <build>
41         <plugins>
42             <plugin>
43                 <groupId>org.springframework.boot</groupId>
44                 <artifactId>spring-boot-maven-plugin</artifactId>
45             </plugin>
46         </plugins>
47     </build>
48 
49 </project>

修改rabbitmq-springboot-provider的配置文件application.yml,如下所示:

代码语言:javascript
复制
 1 spring:
 2   rabbitmq:
 3     addresses: 192.168.110.133:5672 # rabbitmq服务器的ip地址和端口号
 4     username: guest                 # rabbitmq服务器的账号
 5     password: guest                 # rabbitmq服务器的密码
 6     virtual-host: /                 # rabbitmq服务器的虚拟主机      
 7     connection-timeout:
 8       15000                         # rabbitmq服务器连接超时时间
 9     # publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求。RabbitTemplate.ConfirmCallback。  
10     publisher-confirms: true        # 消息确认模式
11     # publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,
12     # 则使用监听器对不可达的消息进行后续处理,保证消息的路由成功,RabbitTemplate.ReturnCallback
13     publisher-returns: true         # 消息返回模式
14     template: 
15       mandatory: true               # 配置mandatory=true保证监听有效。
16       

创建配置类,可以将bean添加到容器中,开启注解扫描。

代码语言:javascript
复制
 1 package com.bie.config;
 2 
 3 import org.springframework.context.annotation.ComponentScan;
 4 import org.springframework.context.annotation.Configuration;
 5 
 6 /**
 7  * 
 8  * @author biehl
 9  *
10  */
11 @Configuration // 配置类,可以将bean添加到容器中
12 @ComponentScan(basePackages = { "com.bie.*" }) // 扫描包注解
13 public class RabbitMQProducerConfig {
14 
15 }

创建实体类,用于测试消息的发送。

代码语言:javascript
复制
 1 package com.bie.po;
 2 
 3 import java.io.Serializable;
 4 
 5 /**
 6  * 
 7  * @author biehl
 8  *
 9  */
10 public class Order implements Serializable {
11 
12     /**
13      * 
14      */
15     private static final long serialVersionUID = 1L;
16 
17     private String id;
18     private String name;
19 
20     public String getId() {
21         return id;
22     }
23 
24     public void setId(String id) {
25         this.id = id;
26     }
27 
28     public String getName() {
29         return name;
30     }
31 
32     public void setName(String name) {
33         this.name = name;
34     }
35 
36     public Order(String id, String name) {
37         super();
38         this.id = id;
39         this.name = name;
40     }
41 
42     @Override
43     public String toString() {
44         return "Order [id=" + id + ", name=" + name + "]";
45     }
46 
47     public Order() {
48         super();
49     }
50 
51 }

创建RabbitMQ的生产者,用于消息的发送。

代码语言:javascript
复制
  1 package com.bie.producer;
  2 
  3 import java.text.SimpleDateFormat;
  4 import java.util.Date;
  5 import java.util.Map;
  6 import java.util.UUID;
  7 
  8 import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
 10 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
 11 import org.springframework.amqp.rabbit.support.CorrelationData;
 12 import org.springframework.beans.factory.annotation.Autowired;
 13 import org.springframework.messaging.Message;
 14 import org.springframework.messaging.MessageHeaders;
 15 import org.springframework.messaging.support.MessageBuilder;
 16 import org.springframework.stereotype.Component;
 17 
 18 import com.bie.po.Order;
 19 
 20 /**
 21  * 
 22  * @author biehl
 23  *
 24  */
 25 @Component
 26 public class RabbitMQProducerMessage {
 27 
 28     @Autowired
 29     private RabbitTemplate rabbitTemplate;
 30 
 31     // publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求。RabbitTemplate.ConfirmCallback。
 32     // publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续处理,保证消息的路由成功,RabbitTemplate.ReturnCallback
 33     // 注意:在发送消息的时候对template进行配置mandatory=true保证监听有效。
 34     // 生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等等。
 35 
 36     // 回调函数,confirm确认
 37     final ConfirmCallback confirmCallback = new ConfirmCallback() {
 38 
 39         @Override
 40         public void confirm(CorrelationData correlationData, boolean ack, String cause) {
 41             System.out.println("correlationData : " + correlationData);
 42             System.out.println("ack : " + ack);
 43             if (!ack) {
 44                 System.out.println("异常处理,将后续继续处理.......");
 45             }
 46             System.out.println();
 47         }
 48 
 49     };
 50 
 51     // 回调函数,return返回
 52     final ReturnCallback returnCallback = new ReturnCallback() {
 53 
 54         @Override
 55         public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
 56                 String exchange, String routingKey) {
 57             System.err.println("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: "
 58                     + replyCode + ", replyText: " + replyText);
 59         }
 60 
 61     };
 62 
 63     /**
 64      * 发送消息的方法
 65      * 
 66      * @param message
 67      * @param properties
 68      */
 69     public void send(Object message, Map<String, Object> properties) {
 70         // 设置消息头信息
 71         MessageHeaders messageHeaders = new MessageHeaders(properties);
 72         // 创建消息
 73         Message msg = MessageBuilder.createMessage(message, messageHeaders);
 74         // 消息确认和消息返回机制的回调
 75         rabbitTemplate.setConfirmCallback(confirmCallback);
 76         rabbitTemplate.setReturnCallback(returnCallback);
 77         // id + 时间戳的格式,保证全局唯一性
 78         CorrelationData correlationData = new CorrelationData();
 79         String id = UUID.randomUUID().toString();
 80         // 唯一性id,做ack可靠性投递的时候、补偿策略的时候,根据该id可以找到唯一条消息。
 81         correlationData.setId(id);
 82         String exchange = "exchange-1"; // 交换机名称。需要自己创建好该交换机,然后创建一个队列,使用路由键将该交换机和队列进行绑定即可。
 83         String routingkey = "springboot.helloRabbitmq"; // 路由键
 84         rabbitTemplate.convertAndSend(exchange, routingkey, msg, correlationData);
 85     }
 86 
 87     /**
 88      * 
 89      * @param order
 90      */
 91     public void sendOrder(Order order) {
 92         // 消息确认和消息返回机制的回调
 93         rabbitTemplate.setConfirmCallback(confirmCallback);
 94         rabbitTemplate.setReturnCallback(returnCallback);
 95         // id + 时间戳的格式,保证全局唯一性
 96         CorrelationData correlationData = new CorrelationData();
 97         String id = UUID.randomUUID().toString();
 98         correlationData.setId(id);
 99         rabbitTemplate.convertAndSend("exchange-1", "springboot.def", order, correlationData);
100     }
101 
102 }

创建主启动类,进行项目的启动,如下所示:

代码语言:javascript
复制
 1 package com.bie;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 /**
 7  * 
 8  * @author biehl
 9  *
10  */
11 @SpringBootApplication
12 public class SpringBootRabbitMQProviderApplication {
13 
14     public static void main(String[] args) {
15         SpringApplication.run(SpringBootRabbitMQProviderApplication.class, args);
16     }
17 
18 }

创建生产者的测试类,进行生产者消息的发送。

代码语言:javascript
复制
 1 package com.bie.springboot;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 import java.util.HashMap;
 6 import java.util.Map;
 7 
 8 import org.junit.Test;
 9 import org.junit.runner.RunWith;
10 import org.springframework.beans.factory.annotation.Autowired;
11 import org.springframework.boot.test.context.SpringBootTest;
12 import org.springframework.test.context.junit4.SpringRunner;
13 
14 import com.bie.po.Order;
15 import com.bie.producer.RabbitMQProducerMessage;
16 
17 @RunWith(SpringRunner.class)
18 @SpringBootTest
19 public class ApplicationTests {
20 
21     @Autowired
22     private RabbitMQProducerMessage rabbitMQProducerMessage;
23 
24     private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
25 
26     @Test
27     public void rabbitMQProducerMessage() throws Exception {
28         Map<String, Object> properties = new HashMap<>();
29         properties.put("idCard", "410725195545815685x");
30         properties.put("createdate", simpleDateFormat.format(new Date()));
31         for (int i = 0; i < 1000; i++) {
32             rabbitMQProducerMessage.send("Hello RabbitMQ For Spring Boot!" + i, properties);
33         }
34 
35         // 线程休眠,消息ack确认
36         // Thread.sleep(500000);
37     }
38 
39     @Test
40     public void rabbitMQProducerOrder() throws Exception {
41         Order order = new Order("001", "第一个订单");
42         rabbitMQProducerMessage.sendOrder(order);
43     }
44 
45 }

生产者发送消息,可以在RabbitMQ的管控台进行观察效果的。上面这种方式,需要手动创建交换机,队列,以及使用路由键将交换机和队列进行绑定。可以在管控台进行交换机、队列、以及使用路由键将交换机和队列进行绑定的。

2、首先创建maven项目的RabbitMQ的消息消费者rabbitmq-springboot-consumer项目,配置pom.xml配置文件,如下所示:

代码语言:javascript
复制
 1 <project xmlns="http://maven.apache.org/POM/4.0.0"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 4     http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6     <groupId>com.bie</groupId>
 7     <artifactId>rabbitmq-springboot-consumer</artifactId>
 8     <version>0.0.1-SNAPSHOT</version>
 9 
10     <parent>
11         <groupId>org.springframework.boot</groupId>
12         <artifactId>spring-boot-starter-parent</artifactId>
13         <version>2.0.2.RELEASE</version>
14         <relativePath /> <!-- lookup parent from repository -->
15     </parent>
16 
17     <properties>
18         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
19         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
20         <java.version>1.8</java.version>
21     </properties>
22 
23     <dependencies>
24         <dependency>
25             <groupId>org.springframework.boot</groupId>
26             <artifactId>spring-boot-starter</artifactId>
27         </dependency>
28 
29         <dependency>
30             <groupId>org.springframework.boot</groupId>
31             <artifactId>spring-boot-starter-test</artifactId>
32             <scope>test</scope>
33         </dependency>
34         <dependency>
35             <groupId>org.springframework.boot</groupId>
36             <artifactId>spring-boot-starter-amqp</artifactId>
37         </dependency>
38     </dependencies>
39 
40     <build>
41         <plugins>
42             <plugin>
43                 <groupId>org.springframework.boot</groupId>
44                 <artifactId>spring-boot-maven-plugin</artifactId>
45             </plugin>
46         </plugins>
47     </build>
48 
49 </project>

由于生产者端和消费者端是分项目开发的,但是配置类RabbitMQProducerConfig和实体类Order都一样,主启动类修改一下名称即可,这里就省略了。

代码语言:javascript
复制
 1 package com.bie.consumer;
 2 
 3 import java.util.Map;
 4 
 5 import org.springframework.amqp.rabbit.annotation.Exchange;
 6 import org.springframework.amqp.rabbit.annotation.Queue;
 7 import org.springframework.amqp.rabbit.annotation.QueueBinding;
 8 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 9 import org.springframework.amqp.rabbit.annotation.RabbitListener;
10 import org.springframework.amqp.support.AmqpHeaders;
11 import org.springframework.messaging.Message;
12 import org.springframework.messaging.handler.annotation.Headers;
13 import org.springframework.messaging.handler.annotation.Payload;
14 import org.springframework.stereotype.Component;
15 
16 import com.bie.po.Order;
17 import com.rabbitmq.client.Channel;
18 
19 /**
20  * 
21  * @author biehl
22  * 
23  *         1、签收模式,首先配置手动确认模式,用于ack的手工处理,这样我们可以保证消息的可靠性送达,
24  *         或者在消费端消费失败的时候可以做到重回队列,根据业务记录日志等处理。
25  * 
26  *         2、可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况
27  * 
28  *         3、消费端最重要的就是注解@RabbitListener注解的使用。消息端监听注解。
29  * 
30  *         该注解是一个组合注解,里面可以注解配置@QueueBinding、@Queue、@Exchange。
31  * 
32  *         直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等等。
33  * 
34  */
35 @Component
36 public class RabbitMQConsumerMessage {
37     
38     @RabbitListener(bindings = @QueueBinding(
39 
40             value = @Queue(value = "${order.queue.name}", durable = "${order.queue.durable}"),
41 
42             exchange = @Exchange(value = "${order.exchange.name}", durable = "${order.exchange.durable}", type = "${order.exchange.type}", ignoreDeclarationExceptions = "${order.exchange.ignoreDeclarationExceptions}"),
43 
44             key = "${order.key}"
45 
46             )
47 
48     )
49     @RabbitHandler
50     public void onMessage(Message message, Channel channel) throws Exception {
51         System.out.println("消费者: " + message.getPayload());
52         Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
53         // 手工ack
54         channel.basicAck(deliveryTag, false);
55     }
56 
57     /**
58      * 
59      * order.queue.name=queue-2 order.queue.durable=true
60      * order.exchange.name=exchange-1 order.exchange.durable=true
61      * order.exchange.type=topic order.exchange.ignoreDeclarationExceptions=true
62      * order.key=springboot.*
63      * 
64      * @param order
65      * @param channel
66      * @param headers
67      * @throws Exception
68      */
69     @RabbitListener(bindings = @QueueBinding(
70 
71             value = @Queue(value = "${order.queue.name}", durable = "${order.queue.durable}"),
72 
73             exchange = @Exchange(value = "${order.exchange.name}", durable = "${order.exchange.durable}", type = "${order.exchange.type}", ignoreDeclarationExceptions = "${order.exchange.ignoreDeclarationExceptions}"),
74 
75             key = "${order.key}"
76 
77             )
78 
79     )
80     @RabbitHandler // @Payload指定实际消息体内容,可以定义到形参上。
81     public void onOrderMessage(@Payload Order order, Channel channel, @Headers Map<String, Object> headers)
82             throws Exception {
83         System.out.println("消费端order: " + order.getId());
84         Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
85         // 手工ACK
86         channel.basicAck(deliveryTag, false);
87     }
88 
89 }

直接启动消费者的启动类,然后在生产者测试类开始发送消息,消费端就可以监听到了消息。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-11-30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档