前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议(中)

深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议(中)

原创
作者头像
风尘浪子
修改2019-08-13 14:36:57
1.3K0
修改2019-08-13 14:36:57
举报
文章被收录于专栏:JAVA 核心编程JAVA 核心编程

前言

上面章节已为大家介绍 RabbitMQ 在 Spring 框架下的结构及实现原理,这章里将(从Producer 端的事务、回调函数(ConfirmCallback / ReturnCallback)到 Consumer 端的 MessageListenerContainer 信息接收容器进行详细的分析。通过对 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等常用类型介绍,深入剖析在消息处理各个传输环节中的原理及注意事项。

目录

一、RabbitMQ 与 AMQP 的关系

二、RabbitMQ 的实现原理

三、RabbitMQ 应用实例

四、Producer 端的消息发送与监控

五、Consumer 端的消息接收与监控

六、死信队列

七、持久化操作

四、Producer 端的消息发送与监控

前面一节已经介绍了RabbitMQ的基本使用方法,这一节将从更深入的层面讲述 Producer 的应用。

试想一下这种的情形,如果因 RabbitTemplate 发送时 Exchange 名称绑定错误,或 Broken Server 因网络问题或服务负荷过大引发异常,Producer 发送的队列丢失,系统无法正常工作。此时,开发人员应该进行一系列应对措施进行监测,确保每个数据都能正常推送到 Broken Server 。有见及此,RabbitMQ 专门为大家提供了两种解决方案,一是使用传统的事务模式,二是使用回调函数,下面为大家作详介绍。

4.1 Producer 端的事务管理

在需要使用事务时,可以通过两种方法

第一可以调用 channel 类的方法以传统模式进行管理,事务开始时调用 channel.txSelect(),信息发送后进行确认 channel.txCommit(),一旦捕捉到异常进行回滚 channel.txRollback(),最后关闭事务。

代码语言:txt
复制
 1 @Controller
 2 @RequestMapping("/producer")
 3 public class ProducerController {
 4 @Autowired
 5 private RabbitTemplate template;
 6 
 7 @RequestMapping("/send")
 8 public void send1(HttpServletResponse response) 
 9 throws InterruptedException, IOException, TimeoutException{
10 Channel channel=template.getConnectionFactory().createConnection().createChannel(true);
11 .......
12 try{
13 channel.txSelect();
14 channel.basicPublish("ErrorExchange", BindingConfig.Routing_Key_First, new AMQP.BasicProperties(),"Nothing".getBytes());
15 channel.txCommit();
16 }catch(Exception e){
17 channel.txRollback();
18 }finally{
19 channel.close();
20 }
21 ......
22 ......
23 ......
24 }
25 }

第二还可以直接通过 RabbitTemplate 的配置方法 void setChannelTransacted(bool isTransacted) 直接开启事务

代码语言:txt
复制
 1 public class ProducerController {
 2 @Autowired
 3 private ConnectionConfig connection;
 4 
 5 @Autowired
 6 @Bean
 7 private RabbitTemplate template(){
 8 RabbitTemplate template=new RabbitTemplate(connection.getConnectionFactory());
 9 template.setChannelTransacted(true);
10 return template;
11 }
12 
13 @RequestMapping("/send")
14 @Transactional(rollbackFor=Exception.class)
15 public void send(HttpServletResponse response) throws InterruptedException, IOException,TimeoutException{
16 ..........
17 ..........
18 ..........
19 }
20 }

4.2 利用 ConfirmCallback 回调确认消息是否成功发送到 Exchange

使用事务模式消耗的系统资源比较大,系统往往会处理长期等待的状态,在并发量较高的时候也有可能造成死锁的隐患。有见及此,系统提供了轻量级的回调函数方式进行异步处理。

当需要确认消息是否成功发送到 Exchange 的时候,可以使用 ConfirmCallback 回调函数。使用该函数,系统推送消息后,该线程便会得到释放,等 Exchange 接收到消息后系统便会异步调用 ConfirmCallback 绑定的方法进行处理。ConfirmCallback 只包含一个方法 void confirm(CorrelationData correlationData, boolean ack, String cause),此方法会把每条数据发送到 Exchange 时候的 ack 状态(成功/失败),cause 成败原因,及对应的 correlationData(CorrelationData 只包含一个属性 id,是绑定发送对象的唯一标识符) 返还到 Producer,让Producer 进行相应处理。

注意:在绑定 ConfirmCallback 回调函数前,请先把 publisher-confirms 属性设置为 true

代码语言:txt
复制
 1 spring:
 2 application:
 3 name: rabbitmqproducer
 4 rabbitmq:
 5 host: 127.0.0.1 
 6 port: 5672
 7 username: admin
 8 password: 12345678
 9 virtual-host: /LeslieHost

例如:下面的例子,特意将 RabbitTemplate 发送时所绑定的 Exchange 名称填写为错误名称 “ ErrorExchange ”,造成发送失败,然后在回调函数中检查失败的原因。

Producer 端代码:

代码语言:txt
复制
 1 @Configuration
 2 public class ConnectionConfig {
 3 @Value("${spring.rabbitmq.host}")
 4 public String host;
 5 
 6 @Value("${spring.rabbitmq.port}")
 7 public int port;
 8 
 9 @Value("${spring.rabbitmq.username}")
10 public String username;
11 
12 @Value("${spring.rabbitmq.password}")
13 public String password;
14 
15 @Value("${spring.rabbitmq.virtual-host}")
16 public String virtualHost;
17 
18 @Bean
19 public ConnectionFactory getConnectionFactory(){
20 CachingConnectionFactory factory=new CachingConnectionFactory();
21 System.out.println(host);
22 factory.setHost(host);
23 factory.setPort(port);
24 factory.setUsername(username);
25 factory.setPassword(password);
26 factory.setVirtualHost(virtualHost);
27 factory.setPublisherConfirms(true);
28 factory.setPublisherReturns(true);
29 return factory;
30 }
31 }
32 
33 @Configuration
34 public class BindingConfig {
35 public final static String first="direct.first";
36 public final static String Exchange_NAME="directExchange";
37 public final static String RoutingKey1="directKey1";
38 
39 @Bean
40 public Queue queueFirst(){
41 return new Queue(first);
42 }
43 
44 @Bean
45 public DirectExchange directExchange(){
46 return new DirectExchange(Exchange_NAME);
47 }
48 
49 @Bean
50 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
51 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
52 } 
53 }
54 
55 @Component
56 public class MyConfirmCallback implements ConfirmCallback {
57 
58 @Override
59 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
60 // TODO 自动生成的方法存根
61 // TODO 自动生成的方法存根
62 if(ack){
63 System.out.println(correlationData.getId()+" ack is: true! \ncause:"+cause);
64 }else
65 System.out.println(correlationData.getId()+" ack is: false! \ncause:"+cause);
66 }
67 }
68 
69 @Controller
70 @RequestMapping("/producer")
71 public class ProducerController {
72 @Autowired
73 private RabbitTemplate template;
74 @Autowired
75 private MyConfirmCallback confirmCallback;
76 
77 @RequestMapping("/send")
78 public void send() {
79 template.setConfirmCallback(confirmCallback); 
80 for(int n=0;n<2;n++){ 
81 template.convertAndSend("ErrorExchange",
82 BindingConfig.RoutingKey1,"I'm the first queue! "
83 +String.valueOf(n),getCorrelationData());
84 }
85 }
86 
87 private CorrelationData getCorrelationData(){
88 return new CorrelationData(UUID.randomUUID().toString());
89 }
90 } 

