专栏首页alexzhangSpringBoot RabbitMQ

SpringBoot RabbitMQ

RabbitMQ 介绍

RabbitMQ的流程是:生产者将消息发送到对应交换机上,交换机再将消息转发到绑定的队列上,消费者从绑定的队列获取消息进行消费。

image.png

交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.

SpringBoot整合RabbitMQ

添加依赖:

<!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

在application.yml文件中添加配置

spring:
  application:
    name: async-task
  rabbitmq:
    host: 192.168.255.255
    port: 5672
    username: xxx
    password: 123456

Direct模式

Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个路由键(RoutingKey).当发送者发送消息的时候,指定对应的Key.当Key和消息队列的RoutingKey一致的时候,消息将会被发送到该消息队列中.

@Configuration
public class RabbitMQConfig{
    // 交换机有四种类型,分别为Direct,topic,headers,Fanout.

    // Direct 模式创建队列
    // 创建队列
    @Bean
    public Queue testQueue() {
        return new Queue("queueName");
    }

    // 创建一个交换机
    @Bean
    public DirectExchange testExchange() {
        return new DirectExchange("exchangeName");
    }

    // 把队列和交换机绑定在一起
    @Bean
    public Binding testBinding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("routingKey");
    }
}

// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    template.convertAndSend("exchangeName", "routingKey", "hello,rabbit~");
    }
}

// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "queueName")
    public void process(String data){
        log.info("------------data: {}",data);
    }
}

AmqpTemplate 发送的消息数据还可以是对象,但对象必须序列化

// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    User user = new User("name","password");
    template.convertAndSend("exchangeName", "routingKey",user);
    }
}


// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "queueName")
    public void process(User user){
        log.info("------------user: {}",user);
    }

@RabbitListener 可以作用在类上,需要和 @RabbitHandler 配合使用

@Component
@RabbitListener(queues = "queueName")
public class TestConsumer {

    @RabbitHandler
    public void process(String data){
        log.info("------------data: {}",data);
    }

    @RabbitHandler
    public void process(UserInfo userInfo){
        log.info("------------userInfo: {}",userInfo);
    }
    
    public void test(){
        log.info("------------");
    }
}

上面的TestConsumer 消费者接收所有路由键为 routingKey 的消息,队列中的消息会转发到被@RabbitHandler修饰的方法然后被消费,不同的消息类型被转发到对应的方法中。test()方法不会消费消息 RabbitMq 服务启动后会创建一个默认的DirectExchange,这个交换机只接收 路由键routingKey 和 队列名称相同的消息,所以direct模式可以简化:

@Configuration
public class RabbitMQConfig{
    // Direct 模式创建队列
    // 创建队列
    @Bean
    public Queue testQueue() {
        return new Queue("routingKey");  // 队列名和routingKey相同
    }
}

// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    template.convertAndSend("routingKey", "hello,rabbit~"); // 没有交换机名称,消息会被发送到默认交换机,然后被转发到 名称和routingKey相同的队列上
    }
}

// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "routingKey")
    public void process(String data){
        log.info("------------data: {}",data);
    }
}

Topic模式

topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中. 通配符:* 表示一个词,# 表示零个或多个词 注意: 通配符是针对交换机的!!!也就是说消息进入交换机时才进行通配符匹配,匹配完了以后才进入固定的队列

@Configuration
public class RabbitMQConfig{

   // 交换机有四种类型,分别为Direct,topic,headers,Fanout.

   // topit 模式

   @Bean(name="queueName1")
   public Queue queueMessage1() {
       return new Queue("queueName1");      // 定义第一个队列,名称为 queueName1
   }
   @Bean(name="queueName2")
   public Queue queueMessage2() {
       return new Queue("queueName2");     // 定义第二个队列,名称为 queueName2
   }
   @Bean
   public TopicExchange exchange() {
       return new TopicExchange("exchangeName");  // 定义交换机
   }
   // 定义绑定关系,通过交换机 将名称为queueName1 的队列绑定到交换机上, routingKey 为 topic.key1
   @Bean
   public Binding bindingExchangeMessage(@Qualifier("queueName1") Queue queue, TopicExchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with("topic.key1");
   }
   // 定义另一个绑定关系,通过交换机 将名称为queueName2 的队列绑定到交换机上 ,routingKey 是符合 通配符topic.#  的路由键
   // 如:topic.xx、topic.yy 等
   @Bean
   public Binding bindingExchangeMessages(@Qualifier("queueName2") Queue queue, TopicExchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
   }

消费者

