我们知道RocketMQ需要经过生产者生产消息,然后到broker存储消息,接着业务系统注册监听消费消息。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
根据消费的代码,可以进一步追溯到消费的接口。从而进一步定位到消费消息的过程。
从中我们可以看到目前调用consumeMessage的接口有四个地方:
可以看到目前支持的有:并行消费、顺序消费、Pop模式的并行和顺序消费,以此与业务系统建立联系,从而实现消费消息。
可以看到有一个共性,都是ConsumeRequest实现Runnable接口里面的线程运行消费的。而processConsumeResult是处理后续消费的结果的。这里可以看到会根据消费的情况,考虑是否重试的情况。
因此我们需要找到这个请求ConsumeRequest。因此我们可以追寻代码可以看到ClientRemotingProcessor处理器会处理这个请求:
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
可以看到这个是在Netty启动后经过channelRead执行对应的处理器,然后执行到这里。同时可以看到这个方法也是暴露给了MQAdmin,因此可以在tools模块的可视化界面之间操作消费调用的就是这个接口。
那提交请求的方法在哪呢?
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
这里会提交消费请求,然后做消费。可以看到其基于pullMessage(final PullRequest pullRequest) 实现的。因此可以看到里面有一个拉取消息的接口核心拉取实现:
this.pullAPIWrapper.pullKernelImpl
此时会执行拉取操作:
this.mQClientFactory.getMQClientAPIImpl().pullMessage
此时执行动作的是PullMessageProcessor的处理请求,进行消息的拉取和更新位点等等。经过pullMessageResultHandler处理,最终将结果返回给业务系统。也即此时会执行拉取操作,目前看主要分为两类:
LITE_PULL_MESSAGE
PULL_MESSAGE
两类处理的方式,目前推荐的是LITE_PULL_MESSAGE。
而进行拉取的操作的时机是在pullMessgeService中完成的,而拉取的过程中是通过RebalanceService操作的dispatchPullRequest而来的。从RebalanceImpl里面可以看到目前使用的地方:
因此此时重点来到重平衡的两个过程提交和更新处理队列
此时可以看到最终是在生产者和消费者启动的时候,会启动重平衡service和拉取消息service。
因此可以看到其在生产者和消费启动的时候就启动了。
而对应的拉取的时候服务器没有消息, 就 hold 一会儿。一旦有消息,就会通知。可以看到PullRequestHoldService会做这样的操作,唤醒PullMessageProcessor的处理请求处理拉取请求。
this.notifyMessageArriving(topic, queueId, offset)
同时感谢王小瑞老师的耐心解答。王小瑞老师的公司Automq欢迎大家试用。https://github.com/AutoMQ/automq-for-rocketmq
参考:https://rocketmq.apache.org/ rocketmq官网
https://github.com/apache/rocketmq