Consumer端代码

代码语言:txt
复制
 1 @Configuration
 2 public class ConnectionConfig {
 3 @Value("${spring.rabbitmq.host}")
 4 public String host;
 5 
 6 @Value("${spring.rabbitmq.port}")
 7 public int port;
 8 
 9 @Value("${spring.rabbitmq.username}")
10 public String username;
11 
12 @Value("${spring.rabbitmq.password}")
13 public String password;
14 
15 @Value("${spring.rabbitmq.virtual-host}")
16 public String virtualHost;
17 
18 @Bean
19 public ConnectionFactory getConnectionFactory(){
20 CachingConnectionFactory factory=new CachingConnectionFactory();
21 factory.setHost(host);
22 factory.setPort(port);
23 factory.setUsername(username);
24 factory.setPassword(password);
25 factory.setVirtualHost(virtualHost);
26 return factory;
27 }
28 }
29 
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.first"),
34 key="directKey1"))
35 public class RabbitMqListener {
36 
37 @RabbitHandler
38 public void handler(String message){
39 System.out.println(message);
40 }
41 }
42 
43 @SpringBootApplication
44 public class App {
45 
46 public static void main(String[] args){
47 SpringApplication.run(App.class, args);
48 }
49 }

运行结果:

4.3 绑定 CorrelationData 与发送对象的关系

上面的例子当中,CorrelationData 只是用一个随机的 UUID 作为 CorrelationID,而在现实的应用场景中,由于 ConfirmCallback 只反回标识值 CorrelationData,而没有把队列里的对象值也一同返回。所以,在推送队列时可以先用 Key-Value 保存 CorrelationID 与所发送信息的关系,这样当 ConfirmCallback 回调时,就可根据 CorrelationID 找回对象,作进一步处理。

下面例子,我们把要发送的对象放在虚拟数据 DataSource 类中,用 DataRelation 记录 CorrelationID 与发送对象 OrderID 的关系,然后在回调函数 ConfirmCallback 中根据 CorrelationID 查找对应的 OrderEntity,如果发送成功,则删除绑定。如果发送失败,可以重新发送或根据情况再作处理。

Producer端代码:

代码语言:txt
复制
 1 @Configuration
 2 public class ConnectionConfig {
 3 @Value("${spring.rabbitmq.host}")
 4 public String host;
 5 
 6 @Value("${spring.rabbitmq.port}")
 7 public int port;
 8 
 9 @Value("${spring.rabbitmq.username}")
 10 public String username;
 11 
 12 @Value("${spring.rabbitmq.password}")
 13 public String password;
 14 
 15 @Value("${spring.rabbitmq.virtual-host}")
 16 public String virtualHost;
 17 
 18 @Bean
 19 public ConnectionFactory getConnectionFactory(){
 20 CachingConnectionFactory factory=new CachingConnectionFactory();
 21 System.out.println(host);
 22 factory.setHost(host);
 23 factory.setPort(port);
 24 factory.setUsername(username);
 25 factory.setPassword(password);
 26 factory.setVirtualHost(virtualHost);
 27 factory.setPublisherConfirms(true);
 28 factory.setPublisherReturns(true);
 29 return factory;
 30 }
 31 }
 32 
 33 @Configuration
 34 public class BindingConfig {
 35 public final static String first="direct.first";
 36 //Exchange 使用 direct 模式 
 37 public final static String Exchange_NAME="directExchange";
 38 public final static String RoutingKey1="directKey1";
 39 
 40 @Bean
 41 public Queue queueFirst(){
 42 return new Queue(first);
 43 }
 44 
 45 @Bean
 46 public DirectExchange directExchange(){
 47 return new DirectExchange(Exchange_NAME);
 48 }
 49 
 50 @Bean
 51 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 52 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 53 }
 54 }
 55 
 56 @Data
 57 public class OrderEntity implements Serializable{
 58 private String id;
 59 private String goods;
 60 private Double price;
 61 private Integer count;
 62 
 63 public OrderEntity(String id,String goods,Double price,Integer count){
 64 this.id=id;
 65 this.goods=goods;
 66 this.price=price;
 67 this.count=count;
 68 }
 69 
 70 public OrderEntity(){}
 71 
 72 public String getId() {
 73 return id;
 74 }
 75 public void setId(String id) {
 76 this.id = id;
 77 }
 78 
 79 public String getGoods() {
 80 return goods;
 81 }
 82 
 83 public void setGoodsId(String goods) {
 84 this.goods = goods;
 85 }
 86 
 87 public Integer getCount() {
 88 return count;
 89 }
 90 
 91 public void setCount(Integer count) {
 92 this.count = count;
 93 }
 94 
 95 public Double getPrice() {
 96 return price;
 97 }
 98 
 99 public void setPrice(Double price) {
100 this.price = price;
101 }
102 }
103 
104 @Component
105 public class DataSource {
106 //加入虚拟数据
107 private static List<OrderEntity> list=new ArrayList<OrderEntity>(
108 Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
109 new OrderEntity("002","Huwei P30 Plus",5400.00,1),
110 ..........));
111 
112 public DataSource(){
113 }
114 
115 public List<OrderEntity> getOrderList(){
116 return list;
117 }
118 
119 //根据Id获取对应order
120 public OrderEntity getOrder(String id){
121 for(OrderEntity order:list){
122 if(order.getId()==id)
123 return order;
124 }
125 return null;
126 }
127 }
128 
129 public class DataRelation {
130 public static Map map=new HashMap();
131 
132 //绑定关系
133 public static void add(String key,String value){
134 if(!map.containsKey(key))
135 map.put(key,value);
136 }
137 
138 //返回orderId
139 public static Object get(String key){
140 if(map.containsKey(key))
141 return map.get(key);
142 else
143 return null;
144 }
145 
146 //根据 orderId 删除绑定关系
147 public static void del(String key){
148 if(map.containsKey(key))
149 map.remove(key);
150 }
151 }
152 
153 @Component
154 public class MyConfirmCallback implements ConfirmCallback {
155 @Autowired
156 private DataSource datasource;
157 
158 @Override
159 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
160 String correlationId=correlationData.getId();
161 //根据 correclationId取回对应的orderId
162 String orderId=DataRelation.get(correlationId).toString();
163 //在datasource中找回对应的order
164 OrderEntity order=datasource.getOrder(orderId);
165 
166 if(ack){
167 System.out.println("--------------------ConfirmCallback-------------------\n" 
168 +" order's ack is true!\nId:"+order.getId()+" Goods:"+order.getGoods()
169 +" Count:"+order.getCount().toString()+" Price:"+order.getPrice());
170 DataRelation.del(correlationId); //操作完成删除对应绑定
171 }else {
172 System.out.println(order.getId()+" order's ack is: false! \ncause:"+cause);
173 //可在记录日志后把Order推送到队列进行重新发送
174 .......
175 }
176 }
177 }
178 
179 @Controller
180 @RequestMapping("/producer")
181 public class ProducerController {
182 @Autowired
183 private RabbitTemplate template;
184 @Autowired
185 private MyConfirmCallback confirmCallback;
186 @Autowired
187 private DataSource dataSource;
188 
189 @RequestMapping("/send")
190 public void send() throws InterruptedException, IOException{
191 //绑定 ConfirmCallback 回调函数
192 template.setConfirmCallback(confirmCallback);
193 
194 for(OrderEntity order:dataSource.getOrderList()){
195 CorrelationData correlationData=getCorrelationData();
196 //保存 CorrelationId 与 orderId关系
197 DataRelation.add(correlationData.getId(), order.getId());
198 //把 order 插入队列
199 template.convertAndSend("directExchange",BindingConfig.RoutingKey1,order,correlationData);
200 }
201 }
202 
203 private CorrelationData getCorrelationData(){
204 return new CorrelationData(UUID.randomUUID().toString());
205 }
206 }

