前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rabbitmq 通过延迟插件实现延迟队列

Rabbitmq 通过延迟插件实现延迟队列

作者头像
芥末鱿鱼
发布2022-05-05 14:56:03
1.1K0
发布2022-05-05 14:56:03
举报
文章被收录于专栏:玩转 Spring Cloud玩转 Spring Cloud

Rabbitmq 通过延迟插件实现延迟队列

文章目录

DLX+TTL 存在时序问题

由于队列先入先出的特性. 通过死信队列(DLX)和给每条消息设置过期时间(TTL)来实现延迟队列, 会存在时序问题. 即排在队列头的消息过期使时间如果设置的比较长, 会导致队列后面过期时间比较短的消息, 过期了迟迟不被消费掉. 可以通过给 Rabbitmq 安装延迟插件来实现延迟队列功能

安装延迟插件

下载地址

rabbitmq-delayed-message-exchange 插件可到这里下载: RabbitMQ 延迟插件

也可以到github上下载 : RabbitMQ Delayed Message Plugin

(注意插件版本, 这个插件适应的版本时 3.5.8 及其以后的版本)

安装

登录 Linux 服务器, 将插件复制到这个路径下: /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/plugins/

然后执行以下指令:

代码语言:javascript
复制
# 开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 重启 rabbitmq
/sbin/service rabbitmq-server restart

# 查看插件是否安装成功
sudo rabbitmq-plugins list

Java 代码实现

代码语言:javascript
复制
@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;    
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        TopicExchange exchange = new TopicExchange("exchange.delay");
        // 交换器设置延迟属性
        exchange.setDelayed(true);				                  
        rabbitAdmin.declareQueue(new Queue("queue.delay"));
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.delay")).to(exchange).with("rountingkey.delay");
        return rabbitAdmin;
    }
}

 // 消息发送器                                  
public class SendMQService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    // 发送消息
    public void sendMessage(String exchange, String routingKey, String msg, Integer expirationTime) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
            // 给每条消息设置过期时间
            message.getMessageProperties().setExpiration(expirationTime);
            return message;
        });
    }
}

// 消息监听器, 交换器 delayed = "true"
@Component
@RabbitListener(containerFactory = "listenerContainerFactory",
    bindings = @QueueBinding(value = @Queue(value = "queue.delay"),
        exchange = @Exchange(value = "exchange.delay", type = ExchangeTypes.TOPIC, delayed = "true"),
        key = "rountingkey.delay"))
@Slf4j
public class MsgListener {

    @RabbitHandler
    public void msgHandler(String msg) {
        log.info("接收到的延迟消息 [{}]",msg)
    }
}                         
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-06-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • DLX+TTL 存在时序问题
  • 安装延迟插件
    • 下载地址
      • 安装
      • Java 代码实现
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档