前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >面试系列之-rocketmq重试队列和死信队列

面试系列之-rocketmq重试队列和死信队列

作者头像
用户4283147
发布2022-12-29 20:04:58
8800
发布2022-12-29 20:04:58
举报
文章被收录于专栏:对线JAVA面试对线JAVA面试

消费者异常

消费者中我们注册了一个监听器回调函数,当Consumer获取消息后,就会交给我们的回调函数来进行处理。如果处理完了,就返回一个ConsumeConcurrentlyStatus.CONSUME_SUCCESS,提交这批消息的offset到broker去,然后继续从broker获取下一批消息来进行处理。 如果上面代码回调函数中,对一批消息处理的时候,数据库宕机了就不能再能返回CONSUME_SUCCESS,如果你返回的话,下一次就会处理下一批消息,但是这批消息其实没处理成功,此时必然导致这批消息就丢失了;

返回RECONSUME_LATER
代码语言:javascript
复制
consumer.registerMessageListener(
    new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
                try {
                    // 后续消息处理逻辑
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // 比如因为数据库宕机了,没法对消息完成处理
                    // 此时可以返回一个稍后重试的消费状态
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        }
    }
);

如果消费端消费失败的话,过段时间再次给我这批消息重试;

重试队列

只有当消费模式为集群模式时,Broker 才会自动进行重试,对于广播消息是不会重试的;

RocketMQ会有一个针对你这个ConsumerGroup的重试队列,如果你返回了RECONSUME_LATER状态,他就会把你这批消息放到你这个消费组的重试队列中去,比如消费组是"WMSConsumerGroup",那么就会有一个“%RETRY%WMSConsumerGroup”,这个名字的重试队列;

然后过一段时间,重试队列中的消息会再次进行处理,如果再次失败,又返回了RECONSUME_LATER,那么会再过一段时间让我们再次进行处理,默认最多重试16次;每次重试之间的间隔时间是不一样的,这个间隔时间可以如下进行配置:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

意思就是,第一次重试是1秒后,第二次重试是5秒后,第三次重试是10秒后,第四次重试是30秒后,第五次重试是1分钟后,以此类推,最多重试16次;

  1. 所有的延迟消息到达broker后,会存放到SCHEDULE_TOPIC_XXX的topic下(这个topic比较特殊,对客户端是不可见的,包括使用rocketmq-console,也查不到这个topic);
  2. SCHEDULE_TOPIC_XXX这个topic下存在18个队列,每个队列中存放的消息都是同一个延迟级别消息;
  3. broker端启动了一个timer和timerTask的任务,定时从此topic下拉取数据,如果延迟时间到了,就会把此消息发送到指定的topic下,完成延迟消息的发送;

刚才我们说如果你返回了RECONSUME_LATER,消息就会进入重试队列,其实不完全准确;

当MQ接收到RECONSUME_LATER后,首先会完成消息的转换,把消息存到延时队列中,然后再根据消息的延时时间保存到重试队列中;

如果重试了16次之后依然无法处理,就会把这些消费放入死信队列。死信队列中的消息RocketMQ不会再做处理,这部分数据要怎么处理就要看我们的业务场景了,我们可以做一个后台线程去订阅这个死信队列,完成后续消息的处理;

死信队列

如果在16次重试范围内消息处理成功了,自然就没问题了;但是如果对一批消息重试了16次还是无法成功处理,就需要另外一个队列了,叫做死信队列,死信队列的名字是“%DLQ%WMSConsumerGroup”;

对死信队列中的消息处理,这个就看具体需求,比如可以专门开一个后台线程,订阅“%DLQ%WMSConsumerGroup”这个死信队列,对死信队列中的消息进行不停的重试;

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-11-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 对线JAVA面试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消费者异常
    • 返回RECONSUME_LATER
    • 重试队列
    • 死信队列
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档