前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot+RabbitMQ 实现延迟队列

SpringBoot+RabbitMQ 实现延迟队列

作者头像
水货程序员
发布2020-04-22 11:46:12
6030
发布2020-04-22 11:46:12
举报
文章被收录于专栏:javathingsjavathings

9 total views, 3 views today

RabbitMQ 实现延迟队列,方法有两种。

第一种是安装延迟队列插件;

第二种就是利用死信队列的方式;

这里采用第二种方式。

rabbitmq 自身的一些概念,可以去网上或者书上获得。rabbitmq 延迟队列的实现原理,网上资料很多,简单盗图一张。

简单说明一下原理。

将消息发送到一个队列中去,消息自身有一个 TTL,即失效时间,如果到期还是为消费该消息,那么该消息就成为死信,将死信移到专门的死性队列,然后消费者只需要消费死信队列中的消息,变相的实现了延迟消息的功能。

基于 Springboot 的具体代码实现:

代码语言:javascript
复制
<!-- rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件:

代码语言:javascript
复制
spring:
  rabbitmq:
    host: 111.22.1.2 
    port: 5672
    username: user
    password: passw
    template:
      mandatory: true
    #支持发布确认与返回
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        #是否自动开始监听消息队列
        auto-startup: false
        #手动应答
        acknowledge-mode: manual
        #监听容器数及最大数
        concurrency: 1
        max-concurrency: 1
        #是否支持重试
        retry:
          enabled: true

配置文件 TopicRabbitConfig.Java

代码语言:javascript
复制
package com.mine.config;


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TopicRabbitConfig {


    //延迟队列的名字
    public static final String delayQueueName = "delay_machine_check_queue";
    //延迟队列的exchange名字
    public static final String delayExchangeName = "delay_machine_check_exchange";

    //死信exchange的名称
    public static final String deadLetterProcessQueueName = "dead_letter_process_queue";
    //死信exchange的名称
    public static final String deadLetterProcessExchangeName = "dead_letter_machine_check_exchange";

    public static final String routingKey = "machine_check";


    //延迟队列的exchange
    @Bean
    public TopicExchange delayExchange() {
        return new TopicExchange(delayExchangeName, true, false);
    }

    //死信队列的exchange
    @Bean
    public TopicExchange deadLetterProcessExchange() {
        return new TopicExchange(deadLetterProcessExchangeName, true, false);
    }

    //延迟队列
    @Bean
    Queue delayQueue() {
        Map<String, Object> args = new HashMap<>();
        //args.put("x-message-ttl", 20000);
        args.put("x-dead-letter-exchange", deadLetterProcessExchangeName); //DLX,dead letter发送到的exchange
        args.put("x-dead-letter-routing-key", routingKey);
        return new Queue(delayQueueName, true, false, false, args);
    }

    //死信队列
    @Bean
    public Queue deadLetterProcessQueue() {
        return new Queue(deadLetterProcessQueueName, true, false, false);
    }

    //绑定延迟队列,延迟Exchange,routing关系
    @Bean
    Binding bindingDelayExchange(Queue delayQueue, TopicExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(routingKey);
    }

    //绑定死信队列,死信Exchange,routing关系
    //参数根据Spring命名约定的方式,会将上面的Queue实例和exchange实例注入进来,形成绑定关系
    @Bean
    Binding bindingDeadLetterProcessExchange(Queue deadLetterProcessQueue, TopicExchange deadLetterProcessExchange) {
        return BindingBuilder.bind(deadLetterProcessQueue).to(deadLetterProcessExchange).with(routingKey);
    }
}

监听代码:

代码语言:javascript
复制
//只监听听死信队列即可
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = TopicRabbitConfig.deadLetterProcessQueueName, durable = "true", autoDelete = "false"),
        exchange = @Exchange(name = TopicRabbitConfig.deadLetterProcessExchangeName, durable = "true", type = "topic", autoDelete = "false"),
        key = TopicRabbitConfig.routingKey), autoStartup = "true", id = "myconsumer")
public void receiveRabbitMQ(Message message, Channel channel) throws Exception {
    try {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        LogHelper.log4j2.info("收到消息:" + new String(message.getBody()));

    } catch (Exception ex) {
        LogHelper.log4j2.error("receive", ex);
    }
}

上面 2 段代码,实现的效果就是如下:

发送消息的代码:RabbitMQTopicSender.java

代码语言:javascript
复制
package com.mine.utils;

import com.mine.config.TopicRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


@Component
public class RabbitMQTopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String message, Long delaySecond) {

        /***
         * 方法参数说明
         * https://docs.spring.io/spring-amqp/docs/latest_ga/api/org/springframework/amqp/rabbit/core/RabbitTemplate.html
         convertAndSend(String exchange, String routingKey, Object object)
         Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
         ***/

        message = message + "【" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "】";
        LogHelper.log4j2.info("message:" + message);
        MessageProperties props = new MessageProperties();
        props.setExpiration(Long.toString(delaySecond * 1000));//消息的延迟时间
        Message ttlMessage = new Message(message.getBytes(), props);

        rabbitTemplate.convertAndSend(TopicRabbitConfig.delayExchangeName, TopicRabbitConfig.routingKey, ttlMessage);
        LogHelper.log4j2.info("消息发送成功");
    }
}

至此调用 send 方法,即可发送延迟队列。

以上代码亲测有效。

原创文章,转载请注明出处!https://cloud.tencent.com/developer/article/1618510

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档