前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ消费--Broker端处理逻辑【源码笔记】

RocketMQ消费--Broker端处理逻辑【源码笔记】

作者头像
瓜农老梁
发布2019-08-13 17:05:51
8720
发布2019-08-13 17:05:51
举报
文章被收录于专栏:瓜农老梁瓜农老梁瓜农老梁
目录
一、问题思考
二、Broker处理消费流程
1.Broker消费处理流程概览
2.查找消息流程
3.消息查询结果处理流程
三、消费进度流转
1.客户端上报消费进度
2.Broker端处理消费进度
3.消费进度流转示意图
一、问题思考

1.Broker是如何处理消费流程的? 2.消费进度是如何流转的? 说明:本文分析均为PUSH消费模式

二、Broker处理消费流程

本部分将消费的切分成三块梳理:Broker消费处理流程概览、查找消息流程、以及消息查询结果处理流程。

1.Broker消费处理流程概览

小结:在拉取消息时会进行Broker和主题读权限的判断,实战中若有必要可以封锁Broker的拉取权限从而禁止从该broker进行消费;或者封锁某主题的读权限禁止消费组从该主题消费消息。

2.查找消息流程

小结:如果需要从磁盘拉取消息则一次默认最多拉取8条,一次消息的消息大小最大为64K。如果从缓存中拉取默认最多32条,一次拉取的消息大小最大256K。使用tagcode会在查找消息前进行过滤,使用SQL92过滤再消息查找出来后进行过滤。

3.消息查询结果处理流程

小结:建议开启slaveReadEnable=true,当拉取的消息超过Broker内存40%时会从Slave节点消费,Master不必从磁盘重新读取数据;transferMsgByHeap默认为true即消息先拉取到堆空间再返回到客户端;如果设置为false则使用Netty#FileRegion,可用零字节拷贝不必再拷贝到堆内存提高性能。

三、消费进度流转
1.客户端上报消费进度
//@1 顺序消费/并发消费流程相同
//ConsumeMessageOrderlyService#processConsumeResult
//ConsumeMessageConcurrentlyService#processConsumeResult
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
//更新消费进度偏移量
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
@2 RemoteBrokerOffsetStore#updateOffset
AtomicLong offsetOld = this.offsetTable.get(mq);
MixAll.compareAndIncreaseOnly(offsetOld, offset);
@3 offsetTable存储结构:key为MessageQueue value为消费的偏移量进度
ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>()
@4 定时同步消费进度
//持久化消息消费进度,默认5秒保存一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
@5 RemoteBrokerOffsetStore#persistAll
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet())
this.updateConsumeOffsetToBroker(mq, offset.get());

小结:PUSH消费中消费进度存储在offsetTable中,定时任务每5秒钟上报Broker一次。

2.Broker端处理消费进度
处理客户端定时上报消费进度
//@1 ConsumerManageProcessor#processRequest#updateConsumerOffset
this.brokerController.getConsumerOffsetManager().commitOffset
//@2 ConsumerOffsetManager#commitOffset
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
Long storeOffset = map.put(queueId, offset);
//@3 消费进度缓存结构
//key=topic@group
//value=ConcurrentMap<Integer/* queueId*/, Long/*offset*/>>
offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
//@4 5秒钟一次存储消费进度
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//@5 consumerOffset.json文件格式
"zeus-package-mismatch-topic@autosort-packagelog":{0:9055300,1:9055157,2:9055304,3:9055232}

小结:Broker接到客户端消费进度上报后更新缓存offsetTable,每隔5秒中定时任务将offsetTable消费进度存储在磁盘文件consumerOffset.json中。

消息拉取后实时更新消费进度
//@1 PullMessageProcessor#processRequest
if (storeOffsetEnable) {
//更新消费进度
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}

小结:PUSH消费客户端拉取消息后会实时更新消费的进度。

3.消费进度流转示意图
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 一、问题思考
  • 二、Broker处理消费流程
  • 1.Broker消费处理流程概览
  • 2.查找消息流程
  • 3.消息查询结果处理流程
  • 三、消费进度流转
  • 1.客户端上报消费进度
  • 2.Broker端处理消费进度
  • 处理客户端定时上报消费进度
  • 消息拉取后实时更新消费进度
  • 3.消费进度流转示意图
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档