
消息队列的消费模式可以大致分为两种,一种是推 Push,一种是拉 Pull 。
RocketMQ 的推 Push 模式实现类是:DefaultMQPushConsumerImpl ,本质上来讲,RocketMQ 的推模式是拉模式的封装,可以实现负载均衡,并提供非常简单易用的 API 供开发者使用,降低了开发者的使用心智。
这篇文章,我们聊聊 RocketMQ 的拉模式 DefaultLitePullConsumerImpl , 希望大家读完之后,可以理解实现原理以及经典实践。
我们使用 DefaultLitePullConsumer 创建一个简单的拉模式消费者示例。
@Test
public void testAutoCommit() throws Exception {
boolean running = true;
// 定义消费者组 mygroup
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("mygroup");
// 设置名字服务地址
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
// 从最新的进度偏移量开始消费
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 订阅主题 TopicTest
litePullConsumer.subscribe("TopicTest", "*");
// 自动提交消费偏移量的选项设置为 true
litePullConsumer.setAutoCommit(true);
// 启动 pull 消费者
litePullConsumer.start();
try {
while (running) {
// 调用消费者的 poll 方法
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
// 关闭消费者
litePullConsumer.shutdown();
}
}
代码流程:
while 循环中持续地拉取 poll 消息并输出消息内容。Push 消费者使用起来的非常简洁,消费监听器内部定义消费逻辑即可,而 DefaultLitePullConsumer 需要手动调用拉取方法,获取消息内容,并填充消费逻辑。
第一节的示例是自动提交消息消费进度的,如果采取手动提交,需要应用程序手动调用 DefaultLitePullConsumer 消费者的 commitSync() 方法 。
@Test
public void testNoAutoCommit() throws MQClientException {
boolean running = true;
// 定义消费者组 mygroup
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("mygroup");
// 设置名字服务地址
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
// 从最新的进度偏移量
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 订阅主题 TopicTest
litePullConsumer.subscribe("TopicTest", "*");
// 自动提交消费偏移量的选项设置为 false
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
try {
while (running) {
boolean hasException = false;
List<MessageExt> messageExts = litePullConsumer.poll(1000L);
if (CollectionUtils.isNotEmpty(messageExts)) {
System.out.println("消息大小:" + messageExts.size());
// 取出第一条消息数据
MessageExt first = messageExts.get(0);
MessageQueue messageQueue = new MessageQueue(first.getTopic(), first.getBrokerName(), first.getQueueId());
for (MessageExt messageExt : messageExts) {
// 打印消息内容
System.out.println(new String(first.getBody()));
// 业务处理代码
// doBusiness();
}
litePullConsumer.commitSync();
if(hasException) {
// 回滚到第一条消息的消费点位
litePullConsumer.seek(messageQueue, first.getQueueOffset());
}
}
}
} catch (InterruptedException e) {
thrownew RuntimeException(e);
} finally {
// 最终关闭消费者
litePullConsumer.shutdown();
}
}
这段代码的逻辑与前一个类似,但有几个关键的不同点:
litePullConsumer.setAutoCommit(false); 将自动提交消费偏移量的选项设置为 false,即关闭了自动提交;while 循环中,如果收到了消息,则会打印出第一条消息的内容,然后执行消费逻辑;litePullConsumer.commitSync() 方法手工提交消费进度 ;litePullConsumer.seek() 方法将消费者的消费点位回滚到第一条消息的位置。同时,笔者启动两个测试用例,发现 Pull 模式两个细节 :
RocketMQ 推模式消费流程简化如下图:

接下来,笔者会按照消费流程展示推模式和推模型的异同。
1、负载均衡
DefaultLitePullConsumer 消费者启动 ,同样会进行负载均衡,负载均衡流程和推模式基本相同。

如图,同一个消费组有两个消费者,主题下的四个队列会均匀分布给这两个消费者。 负载均衡之后,接下来会触发拉取消息服务,该服务会将消息拉取到本地缓存起来。
2、拉取消息
拉模式和推模式的差别很大 ,我们知道推模式下 :消费者启动的时候,会创建一个拉取消息服务 PullMessageService ,它是一个单线程的服务。

而拉模式下,会启动一个定时任务(默认 20个拉取线程),如图假如消费者分配了 4 个队列,那么会启动 4 个拉取任务,拉取任务发送请求到队列对应的 Broker 去拉取消息数据,拉取完成之后,将拉取结果存储在本地阻塞队列 consumeRequestCache 中。
3、消费消息
在拉模式下,消费消息是我们手工控制的逻辑。

通过 poll 方法获取消息数据列表,然后遍历消息列表,执行相关消费逻辑。
我们从源码角度分析 poll 方法 到底做了哪些事情?

这段代码的本质很简单:就是从 LinkedBlockingQueue 中弹出元素即可 ,弹出之后,需要做两件事情:
4、保存进度/消费重试
拉模式可以设置自动提交进度开关 :
litePullConsumer.setAutoCommit(true); // true :自动提交 false:手工提交
自动提交的本质是每次调用 poll 方法时,会执行如下的方法:

我们可以简单的理解为每隔 5 秒执行一次提交分配队列的消费偏移量。

假如设置为手工提交,我们执行同步提交方法 commitSync 方法,本质是调用 commitAll 方法 。
5、消费重试
拉模式下,消费失败时,我们可以调用 seek 方法修改队列的消费进度偏移量。
MessageExt first = messageExts.get(0);
MessageQueue messageQueue = new MessageQueue(first.getTopic(), first.getBrokerName(), first.getQueueId());
// 回滚到第一条消息的点位
litePullConsumer.seek(messageQueue, first.getQueueOffset());
下面我们简要分析下 seek 方法:

核心流程:
在笔者看来,拉模式是非常值得学习和应用的技巧,体现在如下两点:
1、灵活控制消费动作
拉模式非常灵活,我们可以在消费前执行判断逻辑,也可以在消费失败后,回滚到指定偏移量。
2、拉取效率高,适合大数据场景
拉模式消费者默认会创建启动 20 个拉取线程,每一个线程对应一个队列拉取任务 ,在消息拉取效率方面比 PUSH 模型具有无可比拟的优势,特别适合大数据领域的批处理任务。