前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ消费处理hold过程学习

RocketMQ消费处理hold过程学习

作者头像
路行的亚洲
发布2023-12-25 11:05:15
960
发布2023-12-25 11:05:15
举报
文章被收录于专栏:后端技术学习

一、拉取消息处理的结果情况

通过前面学习,我们知道rocketmq消费消息的过程中,会有一个拉取的动作,而这个拉取的动作中又会涉及到对拉取消息的处理。而这里又分为好几种情况。

代码语言:javascript
复制
ResponseCode.SUCCESS 拉取成功,响应成功的情况
ResponseCode.PULL_NOT_FOUND  拉取消息没找到
ResponseCode.PULL_RETRY_IMMEDIATELY 立即拉取回复
ResponseCode.PULL_OFFSET_MOVED  拉取位点被移除

拉取成功,我们很好理解,此时就是拉取成功了,进行正常响应。

二、拉取消息hold的情况

什么情况下会出现hold,然后wakeup呢?

一种情况是拉取处理的时候,没有找到拉取的消息,此时会做hold,另一种情况是broker启动的时候,同步broker成员组的时候会做hold操作。

本质是将请求放入到ManyPullRequest和pullRequestTable,然后取出,进行处理。

那么消息处理的过程中,如果当前没有消息可拉的时候,会怎么处理呢?

代码语言:javascript
复制
   case ResponseCode.PULL_NOT_FOUND:
                final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
                final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;

                if (brokerAllowSuspend && hasSuspendFlag) {
                    long pollingTimeMills = suspendTimeoutMillisLong;
                    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                    }

                    String topic = requestHeader.getTopic();
                    long offset = requestHeader.getQueueOffset();
                    int queueId = requestHeader.getQueueId();
                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                    // 将拉取请求进行hold操作
                    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                    return null;
                }

可以看到这个过程中主题和队列id通过@拼接起来作为key,然后通过key拿到拉取请求,如果没有的话,说明pullRequestTable没有,此时会将拉取请求放入到pullRequestTable中,然后设置成suspended为true的标识。最终将拉取请求添加到ManyPullRequest中。

三、ManyPullRequest的hold检查处理

通过运行的标识,可以看到,如果开启了长轮询的标识的话,就会等待5000毫秒钟之后会执行后面的hold检查操作,否则等待执行短轮询操作,执行hold检查。

代码语言:javascript
复制
  public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                // 检查是否hold请求了
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.warn("PullRequestHoldService: check hold pull request cost {}ms", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info("{} service end", this.getServiceName());
    }

hold检查的过程实质就是wakeup拉取处理器执行拉取消息的过程。可以看到会根据

代码语言:javascript
复制
ManyPullRequest mpr = this.pullRequestTable.get(key)

拿到请求,从而wakeup:

代码语言:javascript
复制
 this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());

从而实现拉取从而处理数据。

除此之外,我们可以wakeup出现的情况,还有一种情况是onMinBrokerChange,此时会通过broker上线的情况notifyMasterOnline,进行消息拉取。而本质是在broker启动的时候,会启动定时任务,同步broker成员组信息。

代码语言:javascript
复制
 BrokerController.this.syncBrokerMemberGroup();

可以看到1秒一次。

参考:https://rocketmq.apache.org/ rocketmq官网

https://github.com/apache/rocketmq

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-12-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、拉取消息处理的结果情况
  • 二、拉取消息hold的情况
  • 三、ManyPullRequest的hold检查处理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档