A look at RocketMQ's maxReconsumeTimes

code4it
Published 2019-11-23 16:18:50
Published in the column 码匠的流水账

This article examines the maxReconsumeTimes setting in RocketMQ: where it is defined on the client and how the broker uses it.

maxReconsumeTimes

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Max re-consume times. -1 means 16 times.
     * </p>
     *
     * If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
     * queue waiting.
     */
    private int maxReconsumeTimes = -1;

    //......

    public int getMaxReconsumeTimes() {
        return maxReconsumeTimes;
    }

    public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
        this.maxReconsumeTimes = maxReconsumeTimes;
    }

    //......
}
  • DefaultMQPushConsumer defines the maxReconsumeTimes property. It defaults to -1, which, per the Javadoc, means 16 retries.

sendMessageBack

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

//......

    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
                this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
        } catch (Exception e) {
            log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

            String originMsgId = MessageAccessor.getOriginMessageId(msg);
            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

            newMsg.setFlag(msg.getFlag());
            MessageAccessor.setProperties(newMsg, msg.getProperties());
            MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        } finally {
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }

    private int getMaxReconsumeTimes() {
        // default reconsume times: 16
        if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
            return 16;
        } else {
            return this.defaultMQPushConsumer.getMaxReconsumeTimes();
        }
    }

    //......
}
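  • DefaultMQPushConsumerImpl.sendMessageBack first calls mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack, passing the value of getMaxReconsumeTimes() (which maps -1 to 16). If that call throws, the catch block builds a new message on the consumer group's retry topic, stores the original topic in PROPERTY_RETRY_TOPIC, uses MessageAccessor.setReconsumeTime to set reconsumeTime to msg.getReconsumeTimes() + 1, uses getMaxReconsumeTimes() to set maxReconsumeTimes, sets the delay level to 3 + msg.getReconsumeTimes(), and finally sends it with mQClientFactory.getDefaultMQProducer().send(newMsg).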
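The arithmetic in the fallback branch is easy to misread, so here is a minimal, self-contained sketch of it. This is not RocketMQ code: the class FallbackRetrySketch and its method names are hypothetical and only mirror the three expressions seen in sendMessageBack and getMaxReconsumeTimes above.

```java
/** Simplified model of the fallback branch in sendMessageBack (hypothetical class, not RocketMQ code). */
final class FallbackRetrySketch {

    /** Value that DefaultMQPushConsumer uses for "not configured". */
    static final int UNSET = -1;

    /** Mirrors getMaxReconsumeTimes(): -1 resolves to 16, any other value is used as-is. */
    static int effectiveMaxReconsumeTimes(int configured) {
        return configured == UNSET ? 16 : configured;
    }

    /** Mirrors MessageAccessor.setReconsumeTime(newMsg, msg.getReconsumeTimes() + 1). */
    static int nextReconsumeTimes(int reconsumeTimes) {
        return reconsumeTimes + 1;
    }

    /** Mirrors newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()). */
    static int fallbackDelayLevel(int reconsumeTimes) {
        return 3 + reconsumeTimes;
    }

    public static void main(String[] args) {
        int configured = UNSET; // assumption: the consumer never called setMaxReconsumeTimes
        for (int reconsumeTimes = 0; reconsumeTimes < 3; reconsumeTimes++) {
            System.out.println("reconsumeTimes=" + reconsumeTimes
                + " -> next=" + nextReconsumeTimes(reconsumeTimes)
                + ", delayLevel=" + fallbackDelayLevel(reconsumeTimes)
                + ", max=" + effectiveMaxReconsumeTimes(configured));
        }
    }
}
```

Note that the delay level is derived from the reconsumeTimes of the failed message, not from the incremented value written to the new message.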

handleRetryAndDLQ

rocketmq/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

//......

    private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
                                      RemotingCommand request,
                                      MessageExt msg, TopicConfig topicConfig) {
        String newTopic = requestHeader.getTopic();
        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
            if (null == subscriptionGroupConfig) {
                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                response.setRemark(
                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                return false;
            }

            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
            }
            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
            if (reconsumeTimes >= maxReconsumeTimes) {
                newTopic = MixAll.getDLQTopic(groupName);
                int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0
                );
                msg.setTopic(newTopic);
                msg.setQueueId(queueIdInt);
                if (null == topicConfig) {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("topic[" + newTopic + "] not exist");
                    return false;
                }
            }
        }
        int sysFlag = requestHeader.getSysFlag();
        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
        }
        msg.setSysFlag(sysFlag);
        return true;
    }

    //......
}
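  • SendMessageProcessor.handleRetryAndDLQ only acts when the topic starts with RETRY_GROUP_TOPIC_PREFIX (%RETRY%). It first takes maxReconsumeTimes from subscriptionGroupConfig.getRetryMaxTimes(); if the request version is at least MQVersion.Version.V3_4_9.ordinal(), it overrides that value with the maxReconsumeTimes carried in the request header. It then reads reconsumeTimes from the request header (treating null as 0), and if that value is greater than or equal to maxReconsumeTimes it changes newTopic to MixAll.getDLQTopic(groupName), so the message goes to the dead-letter queue instead of the retry topic.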
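The routing decision described above can be sketched in isolation. This is a simplified model under stated assumptions, not the broker's implementation: RetryRoutingSketch and route are hypothetical names, the version check is reduced to a boolean parameter, and the %DLQ% prefix stands in for what MixAll.getDLQTopic prepends to the group name. Topic creation and queue selection are left out.

```java
/** Simplified model of the retry/DLQ decision in handleRetryAndDLQ (hypothetical class, not broker code). */
final class RetryRoutingSketch {

    static final String RETRY_PREFIX = "%RETRY%"; // MixAll.RETRY_GROUP_TOPIC_PREFIX
    static final String DLQ_PREFIX = "%DLQ%";     // assumed prefix used by MixAll.getDLQTopic

    /**
     * Returns the topic the message would be stored under.
     *
     * @param topic                   topic from the request header
     * @param groupRetryMaxTimes      broker-side limit from the subscription group config
     * @param headerLimitHonored      stands in for the "request version >= V3_4_9" check
     * @param headerMaxReconsumeTimes limit sent by the client in the request header
     * @param headerReconsumeTimes    retry count from the request header, may be null
     */
    static String route(String topic, int groupRetryMaxTimes, boolean headerLimitHonored,
                        int headerMaxReconsumeTimes, Integer headerReconsumeTimes) {
        if (topic == null || !topic.startsWith(RETRY_PREFIX)) {
            return topic; // not a retry message, nothing to decide
        }
        String groupName = topic.substring(RETRY_PREFIX.length());
        int maxReconsumeTimes = headerLimitHonored ? headerMaxReconsumeTimes : groupRetryMaxTimes;
        int reconsumeTimes = headerReconsumeTimes == null ? 0 : headerReconsumeTimes;
        // Retries exhausted: redirect to the group's dead-letter topic.
        return reconsumeTimes >= maxReconsumeTimes ? DLQ_PREFIX + groupName : topic;
    }

    public static void main(String[] args) {
        // "orders" is a made-up consumer group name used only for illustration.
        System.out.println(route("%RETRY%orders", 16, true, 3, 2));
        System.out.println(route("%RETRY%orders", 16, true, 3, 3));
    }
}
```

With a client limit of 3, the first call stays on the retry topic and the second is redirected to the dead-letter topic, because the comparison is "greater than or equal to".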

Summary

DefaultMQPushConsumer defines the maxReconsumeTimes property with a default of -1, which the client resolves to 16. DefaultMQPushConsumerImpl.sendMessageBack wraps mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack in a try/catch; on an exception it uses MessageAccessor.setReconsumeTime to update the reconsumeTime of newMsg, calls getMaxReconsumeTimes to set its maxReconsumeTimes, and sends it with mQClientFactory.getDefaultMQProducer().send(newMsg). On the broker side, if reconsumeTimes is greater than or equal to maxReconsumeTimes, the topic is changed to MixAll.getDLQTopic(groupName).

Originally published on 2019-11-20 on the 码匠的流水账 WeChat official account.
