专栏首页KEN DO EVERTHING「 从0到1学习微服务SpringCloud 」11 补充篇 RabbitMq实现延迟消费和延迟重试

「 从0到1学习微服务SpringCloud 」11 补充篇 RabbitMq实现延迟消费和延迟重试

何为延迟队列?

延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。

业务场景

延迟队列能做什么?最常见的是以下两种场景:

  • 消费

比如:用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单;用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

  • 重试

比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。

如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。

实现思路

在介绍具体思路钱,先介绍RabbitMQ的两个特性:Time-To-Live Extensions(消息存活时间) 和 Dead Letter Exchanges(死信交换机)

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter(死信)。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。

Dead Letter Exchanges

设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

  • 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
  • 消息因为设置了TTL而过期。
  • 消息进入了一条已经达到最大长度的队列。

如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish(推送)到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。

实现流程

延迟消费

延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。

延迟重试

延迟重试本质上也是延迟消费的一种。

如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。

代码实现

这里只贴上最主要的代码,全部的代码可查看github

1.延迟消费

Mq队列与交换机实例创建

 /**
     * 缓冲队列
     */
    private String DELAY_BUFFER_QUEUE = "delay_buffer_queue";

    /**
     * 实际消费交换机(DLX)
     */
    private String DELAY_SERVICE_EXCHANGE = "delay_service_exchange";

    /**
     * 实际消费队列
     */
    private String DELAY_SERVICE_QUEUE = "delay_service_queue";

    /**
     * 消息过期时间 3秒
     */
    private Integer QUEUE_EXPIRATION = 3 * 1000;

    /**
     * 实际消费队列
     * @return
     */
    @Bean
    Queue delayServiceQueue(){
        return QueueBuilder.durable(DELAY_SERVICE_QUEUE).build();
    }

    /**
     * 实际消费交换机
     * @return
     */
    @Bean
    DirectExchange delayServiceExchange() {
        return new DirectExchange(DELAY_SERVICE_EXCHANGE);
    }

    /**
     * 实际消费队列绑定实际消费交换机(DLX)
     * @param delayServiceQueue
     * @param delayServiceExchange
     * @return
     */
    @Bean
    Binding delayBinding(Queue delayServiceQueue, DirectExchange delayServiceExchange) {
        return BindingBuilder.bind(delayServiceQueue)
                .to(delayServiceExchange)
                .with(DELAY_SERVICE_QUEUE);
    }

    /**
     * 缓冲队列配置
     * @return
     */
    @Bean
    Queue delayBufferQueue(){
        return QueueBuilder.durable(DELAY_BUFFER_QUEUE)
                // 死信交换机 DLX
                .withArgument("x-dead-letter-exchange", DELAY_SERVICE_EXCHANGE)
                // 目标routing key
                .withArgument("x-dead-letter-routing-key", DELAY_SERVICE_QUEUE)
                // 设置队列的过期时间
                .withArgument("x-message-ttl", QUEUE_EXPIRATION)
                .build();
    }

监听实际消费队列

@Component
public class DelayMsgListener {
    @RabbitListener(queues="delay_service_queue")
    public void listenServiceMsg(Message message){
        System.out.println(new Date()+ "收到延迟消息啦:"+new String(message.getBody()));
    }
}

测试:发送消息到缓冲队列

@Test
    public void send1(){
        System.out.println(new Date() +"发送延迟消息!!!");
        amqpTemplate.convertAndSend("delay_buffer_queue","Hello!Delay Message!");
    }

结果如下 可以看到,在发消息后3秒(TTL),实际消费队列接收到了消息并被消费

2.延迟重试

