RocketMQ分布式消息中间件Pull模式下,我们的一般消费步骤如下:
1. 读取 topic的消息队列message queue的信息; 2. 按队列去拉取一定数目的消息; 3.(持久化message queue的消费进度consume offset)
首先有一些关键概念,我们需要理清楚:
consume offset 是基于topic 以及 消费组 consumer group的,意思是什么?
意思就是当采用集群消费模型(CLUSTERING),我们的consume offset 默认是存储在broker服务器上的config/consumerOffset.json文件。
{
"offsetTable":{
"TopicTest@pullConsumerGroupTest":{0:1578,1:1578,2:1578,3:1578
}
}
}
(TopicTest为某个topic,初始化为4个队列, pullConsumerGroupTest为某个消费组)
那么当同一个consumer group的多个consumer instance默认是共享一个消费进度。
那么问题来了,我们同一个consumer group的多个consumer instance在第一个步骤的时候,如何快速感知和分配合适的消息队列message queue,给每一个consumer instance消费呢??
---即 负载均衡问题如何解决?
废话不多说,贴个代码:
public Set<MessageQueue> fetchMessageQueuesForPullOperation(String topic)
throws MQClientException, InterruptedException {
DefaultMQPullConsumer pullConsumer; // please init
long fetchQueueTimeoutMillis = 5000l;
long fetchQueueNextDelayTimeMillis = 200l;
Set<MessageQueue> msgQueues = null;
switch (pullConsumer.getMessageModel()) {
case BROADCASTING:
msgQueues = pullConsumer.fetchSubscribeMessageQueues(topic);
break;
case CLUSTERING:
msgQueues = pullConsumer.fetchMessageQueuesInBalance(topic);
// 未获取到负载均衡的时候,等待fetchQueueNextDelayTimeMillis毫秒重新获取,直到超时
long timeout = 0L;
while (CollectionUtils.isEmpty(msgQueues) && timeout < fetchQueueTimeoutMillis) {
Thread.sleep(fetchQueueNextDelayTimeMillis);
timeout += fetchQueueNextDelayTimeMillis;
msgQueues = pullConsumer.fetchMessageQueuesInBalance(topic);
}
break;
default:
break;
}
return msgQueues;
}
当然还有一种内置的
MQPullConsumerScheduleService
也是可以实现。不同的是这个是回调模式。
by 斯武丶风晴 https://my.oschina.net/langxSpirit