前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ与SpringBoot2.0整合

RabbitMQ与SpringBoot2.0整合

作者头像
用户1212940
发布2022-04-13 15:34:31
2540
发布2022-04-13 15:34:31
举报
文章被收录于专栏:LambdaLambda

application.properties:

代码语言:javascript
复制
spring.rabbitmq.addresses=192.
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

RabbitMQ与SpringBoot整合配置详解:

1. 生产端核心配置


  • publisher-confirms,实现一个监听器用于监听Broker端为我们返回的确认请求: RabbitTemplate.ConfirmCallback
  • publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功: RabbitTemplate.ReturnCallback
  • 注意一点,在发送消息时候对template进行设置mandatory=true保证监听有效
  • 生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等。

RabbitSender:

代码语言:javascript
复制
package com.pyy.springboot.producer;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData:" + correlationData);
            System.err.println("ack:" + ack);
            if(!ack) {
                System.err.println("异常处理...");
            }else {
                // 更新数据库对应的消息状态:已发送
            }
        }
    };


    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.err.println("return exchange:" + exchange + " , routingKey:" + routingKey + ", replyCode:" + replyCode + ", replyText:" + replyText);
        }
    };

    public void send(Object message, Map<String, Object> headerProperties) throws Exception {
        MessageHeaders messageHeaders = new MessageHeaders(headerProperties);
        Message msg = MessageBuilder.createMessage(message, messageHeaders);

        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("userid" + System.currentTimeMillis());// id + 时间戳 全局唯一 实际消息的id
        //rabbitTemplate.convertAndSend("pyy.exchange", "springboot.hello", msg, correlationData);

        rabbitTemplate.convertAndSend("pyy.exchange", "fasdfsf.hello", msg, correlationData);

    }
}

2. 消费端核心配置


代码语言:javascript
复制
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
  • 首先配置ACK手工确认模式,用于ACK的手工处理,这样我可以保证消息的可靠性送达,或者在消费失败时候可以做到重回队列、根据业务记录日志等处理。
  • 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况

@RabbitListener注解使用

  • 消费端监听@RabbitMQListener注解,这个对于在实际工作中非常的好用
  • @RabbitListener只一个组合的注解,里面可以注解配置@QueueBinding@Queue@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等
代码语言:javascript
复制
package com.pyy.mq.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * 消息接收者
 * @RabbitListener bindings:绑定队列
 *   @QueueBinding  value:绑定队列的名称
 *                exchange:配置交换器
 * 
 *     @Queue value:配置队列名称
 *        autoDelete:是否是一个可删除的临时队列
 * 
 *     @Exchange value:为交换器起个名称
 *           type:指定具体的交换器类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.info.routing.key}"
        )
)
public class InfoReceiver {

    /**
     * 接收消息方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Info receiver:" + msg);
    }
}

@RabbitListener注解如果没有存在exchange和queue会自动创建

案例详细代码:https://github.com/pyygithub/springboot-rabbitmq

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-10-22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • application.properties:
  • RabbitMQ与SpringBoot整合配置详解:
  • 1. 生产端核心配置
  • 2. 消费端核心配置
  • @RabbitListener注解使用
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档