Mq队列与交换机实例创建

   /**
     * 缓冲队列
     */
    private String RETRY_BUFFER_QUEUE = "retry_buffer_queue";
    /**
     * 缓冲交换机
     */
    private String RETRY_BUFFER_EXCHANGE = "retry_buffer_exchange";

    /**
     * 实际消费交换机(DLX)
     */
    private String RETRY_SERVICE_EXCHANGE = "retry_service_exchange";

    /**
     * 实际消费队列
     */
    private String RETRY_SERVICE_QUEUE = "retry_service_queue";


    /**
     * 实际消费队列
     * @return
     */
    @Bean
    Queue retryServiceQueue(){
        return QueueBuilder.durable(RETRY_SERVICE_QUEUE).build();
    }

    /**
     * 实际消费交换机
     * @return
     */
    @Bean
    DirectExchange retryServiceExchange() {
        return new DirectExchange(RETRY_SERVICE_EXCHANGE);
    }

    /**
     * 实际消费队列绑定实际消费交换机(DLX)
     * @param retryServiceQueue
     * @param retryServiceExchange
     * @return
     */
    @Bean
    Binding retryBinding(Queue retryServiceQueue, DirectExchange retryServiceExchange) {
        return BindingBuilder.bind(retryServiceQueue)
                .to(retryServiceExchange)
                .with(RETRY_SERVICE_QUEUE);
    }

    /**
     * 缓冲队列配置
     * @return
     */
    @Bean
    Queue retryBufferQueue(){
        return QueueBuilder.durable(RETRY_BUFFER_QUEUE)
                // 死信交换机 DLX
                .withArgument("x-dead-letter-exchange", RETRY_SERVICE_EXCHANGE)
                // 目标routing key
                .withArgument("x-dead-letter-routing-key", RETRY_SERVICE_QUEUE)
                // 设置队列的过期时间
                .withArgument("x-message-ttl", QUEUE_EXPIRATION)
                .build();
    }

    /**
     * 缓冲交换机
     * @return
     */
    @Bean
    DirectExchange retryBufferExchange() {
        return new DirectExchange(RETRY_BUFFER_EXCHANGE);
    }

    /**
     * 缓冲队列绑定缓冲交换机
     * @param retryBufferQueue
     * @param retryBufferQueue
     * @return
     */
    @Bean
    Binding bufferBinding(Queue retryBufferQueue, DirectExchange retryBufferExchange) {
        return BindingBuilder.bind(retryBufferQueue)
                .to(retryBufferExchange)
                .with(RETRY_BUFFER_QUEUE);
    }

监听实际消费队列

@Component
public class RetryMsgListener {
    /**
     * 缓冲队列
     */
    private String RETRY_BUFFER_QUEUE = "retry_buffer_queue";
    /**
     * 缓冲交换机
     */
    private String RETRY_BUFFER_EXCHANGE = "retry_buffer_exchange";

    @Autowired
    private MessagePropertiesConverter messagePropertiesConverter;

    @RabbitListener(queues="retry_service_queue")
    public void listenServiceMsg(@Payload Message message, Channel channel){
        try {
            System.out.println(new Date() + "收到消息:" + new String(message.getBody()));
            //TODO 业务逻辑
            //突然出现异常
            throw  new RuntimeException("特殊异常");
        }catch (Exception e){
            Map<String,Object> headers = message.getMessageProperties().getHeaders();
            try{
                Long retryCount = getRetryCount(headers);
                //重试3次
                if(retryCount < 3){
                    retryCount += 1;
                    System.out.println("消费异常,准备重试,第"+retryCount+"次");

                    //转换为RabbitMQ 的Message Properties对象
                    AMQP.BasicProperties rabbitMQProperties =
                            messagePropertiesConverter.fromMessageProperties( message.getMessageProperties(), "UTF-8");
                    //设置headers
                    rabbitMQProperties.builder().headers(headers);
                    //程序异常重试
                    //这里必须把rabbitMQProperties也传进来,否则死信队列无法识别是否是同一条信息,导致重试次数无法递增
                    channel.basicPublish(RETRY_BUFFER_EXCHANGE,RETRY_BUFFER_QUEUE,rabbitMQProperties, message.getBody());
                }else {
                    //TODO 重试失败,需要人工处理 (发送到失败队列或发邮件/信息)
                    System.out.println("已重试3次,需人工处理!");
                }
            }catch (IOException ioe){
                System.out.println("消息重试失败!");
                ioe.printStackTrace();
            }
        }
    }