Consumer 端代码

代码语言:txt
复制
 1 @Configuration
 2 public class ConnectionConfig {
 3 @Value("${spring.rabbitmq.host}")
 4 public String host;
 5 
 6 @Value("${spring.rabbitmq.port}")
 7 public int port;
 8 
 9 @Value("${spring.rabbitmq.username}")
10 public String username;
11 
12 @Value("${spring.rabbitmq.password}")
13 public String password;
14 
15 @Value("${spring.rabbitmq.virtual-host}")
16 public String virtualHost;
17 
18 @Bean
19 public ConnectionFactory getConnectionFactory(){
20 CachingConnectionFactory factory=new CachingConnectionFactory();
21 factory.setHost(host);
22 factory.setPort(port);
23 factory.setUsername(username);
24 factory.setPassword(password);
25 factory.setVirtualHost(virtualHost);
26 return factory;
27 }
28 }
29 
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.first"),
34 key="directKey1"))
35 public class RabbitMqListener {
36 
37 @RabbitHandler
38 public void handler(String message){
39 System.out.println(message);
40 }
41 }
42 
43 @SpringBootApplication
44 public class App {
45 
46 public static void main(String[] args){
47 SpringApplication.run(App.class, args);
48 }
49 }

运行结果

4.4 利用 ReturnCallback 处理队列 Queue 错误

使用 ConfirmCallback 函数只能判断消息是否成功发送到 Exchange,但并不能保证消息已经成功进行队列 Queue。所以,系统预备了另一个回调函数 ReturnCallback 来监听 Queue 队列处理的成败。如果队列错误绑定不存在的 queue,或者 Broken Server 瞬间出现问题末能找到对应的 queue,系统就会激发 Producer 端 ReturnCallback 的回调函数来进行错误处理。 ReturnCallback 回调接口只包含一个方法 void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey),它会把出错的 replyCode,replyText,exchange,routingKey等值都一起返还。与 ConfirmCallback 不同的是,returnedMessage 会把队列中的对象保存到 Message 的 Body 属性中并返还到回调函数。

注意:在绑定 ReturnCallback 回调函数前,请先把 publisher-returns 及 mandatory 属性设置为 true 。 mandatory 参数默认为 false,用于判断 broken server是否把错误的对象返还到 Producer。如末进行设置,系统将把错误的消息丢弃。

下面例子我们在调用 convertAndSend 方法时特意把 routingKey 设置为 ErrorKey,触发 ReturnCallback 回调,然后在 ReturenCallback 的回调方法显示 replyCode,replyText,exchange,routingKey 等值,并把队列中对象属性一并显示。

Producer 端代码

代码语言:txt
复制
 1 @Configuration
 2 public class ConnectionConfig {
 3 @Value("${spring.rabbitmq.host}")
 4 public String host;
 5 
 6 @Value("${spring.rabbitmq.port}")
 7 public int port;
 8 
 9 @Value("${spring.rabbitmq.username}")
 10 public String username;
 11 
 12 @Value("${spring.rabbitmq.password}")
 13 public String password;
 14 
 15 @Value("${spring.rabbitmq.virtual-host}")
 16 public String virtualHost;
 17 
 18 @Bean
 19 public ConnectionFactory getConnectionFactory(){
 20 CachingConnectionFactory factory=new CachingConnectionFactory();
 21 System.out.println(host);
 22 factory.setHost(host);
 23 factory.setPort(port);
 24 factory.setUsername(username);
 25 factory.setPassword(password);
 26 factory.setVirtualHost(virtualHost);
 27 factory.setPublisherConfirms(true);
 28 factory.setPublisherReturns(true);
 29 return factory;
 30 }
 31 }
 32 
 33 @Configuration
 34 public class BindingConfig {
 35 public final static String first="direct.first";
 36 public final static String Exchange_NAME="directExchange";
 37 public final static String RoutingKey1="directKey1";
 38 
 39 @Bean
 40 public Queue queueFirst(){
 41 return new Queue(first);
 42 }
 43 
 44 @Bean
 45 public DirectExchange directExchange(){
 46 return new DirectExchange(Exchange_NAME);
 47 }
 48 
 49 @Bean
 50 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 51 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 52 } 
 53 }
 54 
 55 @Data
 56 public class OrderEntity implements Serializable{
 57 private String id;
 58 private String goods;
 59 private Double price;
 60 private Integer count;
 61 
 62 public OrderEntity(String id,String goods,Double price,Integer count){
 63 this.id=id;
 64 this.goods=goods;
 65 this.price=price;
 66 this.count=count;
 67 }
 68 
 69 public OrderEntity(){}
 70 
 71 public String getId() {
 72 return id;
 73 }
 74 public void setId(String id) {
 75 this.id = id;
 76 }
 77 
 78 public String getGoods() {
 79 return goods;
 80 }
 81 
 82 public void setGoodsId(String goods) {
 83 this.goods = goods;
 84 }
 85 
 86 public Integer getCount() {
 87 return count;
 88 }
 89 
 90 public void setCount(Integer count) {
 91 this.count = count;
 92 }
 93 
 94 public Double getPrice() {
 95 return price;
 96 }
 97 
 98 public void setPrice(Double price) {
 99 this.price = price;
100 }
101 }
102 
103 @Component
104 public class DataSource {
105 //虚拟数据
106 private static List<OrderEntity> list=new ArrayList<OrderEntity>(
107 Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
108 new OrderEntity("002","Huwei P30 Plus",5400.00,1),
109 ......));
110 public DataSource(){
111 }
112 
113 public List<OrderEntity> getOrderList(){
114 return list;
115 }
116 
117 //根据Id获取对应order
118 public OrderEntity getOrder(String id){
119 for(OrderEntity order:list){
120 if(order.getId()==id)
121 return order;
122 }
123 return null;
124 }
125 }
126 
127 @Component
128 public class MyReturnCallback implements ReturnCallback {
129 
130 @Override
131 public void returnedMessage(Message message, int replyCode, 
132 String replyText, String exchange, String routingKey){
133 //把messageBody反序列化为 OrderEntity对象
134 OrderEntity order=convertToOrder(message.getBody());
135 //显示错误原因
136 System.out.println("-------------ReturnCallback!------------\n"
137 +" exchange:"+exchange+" replyCode:"+String.valueOf(replyCode)
138 +" replyText:"+replyText+" key:"+routingKey+"\n OrderId:"+order.getId()
139 +" Goods:"+order.getGoods()+" Count:"+order.getCount().toString()
140 +" Price:"+order.getPrice()+" ");
141 }
142 
143 //把byte[]反序列化为 OrderEntity对象
144 private OrderEntity convertToOrder(byte[] bytes){
145 OrderEntity order=null;
146 ByteArrayInputStream bis = new ByteArrayInputStream (bytes); 
147 ObjectInputStream ois;
148 try {
149 ois = new ObjectInputStream (bis);
150 Object obj = ois.readObject();
151 order=(OrderEntity)obj;
152 ois.close(); 
153 bis.close(); 
154 } catch (IOException | ClassNotFoundException e) {
155 // TODO 自动生成的 catch 块
156 e.printStackTrace();
157 } 
158 return order;
159 }
160 }
161 
162 @Controller
163 @RequestMapping("/producer")
164 public class ProducerController {
165 @Autowired
166 private RabbitTemplate template;
167 @Autowired
168 private MyReturnCallback returnCallback;
169 @Autowired
170 private DataSource dataSource;
171 
172 
173 @RequestMapping("/send")
174 public void send() throws InterruptedException, IOException{
175 //把 mandatory 属性设定为true
176 template.setMandatory(true);
177 //绑定 ReturnCallback 回调函数
178 template.setReturnCallback(returnCallback);
179 
180 for(OrderEntity order:dataSource.getOrderList()){
181 CorrelationData correlationData=getCorrelationData();
182 template.convertAndSend("directExchange","ErrorKey",order,correlationData);
183 }
184 }
185 
186 private CorrelationData getCorrelationData(){
187 return new CorrelationData(UUID.randomUUID().toString());
188 }
189 }

