首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >聊聊 RocketMQ 拉模式消费者 DefaultLitePullConsumer

聊聊 RocketMQ 拉模式消费者 DefaultLitePullConsumer

作者头像
勇哥java实战
发布2026-01-06 14:57:56
发布2026-01-06 14:57:56
1100
举报
文章被收录于专栏:勇哥编程游记勇哥编程游记

消息队列的消费模式可以大致分为两种,一种是推 Push,一种是拉 Pull

  • 推 Push 服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
  • 拉 Pull 客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

RocketMQ 的推 Push 模式实现类是:DefaultMQPushConsumerImpl ,本质上来讲,RocketMQ 的推模式是拉模式的封装,可以实现负载均衡,并提供非常简单易用的 API 供开发者使用,降低了开发者的使用心智。

这篇文章,我们聊聊 RocketMQ 的拉模式 DefaultLitePullConsumerImpl , 希望大家读完之后,可以理解实现原理以及经典实践。

1 示例-自动提交消费进度

我们使用 DefaultLitePullConsumer 创建一个简单的拉模式消费者示例。

代码语言:javascript
复制
@Test
public void testAutoCommit() throws Exception {
    boolean running = true;
    // 定义消费者组 mygroup
    DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("mygroup");
    // 设置名字服务地址
    litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
    // 从最新的进度偏移量开始消费
    litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    // 订阅主题 TopicTest
    litePullConsumer.subscribe("TopicTest", "*");
    // 自动提交消费偏移量的选项设置为 true
    litePullConsumer.setAutoCommit(true);
    // 启动 pull 消费者
    litePullConsumer.start();
    try {
        while (running) {
            // 调用消费者的 poll 方法
            List<MessageExt> messageExts = litePullConsumer.poll();
            System.out.printf("%s%n", messageExts);
        }
    } finally {
        // 关闭消费者
        litePullConsumer.shutdown();
    }
}

代码流程:

  1. 创建一个消费者实例 DefaultLitePullConsumer,订阅主题 "TopicTest" , 消费者组名为 "mygroup"。
  2. 消费者从最新的进度偏移量开始消费,并启用了自动提交消费偏移量的选项。
  3. 启动消费者实例,在一个 while 循环中持续地拉取 poll 消息并输出消息内容。

Push 消费者使用起来的非常简洁,消费监听器内部定义消费逻辑即可,而 DefaultLitePullConsumer 需要手动调用拉取方法,获取消息内容,并填充消费逻辑。

2 示例-手工提交消费进度

第一节的示例是自动提交消息消费进度的,如果采取手动提交,需要应用程序手动调用 DefaultLitePullConsumer 消费者的 commitSync() 方法 。

代码语言:javascript
复制
@Test
public void testNoAutoCommit() throws MQClientException {
    boolean running = true;
    // 定义消费者组 mygroup
    DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("mygroup");
    // 设置名字服务地址
    litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
    // 从最新的进度偏移量
    litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    // 订阅主题 TopicTest
    litePullConsumer.subscribe("TopicTest", "*");
    // 自动提交消费偏移量的选项设置为 false
    litePullConsumer.setAutoCommit(false);
    litePullConsumer.start();
    try {
        while (running) {
            boolean hasException = false; 
            List<MessageExt> messageExts = litePullConsumer.poll(1000L);
            if (CollectionUtils.isNotEmpty(messageExts)) {
                System.out.println("消息大小:" + messageExts.size());
                // 取出第一条消息数据
                MessageExt first = messageExts.get(0);
                MessageQueue messageQueue = new MessageQueue(first.getTopic(), first.getBrokerName(), first.getQueueId());
                for (MessageExt messageExt : messageExts) {
                    // 打印消息内容
                    System.out.println(new String(first.getBody()));
                   // 业务处理代码
                   // doBusiness();
                }
                litePullConsumer.commitSync();
                if(hasException) {
                  // 回滚到第一条消息的消费点位
                  litePullConsumer.seek(messageQueue, first.getQueueOffset());
                }
            }
        }
    } catch (InterruptedException e) {
        thrownew RuntimeException(e);
    } finally {
        // 最终关闭消费者
        litePullConsumer.shutdown();
    }
}

