专栏首页瓜农老梁RocketMQ消费--Broker端处理逻辑【源码笔记】

RocketMQ消费--Broker端处理逻辑【源码笔记】

目录
一、问题思考
二、Broker处理消费流程
1.Broker消费处理流程概览
2.查找消息流程
3.消息查询结果处理流程
三、消费进度流转
1.客户端上报消费进度
2.Broker端处理消费进度
3.消费进度流转示意图
一、问题思考

1.Broker是如何处理消费流程的? 2.消费进度是如何流转的? 说明:本文分析均为PUSH消费模式

二、Broker处理消费流程

本部分将消费的切分成三块梳理:Broker消费处理流程概览、查找消息流程、以及消息查询结果处理流程。

1.Broker消费处理流程概览

小结:在拉取消息时会进行Broker和主题读权限的判断,实战中若有必要可以封锁Broker的拉取权限从而禁止从该broker进行消费;或者封锁某主题的读权限禁止消费组从该主题消费消息。

2.查找消息流程

小结:如果需要从磁盘拉取消息则一次默认最多拉取8条,一次消息的消息大小最大为64K。如果从缓存中拉取默认最多32条,一次拉取的消息大小最大256K。使用tagcode会在查找消息前进行过滤,使用SQL92过滤再消息查找出来后进行过滤。

3.消息查询结果处理流程

小结:建议开启slaveReadEnable=true,当拉取的消息超过Broker内存40%时会从Slave节点消费,Master不必从磁盘重新读取数据;transferMsgByHeap默认为true即消息先拉取到堆空间再返回到客户端;如果设置为false则使用Netty#FileRegion,可用零字节拷贝不必再拷贝到堆内存提高性能。

三、消费进度流转
1.客户端上报消费进度
//@1 顺序消费/并发消费流程相同
//ConsumeMessageOrderlyService#processConsumeResult
//ConsumeMessageConcurrentlyService#processConsumeResult
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
//更新消费进度偏移量
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
@2 RemoteBrokerOffsetStore#updateOffset
AtomicLong offsetOld = this.offsetTable.get(mq);
MixAll.compareAndIncreaseOnly(offsetOld, offset);
@3 offsetTable存储结构:key为MessageQueue value为消费的偏移量进度
ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>()
@4 定时同步消费进度
//持久化消息消费进度,默认5秒保存一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
@5 RemoteBrokerOffsetStore#persistAll
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet())
this.updateConsumeOffsetToBroker(mq, offset.get());

小结:PUSH消费中消费进度存储在offsetTable中,定时任务每5秒钟上报Broker一次。

2.Broker端处理消费进度
处理客户端定时上报消费进度
//@1 ConsumerManageProcessor#processRequest#updateConsumerOffset
this.brokerController.getConsumerOffsetManager().commitOffset
//@2 ConsumerOffsetManager#commitOffset
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
Long storeOffset = map.put(queueId, offset);
//@3 消费进度缓存结构
//key=topic@group
//value=ConcurrentMap<Integer/* queueId*/, Long/*offset*/>>
offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
//@4 5秒钟一次存储消费进度
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//@5 consumerOffset.json文件格式
"zeus-package-mismatch-topic@autosort-packagelog":{0:9055300,1:9055157,2:9055304,3:9055232}

小结:Broker接到客户端消费进度上报后更新缓存offsetTable,每隔5秒中定时任务将offsetTable消费进度存储在磁盘文件consumerOffset.json中。

消息拉取后实时更新消费进度
//@1 PullMessageProcessor#processRequest
if (storeOffsetEnable) {
//更新消费进度
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}

小结:PUSH消费客户端拉取消息后会实时更新消费的进度。

3.消费进度流转示意图

本文分享自微信公众号 - 瓜农老梁(gh_01130ae30a83),作者:梁勇

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RoecketMQ存储--映射文件预热【源码笔记】

    1.为什么创建文件(commitLog)时要预热? 2.为什么要写入1G大小的假值(0)呢? 3.为什么要锁定内存? 4.预热流程是怎么样的?

    瓜农老梁
  • RocketMQ NameServer【源码笔记】

    NamesrvStartup.java 启动入口类,NameServer 启动默认端口9876

    瓜农老梁
  • RocketMQ Topic创建【源码笔记】

    Topic的创建分为自动创建和通过命令行创建两种。通过broker配置参数autoCreateTopicEnable设置。 通常可以在非生产环境开启自动创建,生...

    瓜农老梁
  • 运维平台的建设思考(r6笔记第20天)

    自己最近也在琢磨如何搭建出一个完善有效的运维平台,当然这个工作不是一朝一夕就能完成,前行的道路上肯定会有各种各样的困难和牵绊,但是自己还是能够学以致用,把一些重...

    jeanron100
  • AAAI 2018 | 腾讯提出自适应图卷积神经网络,接受不同图结构和规模的数据

    机器之心
  • 腾讯提出自适应图卷积神经网络,接受不同图结构和规模的数据

    选自arXiv 作者:Ruoyu Li等 机器之心编译 参与:路雪 近日,AAAI 2018 发布接收论文列表,腾讯 AI Lab 共入选 11 篇。在论文《A...

    企鹅号小编
  • dotnet OpenXML SDK 文本占位符解析

    在使用 OpenXML SDK 解析 PPT 文档的文本占位符的时候,需要对 PPT 的格式有一定的了解,尽管整个 OpenXML SDK 包括文档等都很详细。...

    林德熙
  • 【动手学深度学习笔记】之读取和存储

    存储和读取Tensor可以分别使用save函数和load函数实现。save函数的操作对象包括模型、张量和字典等。

    树枝990
  • 修复cocos2d-jsv3.1文本换行bug

    本文作者:IMWeb vienwu 原文出处:IMWeb社区 未经同意,禁止转载 使用cocos2d-js版开发跨平台手游非常简单,并且在手机端也拥有...

    IMWeb前端团队
  • 手撸vuex和vue-router

    把这个store返回出去,那就写完了,核心原理可以说是异常简单。 就用官方文档的案例验证下这个duex有多靠谱:

    一粒小麦

扫码关注云+社区

领取腾讯云代金券