Consumer 代码

代码语言:txt
复制
 1 @Configuration
 2 public class ConnectionConfig {
 3 @Value("${spring.rabbitmq.host}")
 4 public String host;
 5 
 6 @Value("${spring.rabbitmq.port}")
 7 public int port;
 8 
 9 @Value("${spring.rabbitmq.username}")
10 public String username;
11 
12 @Value("${spring.rabbitmq.password}")
13 public String password;
14 
15 @Value("${spring.rabbitmq.virtual-host}")
16 public String virtualHost;
17 
18 @Bean
19 public ConnectionFactory getConnectionFactory(){
20 CachingConnectionFactory factory=new CachingConnectionFactory();
21 factory.setHost(host);
22 factory.setPort(port);
23 factory.setUsername(username);
24 factory.setPassword(password);
25 factory.setVirtualHost(virtualHost);
26 return factory;
27 }
28 }
29 
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.first"),
34 key="directKey1"))
35 public class RabbitMqListener {
36 
37 @RabbitHandler
38 public void handler(String message){
39 System.out.println(message);
40 }
41 }
42 
43 @SpringBootApplication
44 public class App {
45 
46 public static void main(String[] args){
47 SpringApplication.run(App.class, args);
48 }
49 }

运行结果:

五、Consumer 消息接收管控

在第四节主要介绍了 Producer 端的队列发送与监控,它只能管理 Producer 与 Broker Server 之间的通信,但并不能确认 Consumer 是否能成功接收到队列,在这节内容将介绍 Consumer 端的队列接收与监听。前面几节里,Consumer 端都是简单地直接使用 RabbitListener 对队列进行监听,其实 RabbitMQ 已经为用户准备了功能更强大的 MessageListenerContainer 容器用于管理 Message ,下面将为大家介绍。

5.1 AbstractMessageListenerContainer 介绍

AbstractMeessageListenerContainer 虚拟类是 RabbitMQ 封装好的一个容器,本身并没有对消息进行处理,而是把消息的处理方式交给了 MessageListener 。而它的主要功能是实现 MessageListener 的绑定,ApplicationContext 上下文的绑定,ErrorHandler 的错误处理方法的绑定、对消息消费的开始、结束等等默认参数进行配置,让开发人员可以在容器中对 Consumer 实现统一管理。SimpleMessageListenerContainer、DirectMessageLinstenerCoontainer 都是它的子类,分别应用于不同的场景,在下面会再作详细介绍。

方法

说明

void setAcknowledgeMode(AcknowledgeMode acknowledgeMode)

设定消息接收确认的模式(下文会有详细介绍)

AcknowledgeMode getAcknowledgeMode()

获取消息接收确认的模式(下文会有详细介绍)

void setPrefetchCount(int prefetchCount)

设置每个 consumer 每次可接收到消息的最大数量

void setQueues(Queue... queues)

设定监听Queue队列

void addQueues(Queue... queues)

加入监听Queue队列

void setMessageListener(Object messageListener)

绑定MessageListener,对信息进行处理

void setChannelAwareMessageListener(ChannelAwareMessageListener messageListener)

绑定ChannelAwareMessageListener,对信息进行处理,同时可获取当前使用的channel信息

Object getMessageListener()

获取MessageListener对象

void setMessageConverter(MessageConverter messageConverter)

绑定MessageConverter消息转换对象

void setApplicationContext(ApplicationContext applicationContext)

绑定ApplicationContext上下文

ConnectionFactory getConnectionFactory()

获取ConnectionFactory连接工厂

void setListenerId(String listenerId)

设定ListenerId

SimpleMessageListenerContainer 是最常用的 MessageListener 容器,它可以通过下面的方法设置默认消费者数量与最大的消费者数量。下面例子中尝试把 consurrentConsumers 设置为3,把maxConcurrentConsumers 设置为4,并同时监控 direct 模式交换器的 direct.first,direct.second 队列。

方法

说明

void setConcurrentConsumers(final int concurrentConsumers)

设定当前队列中消费者数量

void setMaxConcurrentConsumers(int maxConcurrentConsumers)

设定当前队列中最大消费者数量

通过截图可以看到,系统默认会为每个 queue 都创建 3 个 consumers,不同的 queue 中的 consumers 是共享相同的 3 个 channel 。

当 Producer 端发送消息时,consumers 的实际数量可根据 maxConcurrentConsumers 的配置限制进行扩展。

Producer 端代码

