前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rabbitmq 通过死信队列实现延迟消息发送

Rabbitmq 通过死信队列实现延迟消息发送

作者头像
芥末鱿鱼
发布2022-05-05 14:55:24
4590
发布2022-05-05 14:55:24
举报
文章被收录于专栏:玩转 Spring Cloud玩转 Spring Cloud

Rabbitmq 通过死信队列实现延迟消息发送

文章目录

设置消息的过期时间(TTL)

TTL, Time to Live 的简称, 即过期时间.

两种方法设置 TTL

  1. 通过队列属性设置. 即队列中所有的消息都有相同的过期时间. 在 channel.queueDeclare 方法中加入 x-message-ttl 参数实现, 单位是毫秒
  2. 对消息本身进行单独设置. 即每条消息的 TTL 可以不同. 在 channel.basicPublish 方法中加入 expiration 属性, 单位是毫秒 总结: 如果两种方法一起使用, 则以较小的那个 TTL 为准. 消息过期后, 消费者无法再接收该消息, 就会变成死信(Dead Message). 这个特性可以实现延迟队列功能

Java 代码实现

给队列设置 TTL

代码语言:javascript
复制
@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;    
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        rabbitAdmin.declareExchange(new TopicExchange("exchange.expiration"));
        Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-message-ttl", 2000);// 2s 过期
        Queue queue = new Queue("queue.expiration", true, false, false, argMap);
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareBinding(
            BindingBuilder.bind(queue).to(new TopicExchange("exchange.expiration")).with("routingkey.expiration"));
        return rabbitAdmin;
    }
}

@Service
public class SendMQService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage(String exchange, String routingKey, String msg) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}

给每一个消息单独设置 TTL

代码语言:javascript
复制
@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;  
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        rabbitAdmin.declareExchange(new TopicExchange("exchange.expiration"));
        rabbitAdmin.declareQueue(new Queue("queue.expiration"));
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.expiration"))
            .to(new TopicExchange("exchange.expiration")).with("routingkey.expiration"));
        return rabbitAdmin;
    }
}

@Service
public class SendMQService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage(String exchange, String routingKey, String msg, Integer expirationTime) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
            message.getMessageProperties().setExpiration(expirationTime);
            return message;
        });
    }
}

死信队列

DLX(Dead-Letter-Exchange), 可以称之为死信交换器, 也成死信信箱. 当消息在一个队列中变成死信(dead message) 后, 会被重新发送到另外一个交换器中, 这个交换器就是 DLX. 绑定了 DLX 的队列就是死信队列. 说白了就是, 有两个队列, 一个队列上的消息设置了过期时间, 但没有消费者. 另一个队列是普通队列, 有消费者. 后者被称为死信队列. 当前一个队列消息过期后, Rabbitmq 会自动将过期消息转发到死信队列里. 然后被死信队列的消费者消费掉. 实现消息的延迟发送功能

延迟队列

延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行

实现方法

通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数为这个队列添加 DLX

Java 代码

代码语言:javascript
复制
@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;    
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        // 死信队列
        rabbitAdmin.declareExchange(new DirectExchange("exchange.dlx"));
        rabbitAdmin.declareQueue(new Queue("queue.dlx"));
        rabbitAdmin.declareBinding(
            BindingBuilder.bind(new Queue("queue.dlx")).to(new DirectExchange("exchange.dlx")).with("routingkey.dlx"));

        // 延迟队列
        Map<String, Object> argMap = new HashMap<>(3);
        argMap.put("x-message-ttl", 2000);// 2s 过期
        argMap.put("x-dead-letter-exchange", "exchange.dlx");
        argMap.put("x-dead-letter-routing-key", "routingkey.dlx");
        rabbitAdmin.declareQueue(new Queue("queue.normal", true, false, false, argMap));
        rabbitAdmin.declareExchange(new TopicExchange("exchange.normal"));
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.normal", true, false, false, argMap))
            .to(new TopicExchange("exchange.normal")).with("queue.normal"));
    }
}

缺点

使用死信队列来实现消息的延迟发送. 如果是采用第一种方式, 即每个队列设置相同的过期时间, 可以很好的实现消息的延迟发送功能. 如果采用第二种方式, 给每个消息设置不同的过期时间, 由于队列先入先出的特性, 如果队列头的消息过期时间很长, 后面的消息过期时间很短, 会导致后面的消息过期后不能及时被消费掉

简单的做法时, 使用 rabbitmq 的延迟插件: Rabbitmq 通过延迟插件实现延迟队列

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-06-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 设置消息的过期时间(TTL)
    • 两种方法设置 TTL
      • Java 代码实现
        • 给队列设置 TTL
        • 给每一个消息单独设置 TTL
    • 死信队列
      • 延迟队列
        • 实现方法
          • Java 代码
          • 缺点
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档