最近发现项目内部和外部沟通频繁使用MQ,并通过tag进行消息过滤和隔离,因此想搞清楚tag在源码中使用的地方,毕竟消息中间件这块还是有很多该学习的地方。
RocketMQ-4.9.1
首先要RocketMQ的文件存储设计,本文主要关注CommitLog文件和ConsumeQueue文件,如下图所示(图片引自该处)。当消息生产者生产消息时,所有topic的消息都会顺序的保存在CommitLog文件里,如果只从CommitLog一个文件看,是没有办法快速定位到某个topic的消息,那么此时就需要ConsumeQueue登场了。
ConsumeQueue在不同的文件夹下,根据不同的文件夹可以区分不同的队列,而ConsumeQueue文件存储的是消息的索引信息。
如上图所示消息生产者每生产一条消息就对应这下图的一条索引记录。其中消息的真实内容存储在commitLog中。
整个流程为:
其中
topic = TopicTest
tag = TagA
一般producer生产消息时候会使用如下代码,其中消息要包含topic、tag和msg消息体。
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
Message msg =
new Message(
"TopicTest", // topic
"TagA", // tag
"OrderID188", // key
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body
SendResult sendResult = producer.send(msg);
其中上面的tag是存在哪呢?跟Message的构造方法可以看到tag其实是放在msg的properties里,MessageConst.PROPERTY_TAGS = TAGS
public void setTags(String tags) {
this.putProperty(MessageConst.PROPERTY_TAGS, tags);
}
跟上面的send方法中间会跟到
MQClientAPIImpl#sendMessage方法,方法中的一行代码如下图所示,创建Request,因为本次发送为单条消息,所以代码中的三元表达式中选择RequestCode.SEND_MESSAGE_V2(310)。
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
在往下跟其实就是通过Netty给borker发送消息了(非本次内容关注重点,忽略)。
总结:
tag放在msg的properties里
发送请求的code = RequestCode.SEND_MESSAGE_V2(310)
本文关注的有两个文件,一个是存储消息的CommitLog文件和存储topic索引的ConsumeQueue文件。
CommitLog是对外暴露的是一个逻辑日志(而真正对应的物理日志是多个MappedFile文件组成的)。该逻辑日志有一个最大偏移量maxOffset(DefaultMessageStore.this.commitLog.getMaxOffset())。当有新消息发到broker时消息会写到CommitLog里并且maxOffset就会增加。
而ConsumeQueue的构成是由另一个类ReputMessageService异步线程进行处理,异步构建Consumequeue。
ReputMessageService是Runnable实现类,run方法会每隔1秒执行doReput方法,如下面代码所示。
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
ReputMessageService里有一个属性是reputFromOffset,该属性表示同步CommLog到Consumequeue的进度。
如果
this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset()
则说明有新的消息要从CommLog构建索引到Consumequeue。
而Consumequeue中的三个属性(commitlog offset、size、tag hashcode)是怎么来的?
本身我们是有一个CommitLog的偏移量(reputFromOffset),从这个偏移量开始往后解析我们是可以解析出整条消息的,消息格式如下图所示。
解析出整条消息后可以获取到
那么就可以构建一条Consumequeue索引了。
总结:
broker收到消息后同步放在CommitLog中(本文没讲)
ReputMessageService通过异步不断扫描reputFromOffset和commitLog.getMaxOffset关系从而获取需要构建的通知。
解析消息获取Consumequeue参数并构建。
1、获取订阅的topic和Queue信息
2、通过Reblace获取被分配的Queue,开始拉取消息
消费者启动会调用
MQClientInstance#start()方法,start()方法里有会调用
MQClientInstance#startScheduledTask()方法,里面的一段代码如下,会每隔一段时间更新一下topic路由信息
//MQClientInstance###startScheduledTask()
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
会把路由信息保存到本地的一个HashMap里,这样消费者就拿到了topic的信息并且会把broker的信息保存下来
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//根据主题从nameserver获取topic信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//把主题和主题队列相关的broker保存下来
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
总结:
消费者拿到主题的队列列表和broker信息
consumer怎么开始拉取消息?这里其实是一个reblance的过程
MQClientInstance的start的方法里会开启一个rebalance的线程,如下面代码所示
//MQClientInstance###start()
public void start() throws MQClientException {
//省略
// Start rebalance service
this.rebalanceService.start();
//省略
}
跟RebalanceService的run()方法一直跟下去最后跟到RebalanceImpl的rebalanceByTopic方法。如下面代码所示。根据主题队列列表和消费者组集合去做一个Rebalance,最后的返回结果是当前消费者需要消费的主题队列。
//RebalanceImpl##rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
//获取订阅的主题的队列
//获取订阅的主题的队列
//获取订阅的主题的队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//获取同消费者组的ClientID集合
//获取同消费者组的ClientID集合
//获取同消费者组的ClientID集合
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//排序
//排序
//排序
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//rebalance算法核心实现,最后的结果是返回应该消费的队列
//rebalance算法核心实现,最后的结果是返回应该消费的队列
//rebalance算法核心实现,最后的结果是返回应该消费的队列
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
//rebalance算法核心实现,最后的结果是返回应该消费的队列
//rebalance算法核心实现,最后的结果是返回应该消费的队列
//rebalance算法核心实现,最后的结果是返回应该消费的队列
allocateResultSet.addAll(allocateResult);
}
//此处看下面的消费者怎么去拉消息
//此处看下面的消费者怎么去拉消息
//此处看下面的消费者怎么去拉消息
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
}
}
上面代码中allocateResultSet就是该consumerGroup被分配的Queue。后面会把每一个Queue包装成一个Task去对应的Broker中拉取消息。
总结:
如下图所示,RebalanceService线程会根据情况把请求放在PullMessageService的pullRequestQueue阻塞队列队列里,队列的每一个节点就是拉请求;PullMessageService线程就是不断去pullRequestQueue里拿任务然后去看一下broker中有没有数据,如果有数据就消费。
首先Consumer给broker发送消息,请求code是 RequestCode.PULL_MESSAGE ,因此我们可以跟borker里对这个请求码的处理的processor,最后定位到
PullMessageProcessor#processRequest方法,方法里有如下的代码
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
跟DefaultMessageStore#getMessage方法
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
//省略
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//获取消息的偏移量
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
//获取消息的大小
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
//获取消息的tag的hashcode
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}
//省略
//省略
//省略
//查看消息tag是否匹配,此时在broker实现过滤
//查看消息tag是否匹配,此时在broker实现过滤
//查看消息tag是否匹配,此时在broker实现过滤
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
//省略
//省略
//省略
return getResult;
}
跟进匹配方法,此时能发现过滤方法是看subscriptionData里是否有包含tagsCode
//ExpressionMessageFilter#isMatchedByConsumeQueue
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
//省略
//省略
//省略
//订阅主题里是否包含这个hashcode
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
} else {
//省略
}
总结:broker是根据subscriptionData里的tag的hashcode列表去过滤消息,判断从ConsumeQueue中读取的tag的hashcode是否在subscriptionData里的tag的hashcode列表中。
Consumer端在DefaultMQPushConsumerImpl#pullMessage方法里有一个PullCallback,此方法是一个给broker发送拉取消息后的一个回调方法
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
//省略
}
跟一下PullAPIWrapper#processPullResult方法
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
//省略
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
//Consumer端过滤消息
//Consumer端过滤消息
//Consumer端过滤消息
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
//省略
return pullResult;
}
总结:broker端的消息过滤是通过看subscriptionData里的tag列表是否含有当前消息的tag
RocketMQ消费者启动流程
https://cloud.tencent.com/developer/article/2374328
深入剖析 RocketMQ 源码 - 消息存储模块
https://blog.csdn.net/vivo_tech/article/details/121221880
7张图揭晓RocketMQ存储设计的奥妙