代码语言:txt
复制
 1 @Configuration
 2 public class BindingConfig {
 3 public final static String first="direct.first";
 4 public final static String second="direct.second";
 5 public final static String Exchange_NAME="directExchange";
 6 public final static String RoutingKey1="directKey1";
 7 public final static String RoutingKey2="directKey2";
 8 
 9 @Bean
10 public Queue queueFirst(){
11 return new Queue(first);
12 }
13 
14 @Bean
15 public Queue queueSecond(){
16 return new Queue(second);
17 }
18 
19 @Bean
20 public DirectExchange directExchange(){
21 return new DirectExchange(Exchange_NAME);
22 }
23 
24 //利用BindingBuilder绑定Direct与queueFirst
25 @Bean
26 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
27 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
28 }
29 
30 //利用BindingBuilder绑定Direct与queueSecond
31 @Bean
32 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 
33 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
34 } 
35 }
36 
37 @Configuration
38 public class ConnectionConfig {
39 @Value("${spring.rabbitmq.host}")
40 public String host;
41 
42 @Value("${spring.rabbitmq.port}")
43 public int port;
44 
45 @Value("${spring.rabbitmq.username}")
46 public String username;
47 
48 @Value("${spring.rabbitmq.password}")
49 public String password;
50 
51 @Value("${spring.rabbitmq.virtual-host}")
52 public String virtualHost;
53 
54 @Bean
55 public ConnectionFactory getConnectionFactory(){
56 CachingConnectionFactory factory=new CachingConnectionFactory();
57 factory.setHost(host);
58 factory.setPort(port);
59 factory.setUsername(username);
60 factory.setPassword(password);
61 factory.setVirtualHost(virtualHost);
62 return factory;
63 }
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69 @Autowired
70 private RabbitTemplate template;
71 
72 @RequestMapping("/send")
73 public void send(HttpServletResponse response) throws InterruptedException, IOException{
74 for(Integer n=0;n<100;n++){
75 CorrelationData correlationData=getCorrelationData();
76 template.convertAndSend("directExchange","directKey1", 
77 "queue1"+" "+n.toString(),correlationData);
78 template.convertAndSend("directExchange","directKey2"," queue2"+" "+n.toString(),correlationData); 
79 Thread.currentThread().sleep(30);
80 }
81 }
82 
83 private CorrelationData getCorrelationData(){
84 return new CorrelationData(UUID.randomUUID().toString());
85 }
86 }

Consumer 端代码:

代码语言:txt
复制
 1 @Configuration
 2 public class ConnectionConfig {
 3 @Value("${spring.rabbitmq.host}")
 4 public String host;
 5 
 6 @Value("${spring.rabbitmq.port}")
 7 public int port;
 8 
 9 @Value("${spring.rabbitmq.username}")
 10 public String username;
 11 
 12 @Value("${spring.rabbitmq.password}")
 13 public String password;
 14 
 15 @Value("${spring.rabbitmq.virtual-host}")
 16 public String virtualHost;
 17 
 18 @Bean
 19 public ConnectionFactory getConnectionFactory(){
 20 CachingConnectionFactory factory=new CachingConnectionFactory();
 21 factory.setHost(host);
 22 factory.setPort(port);
 23 factory.setUsername(username);
 24 factory.setPassword(password);
 25 factory.setVirtualHost(virtualHost);
 26 return factory;
 27 }
 28 }
 29 
 30 @Configuration
 31 public class BindingConfig {
 32 public final static String first="direct.first";
 33 public final static String second="direct.second";
 34 public final static String Exchange_NAME="directExchange";
 35 public final static String RoutingKey1="directKey1";
 36 public final static String RoutingKey2="directKey2";
 37 
 38 @Bean
 39 public Queue queueFirst(){
 40 return new Queue(first);
 41 }
 42 
 43 @Bean
 44 public Queue queueSecond(){
 45 return new Queue(second);
 46 }
 47 
 48 @Bean
 49 public DirectExchange directExchange(){
 50 return new DirectExchange(Exchange_NAME);
 51 }
 52 
 53 //利用BindingBuilder绑定Direct与queueFirst
 54 @Bean
 55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 57 }
 58 
 59 //利用BindingBuilder绑定Direct与queueSecond
 60 @Bean
 61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 
 62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
 63 } 
 64 }
 65 @Configuration
 66 public class SimpleMessListener {
 67 @Autowired
 68 private RabbitTemplate template;
 69 private int index=0;
 70 
 71 @Bean
 72 public SimpleMessageListenerContainer messageContainer(){
 73 SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
 74 container.setConnectionFactory(connectionConfig.getConnectionFactory());
 75 // 绑定Queue1/Queue2
 76 container.setQueueNames("direct.first"); 
 77 container.addQueueNames("direct.second");
 78 //设置默认 consumer 数为3
 79 container.setConcurrentConsumers(3);
 80 //设置最大 consumer 数为4
 81 container.setMaxConcurrentConsumers(4);
 82 //标记 consumerTag
 83 container.setConsumerTagStrategy(queue -> "consumer"+(++index));
 84 //绑定MessageListener显示接收信息
 85 container.setMessageListener(new MessageListener(){
 86 @Override
 87 public void onMessage(Message message) {
 88 // TODO 自动生成的方法存根
 89 Thread thread=Thread.currentThread();
 90 MessageProperties messProp=message.getMessageProperties();
 91 try {
 92 System.out.println(" ConsumerTag:"+messProp.getConsumerTag()
 93 +" ThreadId is:"+thread.getId()+" Queue:"+messProp.getConsumerQueue()
 94 +" "+new String(message.getBody(),"UTF-8"));
 95 } catch (UnsupportedEncodingException e) {
 96 // TODO 自动生成的 catch 块
 97 e.printStackTrace();
 98 }
 99 }
100 
101 });
102 return container;
103 }
104 }

运行结果

5.3 SimpleMessageListenerContainer 的运作原理

在 SimpleMessageListenerContainer 模式中,无论系统监听多少个 queue 队列,channel 都是共享的,类似上面的例子,4个 channel 会把接收到不同的队列请求并分发到对应的 consumer 进行处理。这样做的好处是系统可以通过 concurrentConsumers、maxConcurrentConsumers 灵活设定当前队列中消费者的数量,系统可以跟据实际需求灵活处理。但由于每个 channel 都是在固定线程中运行的,一个 channel 要游走于多个 consumer 当中,这无疑增加了系统在上下文切换中的开销。下面用系统提供的 ChannelAwareMessageListener 接口,以更直观的例子说明一下 SimpleMessageListenerContainer 当中 channel、queue、consumer 之间的关系。

Producer 端代码