    /**
     * 获取重试次数
     * 如果这条消息是死信,header中会有一个x-death的记录相关信息
     * 其中包含死亡次数
     * @param headers
     * @return
     */
    private long getRetryCount(Map<String, Object> headers) {
        long retryCount = 0;
        if(null != headers) {
            if(headers.containsKey("x-death")) {
                List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death");
                if(!deathList.isEmpty()) {
                    Map<String, Object> deathEntry = deathList.get(0);
                    retryCount = (Long)deathEntry.get("count");
                }
            }
        }
        return retryCount;
    }
}

测试:发送消息到实际消费队列

@Test
    public void send2(){
        System.out.println(new Date() +"发送延迟重试消息!!!");
        //直接发消息到实际消费队列
        amqpTemplate.convertAndSend("retry_service_queue","Hello!Retry Message!");
    }

结果如下: 可以看到,消费异常后,重试了3次

延迟队列在实际业务中是经常被用到的,同学们最好都学学哦,代码已上传github https://github.com/zhangwenkang0/springcloud-learning-from-0-to-1/tree/master/rabbitmq-demo

本文分享自微信公众号 - java从心(javaFollowHeart),作者:a丶ken

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-05-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 「 从0到1学习微服务SpringCloud 」07 RabbitMq的基本使用

    在上篇文章中,我们已经用到了MQ,用于实现配置自动刷新。接下来,就具体说说MQ的应用场景以及RabbtMq的基本使用。

    KEN DO EVERTHING
  • 一组漫画完美总结互联网人生

    1991年,万维网(World Wide Web)向公众开放,标志着互联网的诞生。如今人类的生活被互联网极大地改变,以至于没有网络的生活几乎是难以想象的...

    KEN DO EVERTHING
  • 几个IDEA高级调试技巧,完全是bug杀手啊

    循环中经常用到这个技巧,比如:遍历1个大List的过程中,想让断点停在某个特定值。

    KEN DO EVERTHING
  • liteos队列

    队列又称消息队列,是一种常用于任务间通信的数据结构,实现了接收来自任务或中断的不固定长度的消息,并根据不同的接口选择传递消息是否存放在自己空间。任务能够从队列里...

    233333
  • 认识消息队列(一) 转

    业务无关,一个具有普适性质的消息队列组件不需要考虑上层的业务模型,只做好消息的分发就可以了,上层业务的不同模块反而需要依赖消息队列所定义的规范进行通信。

    wuweixiang
  • Rabbitmq延迟队列实现定时任务

    开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期、订单定时关闭、微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本...

    搜云库技术团队
  • 前端中的数据结构——队列篇

    队列是数据结构中的一种,它与实际生活中的排队相似:在一条队伍中,先来的人总是能够先得到服务,后来的人只能排在队伍末尾等候。队列也是一样,它符合先进先出 FIFO...

    企鹅号小编
  • AI_第一部分 数据结构与算法(8.队列)

    第四阶段我们进行深度学习(AI),本部分(第一部分)主要是对底层的数据结构与算法部分进行详尽的讲解,通过本部分的学习主要达到以下两方面的效果:

    还是牛6504957
  • 记录使用mint-ui的感想

    今天记录一下使用mint-ui的心得,首先说明一下mint-ui是用来做手机端界面的ui库,这个ui库我是做项目的时候使用到了,所以今天简单的说明记录一下该库的...

    何处锦绣不灰堆
  • 【数据结构(C语言版)系列三】 队列

    队列是一种先进先出的线性表,它只允许在表的一端进行插入,而在另一端删除元素。这和我们日常生活中的排队是一致的,最早进入队列的元素最早离开。在队列中,允许插入的一...

    闪电gogogo

扫码关注云+社区

领取腾讯云代金券