    @RabbitListener(queues="queueName1")    //监听器监听指定的Queue
    public void process1(String str) {    
        System.out.println("message:"+str);
    }
    @RabbitListener(queues="queueName2")    //监听器监听指定的Queue
    public void process2(String str) {
        System.out.println("messages:"+str);
    }

然后发送消息

// 2个消费者都会收到消息
template.convertAndSend("exchangeName", "topic.key1", "data info");
// 只有第2个消费者收到消息
template.convertAndSend("exchangeName", "topic.key2", "data info");
// 只有第2个消费者收到消息
template.convertAndSend("exchangeName", "topic.key3", "data info");

Fanout模式

fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略. 因此我们发送到交换机的消息会使得绑定到该交换机的每一个Queue接收到消息,这个时候就算指定了路由键(routingKey),或者规则(即上文中convertAndSend方法的参数2),也会被忽略!

// fanout模式
    @Bean(name="Amessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }


    @Bean(name="Bmessage")
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean(name="Cmessage")
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");//配置广播路由器
    }

    @Bean
    Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

然后发送消息

template.convertAndSend("fanoutExchange", "", "data info"); // 第二个参数会被忽略

消费者

    @RabbitListener(queues="fanout.A")
    public void processA(String str1) {
        System.out.println("ReceiveA:"+str1);
    }
    @RabbitListener(queues="fanout.B")
    public void processB(String str) {
        System.out.println("ReceiveB:"+str);
    }
    @RabbitListener(queues="fanout.C")
    public void processC(String str) {
        System.out.println("ReceiveC:"+str);
    }

结果三个都收到消息

image.png

RabbitMQ 实现延迟队列

rabbitMQ可以通过死信机制来实现延迟队列的功能,一些概念: 1、TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间 2、Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信 3、Dead Letter Exchanges(DLX),即死信交换机 4、Dead Letter Routing Key (DLK),死信路由键 直接上代码:

@Configuration
public class RabbitMQConfig { 

    // 创建一个立即消费队列
    @Bean(QueueName.ImmediateQueue)
    public Queue immediateQueue() {
        return new Queue(QueueName.ImmediateQueue);
    }

    @Bean(ExchangeName.IMMEDIATE)
    public DirectExchange immediateExchange() {
        return new DirectExchange(ExchangeName.IMMEDIATE);
    }

    // 把 立即消费的队列 和 立即消费的exchange 绑定在一起
    @Bean
    public Binding immediateBinding(@Qualifier(QueueName.ImmediateQueue) Queue queue, @Qualifier(ExchangeName.IMMEDIATE) DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RoutingKey.IMMEDIATE_ROUTING_KEY);
    }


    // 创建一个延时队列
    @Bean(QueueName.DelayQueue)
    public Queue delayQueue() {
        Map<String, Object> params = new HashMap<>();

        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", ExchangeName.IMMEDIATE);

        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key
        params.put("x-dead-letter-routing-key", RoutingKey.IMMEDIATE_ROUTING_KEY);

        // 设置队列中消息的过期时间,单位 毫秒
        params.put("x-message-ttl", 5 * 1000);

        return new Queue(QueueName.DelayQueue, true, false, false, params);
    }

    @Bean(ExchangeName.DELAY)
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(ExchangeName.DEAD_LETTER);
    }

    // 把 延迟消费的队列 和 延迟消费的exchange 绑定在一起
    @Bean
    public Binding delayBinding(@Qualifier(QueueName.DelayQueue) Queue queue, @Qualifier(ExchangeName.DELAY) DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RoutingKey.DELAY_KEY);
    }
}

@Component
public class TestConsumer {
    @RabbitListener(queues = QueueName.ImmediateQueue)
    public void process2(UserInfo userInfo){
        log.info("------------userInfo: {}",userInfo);
    }
}

// 常量
public interface ExchangeName {
    String IMMEDIATE = "immediate";
    String DELAY = "delay";
}
public interface QueueName {
    String ImmediateQueue = "ImmediateQueue";
    String DelayQueue = "DelayQueue";
}
public interface RoutingKey {
    String IMMEDIATE_ROUTING_KEY = "immediate.key";
    String DELAY_KEY = "delay.key";
}