代码语言:txt
复制
 1 @Configuration
 2 public class BindingConfig {
 3 public final static String first="direct.first";
 4 public final static String second="direct.second";
 5 public final static String Exchange_NAME="directExchange";
 6 public final static String RoutingKey1="directKey1";
 7 public final static String RoutingKey2="directKey2";
 8 
 9 @Bean
10 public Queue queueFirst(){
11 return new Queue(first);
12 }
13 
14 @Bean
15 public Queue queueSecond(){
16 return new Queue(second);
17 }
18 
19 @Bean
20 public DirectExchange directExchange(){
21 return new DirectExchange(Exchange_NAME);
22 }
23 
24 //利用BindingBuilder绑定Direct与queueFirst
25 @Bean
26 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
27 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
28 }
29 
30 //利用BindingBuilder绑定Direct与queueSecond
31 @Bean
32 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 
33 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
34 } 
35 }
36 
37 @Configuration
38 public class ConnectionConfig {
39 @Value("${spring.rabbitmq.host}")
40 public String host;
41 
42 @Value("${spring.rabbitmq.port}")
43 public int port;
44 
45 @Value("${spring.rabbitmq.username}")
46 public String username;
47 
48 @Value("${spring.rabbitmq.password}")
49 public String password;
50 
51 @Value("${spring.rabbitmq.virtual-host}")
52 public String virtualHost;
53 
54 @Bean
55 public ConnectionFactory getConnectionFactory(){
56 CachingConnectionFactory factory=new CachingConnectionFactory();
57 factory.setHost(host);
58 factory.setPort(port);
59 factory.setUsername(username);
60 factory.setPassword(password);
61 factory.setVirtualHost(virtualHost);
62 return factory;
63 }
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69 @Autowired
70 private RabbitTemplate template;
71 
72 @RequestMapping("/send")
73 public void send(HttpServletResponse response) throws InterruptedException, IOException{
74 for(Integer n=0;n<100;n++){
75 CorrelationData correlationData=getCorrelationData();
76 template.convertAndSend("directExchange","directKey1",
77 " queue1"+" "+n.toString(),correlationData);
78 template.convertAndSend("directExchange","directKey2",
79 "queue2"+" "+n.toString(),correlationData); 
80 Thread.currentThread().sleep(30);
81 }
82 }
83 
84 private CorrelationData getCorrelationData(){
85 return new CorrelationData(UUID.randomUUID().toString());
86 }
87 }

Consumer 端代码

代码语言:txt
复制
 1 @Configuration
 2 public class ConnectionConfig {
 3 @Value("${spring.rabbitmq.host}")
 4 public String host;
 5 
 6 @Value("${spring.rabbitmq.port}")
 7 public int port;
 8 
 9 @Value("${spring.rabbitmq.username}")
 10 public String username;
 11 
 12 @Value("${spring.rabbitmq.password}")
 13 public String password;
 14 
 15 @Value("${spring.rabbitmq.virtual-host}")
 16 public String virtualHost;
 17 
 18 @Bean
 19 public ConnectionFactory getConnectionFactory(){
 20 CachingConnectionFactory factory=new CachingConnectionFactory();
 21 factory.setHost(host);
 22 factory.setPort(port);
 23 factory.setUsername(username);
 24 factory.setPassword(password);
 25 factory.setVirtualHost(virtualHost);
 26 return factory;
 27 }
 28 }
 29 
 30 @Configuration
 31 public class BindingConfig {
 32 public final static String first="direct.first";
 33 public final static String second="direct.second";
 34 public final static String Exchange_NAME="directExchange";
 35 public final static String RoutingKey1="directKey1";
 36 public final static String RoutingKey2="directKey2";
 37 
 38 @Bean
 39 public Queue queueFirst(){
 40 return new Queue(first);
 41 }
 42 
 43 @Bean
 44 public Queue queueSecond(){
 45 return new Queue(second);
 46 }
 47 
 48 @Bean
 49 public DirectExchange directExchange(){
 50 return new DirectExchange(Exchange_NAME);
 51 }
 52 
 53 //利用BindingBuilder绑定Direct与queueFirst
 54 @Bean
 55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 57 }
 58 
 59 //利用BindingBuilder绑定Direct与queueSecond
 60 @Bean
 61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 
 62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
 63 } 
 64 }
 65 @Configuration
 66 public class SimpleMessListener {
 67 @Autowired
 68 private RabbitTemplate template;
 69 @Autowired
 70 private ConnectionConfig connectionConfig;
 71 private int index=0;
 72 
 73 @Bean
 74 public SimpleMessageListenerContainer messageContainer(){
 75 SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
 76 container.setConnectionFactory(connectionConfig.getConnectionFactory());
 77 // 绑定Queue1/Queue2
 78 container.setQueueNames("direct.first"); 
 79 container.addQueueNames("direct.second");
 80 //设置默认 consumer 数为3
 81 container.setConcurrentConsumers(3);
 82 //设置最大 consumer 数为4
 83 container.setMaxConcurrentConsumers(4);
 84 //标记 consumerTag
 85 container.setConsumerTagStrategy(queue -> "consumer"+(++index));
 86 //绑定ChannelAwareMessageListener显示接收信息
 87 container.setChannelAwareMessageListener(new ChannelAwareMessageListener(){
 88 @Override
 89 public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
 90 throws Exception {
 91 // TODO 自动生成的方法存根
 92 // TODO 自动生成的方法存根
 93 Thread thread=Thread.currentThread();
 94 System.out.println("Channel:"+channel.getChannelNumber() 
 95 +" ThreadId is:"+thread.getId()
 96 +" ConsumerTag:"+message.getMessageProperties().getConsumerTag()
 97 +" Queue:"+message.getMessageProperties().getConsumerQueue());
 98 
 99 }
100 
101 });
102 return container;
103 }
104 } 

运行结果:

观察运行结果可以看到:每个 channel 都在固定的线程中运行,一个 channel 会向不同的 consumer 发送队列信息。了解 channel、thread、queue、consumer 之间的关系,会对 SimpleMessageListenerContainer 有更深入认识。

5.4 DirectMessageListenerContainer

SimpleMessageListenerContainer 是经典的容器,使用 channel 共享,一旦某个 channel 关闭或重启,意味着每个队列 queue 中使用当前 channel 的 consumer 都会受到影响。 有见及此,在 RabbitMQ 2.0 后,系统引入了 DirectMessageListenerContainer ,它允许每个 consumer 都有各自的对应的 channel 的,channel 只管理负责管理当前 consumer 的通道。这样令 consumer 运用更灵活,同时线程并没有跟 channel 绑定,而是由独立的线程池进行管理,这是更好地解决了 SimpleMessageListenerContainer 中上下文切换所带来的资源消耗问题。

下面的例子,我们尝试使用把 consumersPerQueue 设置为 4,并同时监控 direct 模式 exchange 的 direct.first,direct.second 队列。

从管理界面可以看到,系统会为每个 consumer 都生成一个独立的 channel 进行管理。

Producer 端代码