这段代码的逻辑与前一个类似,但有几个关键的不同点:

  1. litePullConsumer.setAutoCommit(false); 将自动提交消费偏移量的选项设置为 false,即关闭了自动提交;
  2. while 循环中,如果收到了消息,则会打印出第一条消息的内容,然后执行消费逻辑;
  3. 调用litePullConsumer.commitSync() 方法手工提交消费进度 ;
  4. 如果发生异常了,通过 litePullConsumer.seek() 方法将消费者的消费点位回滚到第一条消息的位置。

同时,笔者启动两个测试用例,发现 Pull 模式两个细节 :

  • 每次执行 litePullConsumer 的 poll 方法, 得到的同一队列的批量消息数据 ;
  • 多实例消费者启动后,可以实现负载均衡,因此可以非常容易的使用 Pull 模式用于生产环境 。

3 实现原理

RocketMQ 推模式消费流程简化如下图:

接下来,笔者会按照消费流程展示推模式和推模型的异同。

1、负载均衡

DefaultLitePullConsumer 消费者启动 ,同样会进行负载均衡,负载均衡流程和推模式基本相同。

如图,同一个消费组有两个消费者,主题下的四个队列会均匀分布给这两个消费者。 负载均衡之后,接下来会触发拉取消息服务,该服务会将消息拉取到本地缓存起来。

2、拉取消息

拉模式和推模式的差别很大 ,我们知道推模式下 :消费者启动的时候,会创建一个拉取消息服务 PullMessageService ,它是一个单线程的服务。

而拉模式下,会启动一个定时任务(默认 20个拉取线程),如图假如消费者分配了 4 个队列,那么会启动 4 个拉取任务,拉取任务发送请求到队列对应的 Broker 去拉取消息数据,拉取完成之后,将拉取结果存储在本地阻塞队列 consumeRequestCache 中。

3、消费消息

在拉模式下,消费消息是我们手工控制的逻辑。

通过 poll 方法获取消息数据列表,然后遍历消息列表,执行相关消费逻辑。

我们从源码角度分析 poll 方法 到底做了哪些事情?

这段代码的本质很简单:就是从 LinkedBlockingQueue 中弹出元素即可 ,弹出之后,需要做两件事情:

  1. 将消费快照 ProcessQueue 的元素删除掉,返回消费进度偏移量;
  2. 修改负载均衡分配队列的消费偏移量 (负载均衡分配队列是一个本地队列状态类,用于保存消费快照,队列信息,消费进度偏移量,拉取偏移量等信息)。

4、保存进度/消费重试

拉模式可以设置自动提交进度开关 :

代码语言:javascript
复制
litePullConsumer.setAutoCommit(true); // true :自动提交 false:手工提交

自动提交的本质是每次调用 poll 方法时,会执行如下的方法:

我们可以简单的理解为每隔 5 秒执行一次提交分配队列的消费偏移量。

假如设置为手工提交,我们执行同步提交方法 commitSync 方法,本质是调用 commitAll 方法 。

5、消费重试

拉模式下,消费失败时,我们可以调用 seek 方法修改队列的消费进度偏移量。

代码语言:javascript
复制
MessageExt first = messageExts.get(0);
MessageQueue messageQueue = new MessageQueue(first.getTopic(), first.getBrokerName(), first.getQueueId());
// 回滚到第一条消息的点位
litePullConsumer.seek(messageQueue, first.getQueueOffset());

下面我们简要分析下 seek 方法:

核心流程:

  1. 获取队列的对象锁;
  2. 消除缓存中的队列数据 ;
  3. 中断旧的拉取任务并从任务表中移除;
  4. 设置队列的拉取偏移量,便于下次拉取时从该偏移量查询(这个是精髓)
  5. 判断是否需要启动新的拉取任务。

4 应用场景

在笔者看来,拉模式是非常值得学习和应用的技巧,体现在如下两点:

1、灵活控制消费动作

拉模式非常灵活,我们可以在消费前执行判断逻辑,也可以在消费失败后,回滚到指定偏移量。

2、拉取效率高,适合大数据场景

拉模式消费者默认会创建启动 20 个拉取线程,每一个线程对应一个队列拉取任务 ,在消息拉取效率方面比 PUSH 模型具有无可比拟的优势,特别适合大数据领域的批处理任务。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-01-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 勇哥java实战分享 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 示例-自动提交消费进度
  • 2 示例-手工提交消费进度
  • 3 实现原理
  • 4 应用场景
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档