image.png

过程: 1、先创建一个普通队列,即上面的 ImmediateQueue,创建一个普通交换机 immediateExchange,绑定两者。 2、创建一个延迟队列,即创建时设置了参数:x-dead-letter-exchange,x-dead-letter-routing-key,x-message-ttl,该队列就相当于是一个延迟队列了 3、创建延迟交换机(其实也是普通交换机),和延迟队列绑定 4、给ImmediateQueue创建监听消费者,注意,延迟队列不要设置监听消费者,不然延迟队列就变成普通队列了,不起作用 到此延迟队列已完成,直接发送消息到延迟交换机即可

        UserInfo userInfo = new UserInfo();
        userInfo.setPhone("15800000000");
        userInfo.setUserName("aaaaaaa");

        log.info("开始发送消息");
        template.convertAndSend(ExchangeName.DELAY, RoutingKey.DELAY_KEY, userInfo);

原理:发送消息到延迟交换机,延迟交换机将消息转发到延迟队列,因为延迟队列没有监听消费者,所以消息不会被消费,直到消息超过存活时间(即 延迟)变成死信,这时延迟队列会将死信转发到死信交换机,即上面的immediateExchange(因为延迟队列绑定的死信交换机x-dead-letter-exchange指向了immediateExchange),immediateExchange将消息转发给ImmediateQueue,然后被监听消费者消费

测试结果:

image.png

可以看出过了5秒才消费消息

注意: 一个延迟队列只能设置一个存活时间,即该延迟队列里面的所有消息的存活时间都必须一致,如果需要设置不一样的存活时间,只能再创建一个延迟队列。原因是延迟队列并不会去扫描队列里面所有消息的存活时间,只会判断队列头的第一个消息是否过期,若过期了就转发消息,否则一直等待,即使队列后面已经有消息先过期,也只能等前面的消息被转发后,该消息才被转发。

消息确认机制

消息确认分为两部分: 生产确认 和 消费确认。 生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。 消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。 在配置文件中添加:

spring:
 application:
   name: async-task
 rabbitmq:
   host: 192.168.0.0
   port: 5672
   username: username
   password: password
   publisher-confirms: true  # 开启发送确认
   publisher-returns: true   # 开启发送失败退回
   template:
     mandatory: true
   listener:
     type: simple
     simple:
       acknowledge-mode: manual # 开启消息消费手动确认

在RabbitMQConfig 中添加如下配置

    @Bean
    public RabbitTemplate getTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        // 消息发送到交换器Exchange后触发回调
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                //  可以进行消息入库操作
                log.info("消息唯一标识 correlationData = {}", correlationData);
                log.info("确认结果 ack = {}", ack);
                log.info("失败原因 cause = {}", cause);
            }
        });

        // 配置这个,下面的ReturnCallback 才会起作用
        template.setMandatory(true);
        // 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发)
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //  可以进行消息入库操作
                log.info("消息主体 message = {}", message);
                log.info("回复码 replyCode = {}", replyCode);
                log.info("回复描述 replyText = {}", replyText);
                log.info("交换机名字 exchange = {}", exchange);
                log.info("路由键 routingKey = {}", routingKey);
            }
        });

        return template;
    }

成功确认: void basicAck(long deliveryTag, boolean multiple) throws IOException; deliveryTag:该消息的index multiple:是否批量. true:将一次性ack所有小于deliveryTag的消息。 消费者成功处理后,调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行确认。

失败确认: void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; deliveryTag:该消息的index。 multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。 requeue:是否重新入队列。

拒绝 void basicReject(long deliveryTag, boolean requeue) throws IOException; deliveryTag:该消息的index。 requeue:被拒绝的是否重新入队列。

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息

消费者消息确认代码:

@Component
public class OrderConsumer {

    @Autowired
    private TaskOrderFeign orderFeign;