代码语言:txt
复制
 1 @Configuration
 2 public class BindingConfig {
 3 public final static String first="direct.first";
 4 public final static String second="direct.second";
 5 public final static String Exchange_NAME="directExchange";
 6 public final static String RoutingKey1="directKey1";
 7 public final static String RoutingKey2="directKey2";
 8 
 9 @Bean
10 public Queue queueFirst(){
11 return new Queue(first);
12 }
13 
14 @Bean
15 public Queue queueSecond(){
16 return new Queue(second);
17 }
18 
19 @Bean
20 public DirectExchange directExchange(){
21 return new DirectExchange(Exchange_NAME);
22 }
23 
24 //利用BindingBuilder绑定Direct与queueFirst
25 @Bean
26 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
27 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
28 }
29 
30 //利用BindingBuilder绑定Direct与queueSecond
31 @Bean
32 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 
33 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
34 } 
35 }
36 
37 @Configuration
38 public class ConnectionConfig {
39 @Value("${spring.rabbitmq.host}")
40 public String host;
41 
42 @Value("${spring.rabbitmq.port}")
43 public int port;
44 
45 @Value("${spring.rabbitmq.username}")
46 public String username;
47 
48 @Value("${spring.rabbitmq.password}")
49 public String password;
50 
51 @Value("${spring.rabbitmq.virtual-host}")
52 public String virtualHost;
53 
54 @Bean
55 public ConnectionFactory getConnectionFactory(){
56 CachingConnectionFactory factory=new CachingConnectionFactory();
57 factory.setHost(host);
58 factory.setPort(port);
59 factory.setUsername(username);
60 factory.setPassword(password);
61 factory.setVirtualHost(virtualHost);
62 return factory;
63 }
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69 @Autowired
70 private RabbitTemplate template;
71 
72 @RequestMapping("/send")
73 public void send(HttpServletResponse response) throws InterruptedException, IOException{
74 for(Integer n=0;n<100;n++){
75 CorrelationData correlationData=getCorrelationData();
76 template.convertAndSend("directExchange","directKey1",
77 " queue1"+" "+n.toString(),correlationData);
78 template.convertAndSend("directExchange","directKey2",
79 "queue2"+" "+n.toString(),correlationData); 
80 Thread.currentThread().sleep(30);
81 }
82 }
83 
84 private CorrelationData getCorrelationData(){
85 return new CorrelationData(UUID.randomUUID().toString());
86 }
87 }

Consumer 端代码

代码语言:txt
复制
 1 @Configuration
 2 public class ConnectionConfig {
 3 @Value("${spring.rabbitmq.host}")
 4 public String host;
 5 
 6 @Value("${spring.rabbitmq.port}")
 7 public int port;
 8 
 9 @Value("${spring.rabbitmq.username}")
10 public String username;
11 
12 @Value("${spring.rabbitmq.password}")
13 public String password;
14 
15 @Value("${spring.rabbitmq.virtual-host}")
16 public String virtualHost;
17 
18 @Bean
19 public ConnectionFactory getConnectionFactory(){
20 CachingConnectionFactory factory=new CachingConnectionFactory();
21 factory.setHost(host);
22 factory.setPort(port);
23 factory.setUsername(username);
24 factory.setPassword(password);
25 factory.setVirtualHost(virtualHost);
26 return factory;
27 }
28 }
29 
30 @Configuration
31 public class BindingConfig {
32 public final static String first="direct.first";
33 public final static String second="direct.second";
34 public final static String Exchange_NAME="directExchange";
35 public final static String RoutingKey1="directKey1";
36 public final static String RoutingKey2="directKey2";
37 
38 @Bean
39 public Queue queueFirst(){
40 return new Queue(first);
41 }
42 
43 @Bean
44 public Queue queueSecond(){
45 return new Queue(second);
46 }
47 
48 @Bean
49 public DirectExchange directExchange(){
50 return new DirectExchange(Exchange_NAME);
51 }
52 
53 //利用BindingBuilder绑定Direct与queueFirst
54 @Bean
55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
57 }
58 
59 //利用BindingBuilder绑定Direct与queueSecond
60 @Bean
61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 
62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
63 } 
64 }
65 
66 @Configuration
67 public class DirectMessListener {
68 @Autowired
69 private ConnectionConfig connectionConfig;
70 @Autowired
71 private RabbitTemplate template;
72 private int index=0;
73 
74 @Bean
75 public DirectMessageListenerContainer messageContainer(){
76 DirectMessageListenerContainer container=new DirectMessageListenerContainer();
77 container.setConnectionFactory(connectionConfig.getConnectionFactory());
78 // 设置每个队列的 consumer 数量
79 container.setConsumersPerQueue(4);
80 container.addQueueNames("direct.first");
81 container.addQueueNames("direct.second");
82 container.setConsumerTagStrategy(queue -> "consumer"+(++index));
83 container.setMessageListener(new ChannelAwareMessageListener(){
84     @Override
85  public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
86 throws Exception {
87 // TODO 自动生成的方法存根
88 // TODO 自动生成的方法存根
89 Thread thread=Thread.currentThread();
90 
91 System.out.println("Channel:"+channel.getChannelNumber() 
92 +" ThreadId is:"+thread.getId()
93 +" ConsumerTag:"+message.getMessageProperties().getConsumerTag()
94 +" Queue:"+message.getMessageProperties().getConsumerQueue());
95 } 
96 });
97 return container;
98 }
99 }

通过运行结果进一步可以证实,consumer 信息接收是由独立的线程池进行管理的,并没有与 channel 绑定,每个 consumer 都有自己单独的 channel,即使 channel 发生问题时,也不会对其他的 consumer 发生影响,这正是 DirectMessageListenerContainer 的优胜之处。

5.5 Consumer 的信息接收确认方式

在第四节曾经介绍过在 Producer 端利用 ConfirmCallback / ReturnCallback 监控信息的发送,在这节将为大家在 Consumer 端监控信息的接收。

Consumer 的信息接收确认模式可以通过 AcknowledgeMode 设定,一共有三种模式:NONE、MANUAL、AUTO,默认是 AUTO 模式。其中 NONE 为系统确认,MANUAL 是手动确认。

而 AUTO 为自动模式,系统可以根据执行情况自动发送 ack / nack。如果方法未抛出异常,则发送 ack。如果抛出异常 AmqpRejectAndDontRequeueException 顾名思义消息被拒绝且不会重新加入队列。如果方法抛出非 AmqpRejectAndDontRequeueException 异常,则系统发送 nack 消息重归队列。

Channel 消息接收的常用方法

方法

说明

void basicAck(long deliveryTag, boolean multiple)

deliveryTag 为该消息的标识,multiple 为 true 代表批量确认同一批次的信息接收成功,为 false 时代表单独判定某个消息接收成功。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag 为该消息的标识,requeue 为 true时,被拒绝的消息会重新进入队列进行推送,为false时消息将不再进入队列

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag 为该消息的标识,multiple 为 true 代表批量确认同一批次的信息接收失败,为 false 时代表单独判定某个消息接收失败。requeue 为 true时,消息会重新进入队列进行推送,为false时消息将不再进入队列

AcknowledgeMode 配置为 MANUAL 后,用户可通过 Channel 类的 void basicAck(long deliveryTag, boolean multiple) 方法手动确认消息接收是否成功。

