前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rocketmq消费者流程

Rocketmq消费者流程

作者头像
路行的亚洲
发布2023-12-25 11:04:45
940
发布2023-12-25 11:04:45
举报
文章被收录于专栏:后端技术学习后端技术学习

我们知道RocketMQ需要经过生产者生产消息,然后到broker存储消息,接着业务系统注册监听消费消息。

一、消费消息的接口的业务系统实现

代码语言:javascript
复制
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

二、RocketMQ中调用接口的四个地方

根据消费的代码,可以进一步追溯到消费的接口。从而进一步定位到消费消息的过程。

从中我们可以看到目前调用consumeMessage的接口有四个地方:

可以看到目前支持的有:并行消费、顺序消费、Pop模式的并行和顺序消费,以此与业务系统建立联系,从而实现消费消息。

可以看到有一个共性,都是ConsumeRequest实现Runnable接口里面的线程运行消费的。而processConsumeResult是处理后续消费的结果的。这里可以看到会根据消费的情况,考虑是否重试的情况。

因此我们需要找到这个请求ConsumeRequest。因此我们可以追寻代码可以看到ClientRemotingProcessor处理器会处理这个请求:

代码语言:javascript
复制
   case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                return this.consumeMessageDirectly(ctx, request);

可以看到这个是在Netty启动后经过channelRead执行对应的处理器,然后执行到这里。同时可以看到这个方法也是暴露给了MQAdmin,因此可以在tools模块的可视化界面之间操作消费调用的就是这个接口。

三、提交请求的方法

那提交请求的方法在哪呢?

代码语言:javascript
复制
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

这里会提交消费请求,然后做消费。可以看到其基于pullMessage(final PullRequest pullRequest) 实现的。因此可以看到里面有一个拉取消息的接口核心拉取实现:

代码语言:javascript
复制
this.pullAPIWrapper.pullKernelImpl

此时会执行拉取操作:

代码语言:javascript
复制
this.mQClientFactory.getMQClientAPIImpl().pullMessage

此时执行动作的是PullMessageProcessor的处理请求,进行消息的拉取和更新位点等等。经过pullMessageResultHandler处理,最终将结果返回给业务系统。也即此时会执行拉取操作,目前看主要分为两类:

代码语言:javascript
复制
LITE_PULL_MESSAGE
PULL_MESSAGE

两类处理的方式,目前推荐的是LITE_PULL_MESSAGE。

四、RebalanceService操作的dispatchPullRequest

而进行拉取的操作的时机是在pullMessgeService中完成的,而拉取的过程中是通过RebalanceService操作的dispatchPullRequest而来的。从RebalanceImpl里面可以看到目前使用的地方:

因此此时重点来到重平衡的两个过程提交和更新处理队列

此时可以看到最终是在生产者和消费者启动的时候,会启动重平衡service和拉取消息service。

因此可以看到其在生产者和消费启动的时候就启动了。

而对应的拉取的时候服务器没有消息, 就 hold 一会儿。一旦有消息,就会通知。可以看到PullRequestHoldService会做这样的操作,唤醒PullMessageProcessor的处理请求处理拉取请求。

代码语言:javascript
复制
this.notifyMessageArriving(topic, queueId, offset)

同时感谢王小瑞老师的耐心解答。王小瑞老师的公司Automq欢迎大家试用。https://github.com/AutoMQ/automq-for-rocketmq

四、参考资料

参考:https://rocketmq.apache.org/ rocketmq官网

https://github.com/apache/rocketmq

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-12-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、消费消息的接口的业务系统实现
  • 二、RocketMQ中调用接口的四个地方
  • 三、提交请求的方法
  • 四、RebalanceService操作的dispatchPullRequest
  • 四、参考资料
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档