    @RabbitListener(queues = QueueName.OrderCancelQueue)
    public void process(OrderDTO orderDTO, Message message, Channel channel){

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("---消费消息---------deliveryTag = {} ,  orderDTO: {}",deliveryTag ,orderDTO);

            // 取消订单
            orderDTO.setState(OrderStateEnum.DELAY_CANCEL.code);
            Response response = orderFeign.cancelOrder(orderDTO);

            //TODO 判断结果,是否需要重试

            // 成功确认消息
            channel.basicAck(deliveryTag, true);

        } catch (IOException e) {
            log.error("确认消息时抛出异常 , e = {}", PrintUtil.print(e));
            // 重新确认
            // 成功确认消息
            try {
                Thread.sleep(50);
                channel.basicAck(deliveryTag, true);
            } catch (IOException | InterruptedException e1) {
                log.error("确认消息时抛出异常 , e = {}", PrintUtil.print(e));
                // 可以考虑入库
            }

        } catch (Exception e) {

            log.error("取消订单失败 , e = {}", PrintUtil.print(e));

            try {
                // 失败确认
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException e1) {
                log.error("消息失败确认失败 , e1 = {}", PrintUtil.print(e1));
            }
        }
    }
}

作者:万物归于简 链接:https://www.jianshu.com/p/2e6280ab2bc9 来源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

本文转载自: https://www.jianshu.com/p/2e6280ab2bc9复制
如有侵权,请联系 cloudcommunity@tencent.com 删除。
登录 后参与评论
0 条评论

相关文章

  • SpringBoot 整合 RabbitMQ

    RabbitMQ 是由 Erlang 语言编写的实现了高级消息队列协议(AMQP)的开源消息代理软件(也可称为面向消息的中间件)。其支持 Windows、Lin...

    好好学java
  • SpringBoot整合RabbitMQ

    崔笑颜
  • SpringBoot集成RabbitMQ

    添加依赖 配置 无法远程连接,需要配置文件 cd etc/rabbitmq

    JavaEdge
  • SpringBoot整合RabbitMQ

    RabbitMQ是开源消息队列系统,用erlang语言开发。如果不了解可以查看官网http://www.rabbitmq.com/ 这篇文章介绍一个spring...

    dalaoyang
  • SpringBoot 整合 RabbitMQ

    特殊说明: 解决问题的光鲜,藏着磕Bug的痛苦。 万物皆入轮回,谁也躲不掉! 以上文章,均是我实际操作,写出来的笔记资料,不会出现全文盗用别人文章...

    收心
  • springboot之RabbitMQ

    RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

    Vincent-yuan
  • SpringBoot集成RabbitMq

    本站文章除注明转载/出处外,皆为作者原创,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

    Cheng_Blog
  • SpringBoot整合RabbitMQ

    1.创建生产者SpringBoot工程 2.导入依赖坐标 3.编写yml配置,基本信息配置 4.定义交换机,队列以及绑定关系的配置类 5.注入Rabbi...

    一只胡说八道的猴子
  • SpringBoot 整合 Rabbitmq

    场景:用户注册,信息写入数据库后,需要给用户发送注册成功的邮件,再发送注册成功的邮件。

    jwangkun
  • SpringBoot 整合RabbitMQ

    MQ(Message Quene):通过典型的生产者和消费者模型,生产者不断向消息队列中产生消息,消费者不断的从队列中获取消息。因为生产者和消费者都是异步的,而...

    jwangkun
  • SpringBoot教程(十五) | SpringBoot集成RabbitMq

    RabbitMq是我们在开发过程中经常会使用的一种消息队列。今天我们来研究研究rabbitMq的使用。

    一缕82年的清风
  • springboot(八):RabbitMQ详解

    RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息中间件在互联网公司的使用中越来越多,刚才还看到...

    纯洁的微笑
  • RabbitMQ与SpringBoot整合

    RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),Rock...

    Java编程指南
  • springboot集成rabbitmq(实战)

    名山丶深处
  • springboot与rabbitmq整合

    之前学习了rabbitmq,对其基本的用法有了一定的认识,但对其深层次的使用必须依赖于具体的业务,就像编程语言一样,提供的基础的使用规范,但是却产生了那么多高性...

    sucl
  • (16)SpringBoot整合RabbitMQ

    https://blog.csdn.net/weixin_39800144/article/details/89037698

    IT云清
  • SpringBoot ( 八 ) :RabbitMQ 详解

    RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

    前朝楚水
  • 原创 | Springboot整合RabbitMQ

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息...

    润森

扫码关注云+社区

领取腾讯云代金券