若检测到有异常,可通过void basicReject(long deliveryTag, boolean requeue) 或 void basicNack(long deliveryTag, boolean multiple, boolean requeue) 确认是否重新把消息推送。

通过配置 prefetchCount 可设置 consumer 每次接收到的信息数量,系统默认值为 250,这表示当 consumer 队列接收到 250 请求其状态皆为 unacked 时,broker server 将暂停向 consumer 发送消息,待消息处理后再继续。

下面例子中我们尝试把 prefetchCount 设置为 10,即每个 consumer 单次最多接收到的消息为 10 条,并把 consumersPerQueue 设置为 4,然后把 AcknowledgeMode 设置为 MANUAL,通过手动确认消息接收,一旦发生错误,消息重新加入队列。

Producer 端代码

代码语言:txt
复制
 1 @Configuration
 2 public class BindingConfig {
 3 public final static String first="direct.first";
 4 public final static String second="direct.second";
 5 public final static String Exchange_NAME="directExchange";
 6 public final static String RoutingKey1="directKey1";
 7 public final static String RoutingKey2="directKey2";
 8 
 9 @Bean
10 public Queue queueFirst(){
11 return new Queue(first);
12 }
13 
14 @Bean
15 public Queue queueSecond(){
16 return new Queue(second);
17 }
18 
19 @Bean
20 public DirectExchange directExchange(){
21 return new DirectExchange(Exchange_NAME);
22 }
23 
24 //利用BindingBuilder绑定Direct与queueFirst
25 @Bean
26 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
27 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
28 }
29 
30 //利用BindingBuilder绑定Direct与queueSecond
31 @Bean
32 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 
33 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
34 } 
35 }
36 
37 @Configuration
38 public class ConnectionConfig {
39 @Value("${spring.rabbitmq.host}")
40 public String host;
41 
42 @Value("${spring.rabbitmq.port}")
43 public int port;
44 
45 @Value("${spring.rabbitmq.username}")
46 public String username;
47 
48 @Value("${spring.rabbitmq.password}")
49 public String password;
50 
51 @Value("${spring.rabbitmq.virtual-host}")
52 public String virtualHost;
53 
54 @Bean
55 public ConnectionFactory getConnectionFactory(){
56 CachingConnectionFactory factory=new CachingConnectionFactory();
57 factory.setHost(host);
58 factory.setPort(port);
59 factory.setUsername(username);
60 factory.setPassword(password);
61 factory.setVirtualHost(virtualHost);
62 return factory;
63 }
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69 @Autowired
70 private RabbitTemplate template;
71 
72 @RequestMapping("/send")
73 public void send(HttpServletResponse response) throws InterruptedException, IOException{
74 for(Integer n=0;n<100;n++){
75 CorrelationData correlationData=getCorrelationData();
76 template.convertAndSend("directExchange","directKey1",
77 " queue1"+" "+n.toString(),correlationData);
78 template.convertAndSend("directExchange","directKey2",
79 "queue2"+" "+n.toString(),correlationData); 
80 }
81 }
82 
83 private CorrelationData getCorrelationData(){
84 return new CorrelationData(UUID.randomUUID().toString());
85 }
86 }

运行后可看到 Broker Server 每条 queue 会有 100 条数据处于待处理状态

Consumer 端代码

代码语言:txt
复制
 1 @Configuration
 2 public class ConnectionConfig {
 3 @Value("${spring.rabbitmq.host}")
 4 public String host;
 5 
 6 @Value("${spring.rabbitmq.port}")
 7 public int port;
 8 
 9 @Value("${spring.rabbitmq.username}")
 10 public String username;
 11 
 12 @Value("${spring.rabbitmq.password}")
 13 public String password;
 14 
 15 @Value("${spring.rabbitmq.virtual-host}")
 16 public String virtualHost;
 17 
 18 @Bean
 19 public ConnectionFactory getConnectionFactory(){
 20 CachingConnectionFactory factory=new CachingConnectionFactory();
 21 factory.setHost(host);
 22 factory.setPort(port);
 23 factory.setUsername(username);
 24 factory.setPassword(password);
 25 factory.setVirtualHost(virtualHost);
 26 return factory;
 27 }
 28 }
 29 
 30 @Configuration
 31 public class BindingConfig {
 32 public final static String first="direct.first";
 33 public final static String second="direct.second";
 34 public final static String Exchange_NAME="directExchange";
 35 public final static String RoutingKey1="directKey1";
 36 public final static String RoutingKey2="directKey2";
 37 
 38 @Bean
 39 public Queue queueFirst(){
 40 return new Queue(first);
 41 }
 42 
 43 @Bean
 44 public Queue queueSecond(){
 45 return new Queue(second);
 46 }
 47 
 48 @Bean
 49 public DirectExchange directExchange(){
 50 return new DirectExchange(Exchange_NAME);
 51 }
 52 
 53 //利用BindingBuilder绑定Direct与queueFirst
 54 @Bean
 55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 57 }
 58 
 59 //利用BindingBuilder绑定Direct与queueSecond
 60 @Bean
 61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 
 62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
 63 } 
 64 }
 65 
 66 @Configuration
 67 public class DirectMessListener {
 68 @Autowired
 69 private ConnectionConfig connectionConfig;
 70 @Autowired
 71 private RabbitTemplate template;
 72 private int index=0;
 73 
 74 @Bean
 75 public DirectMessageListenerContainer messageContainer(){
 76 DirectMessageListenerContainer container=new DirectMessageListenerContainer();
 77 container.setConnectionFactory(connectionConfig.getConnectionFactory());
 78 // 设置每个队列的 consumer 数量
 79 container.setConsumersPerQueue(4);
 80 // 设置每个 consumer 每次的接收的消息数量为10个
 81 container.setPrefetchCount(10);
 82 // 使用MANUAL进行手动确认 
 83 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 84 container.addQueueNames("direct.first");
 85 container.addQueueNames("direct.second");
 86 container.setConsumerTagStrategy(queue -> "consumer"+(++index));
 87 container.setMessageListener(new ChannelAwareMessageListener(){
 88 @Override
 89 public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
 90 throws Exception {
 91 Thread thread=Thread.currentThread();
 92 MessageProperties prop=message.getMessageProperties();
 93 try{
 94 System.out.println("Channel:"+channel.getChannelNumber() 
 95 +" ThreadId is:"+thread.getId()
 96 +" ConsumerTag:"+prop.getConsumerTag()
 97 +" Queue:"+prop.getConsumerQueue());
 98 //通过Tag单个确认
 99 channel.basicAck(prop.getDeliveryTag(), false);
100 }catch(Exception ex){
101 //判定单个接收失败,重新加入consumer队列
102 channel.basicReject(prop.getDeliveryTag(), true);
103 }
104 thread.sleep(1000);
105 } 
106 });
107 return container;
108 }
109 }

观察信息接收情况,每个 consumer 一次可处理10条信息,对队列进行分批处理。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档