Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >消息中间件—RocketMQ消息消费(一)

消息中间件—RocketMQ消息消费(一)

作者头像
用户2991389
发布于 2018-09-05 04:52:54
发布于 2018-09-05 04:52:54
1.9K00
代码可运行
举报
运行总次数:0
代码可运行

文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送”两部分进行了详细的阐述,本文将主要从消息消费为切入点简要地介绍下“RocketMQ中Pull和Push的两种消费方式”、“RocketMQ中消费者(Push模式)的启动流程”和“RocketMQ中Pull和Push两种消费方式的简要流程”。在阅读本篇之前希望读者能够先仔细阅读下关于RocketMQ分布式消息队列的前几篇文章: (1)消息中间件—RocketMQ的RPC通信(一) (2)消息中间件—RocketMQ的RPC通信(二) (3)消息中间件—RocketMQ消息发送

一、如何选择消息消费的方式—Pull or Push?

1.1 MQ中Pull和Push的两种消费方式

对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费: (1)Push方式:由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常; (2)Pull方式:由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;

1.2 RocketMQ消息消费的长轮询机制

思考题: 上面简要说明了Push和Pull两种消息消费方式的概念和各自特点。如果长时间没有消息,而消费者端又不停的发送Pull请求不就会导致RocketMQ中Broker端负载很高吗?那么在RocketMQ中如何解决以做到高效的消息消费呢?

通过研究源码可知,RocketMQ的消费方式都是基于拉模式拉取消息的,而在这其中有一种长轮询机制(对普通轮询的一种优化),来平衡上面Push/Pull模型的各自缺点。基本设计思路是:消费者如果第一次尝试Pull消息失败(比如:Broker端没有可以消费的消息),并不立即给消费者客户端返回Response的响应,而是先hold住并且挂起请求(将请求保存至pullRequestTable本地缓存变量中),然后Broker端的后台独立线程—PullRequestHoldService会从pullRequestTable本地缓存变量中不断地去取,具体的做法是查询待拉取消息的偏移量是否小于消费队列最大偏移量,如果条件成立则说明有新消息达到Broker端(这里,在RocketMQ的Broker端会有一个后台独立线程—ReputMessageService不停地构建ConsumeQueue/IndexFile数据,同时取出hold住的请求并进行二次处理),则通过重新调用一次业务处理器—PullMessageProcessor的处理请求方法—processRequest()来重新尝试拉取消息(此处,每隔5S重试一次,默认长轮询整体的时间设置为30s)。 RocketMQ消息Pull的长轮询机制的关键在于Broker端的PullRequestHoldService和ReputMessageService两个后台线程。对于RocketMQ的长轮询(LongPolling)消费模式后面会专门详细介绍。

二、RocketMQ中两种消费方式的demo代码

(1)Pull模式的Consumer端代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("consumer");
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest111");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            System.out.println(pullResult.getMsgFoundList().get(0).toString());
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case NO_MATCHED_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    //TODO
                }
            }
        }
        consumer.shutdown();

在示例代码中,可以看到业务工程在Consumer启动后,Consumer主动获取MessageQueue的Set集合,遍历该集合中的每一个队列,发送Pull的请求(参数中带有队列中的消息偏移量),同时需要Consumer端自己保存消息消费的offset偏移量至本地变量中。在Pull模式下,需要业务应用代码自身去完成比较多的事情,因此在实际应用中用的较少。 (2)Push模式的Consumer端代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest111", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setInstanceName("consumer1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

在示例代码中,业务工程的应用程序使用Push方式进行消费时,Consumer端注册了一个监听器,Consumer在收到消息后主动调用这个监听器完成消费并进行对应的业务逻辑处理。由此可见,业务应用代码只需要完成消息消费即可,无需参与MQ本身的一些任务处理(ps:业务代码显得更为简洁一些)。

三、RocketMQ中消费者Push方式的启动流程

这一节主要先讲下RocketMQ消费者的启动流程,看下在启动的时候究竟完成了什么样的操作。由于RocketMQ的DefaultMQPushConsumer和DefaultMQPullConsumer启动流程大部分类似,而DefaultMQPushConsumer更为复杂一些,因此这一节内容主要讲的是DefaultMQPushConsumer启动流程。Push方式的Consumer启动流程的时序图如下图所示:

RocketMQ的PushConsumer启动时序图.jpg

从上面的时序图上可以看出,Push方式的Consumer启动流程完成的任务比较多,主要任务如下: (1)设置consumerGroup、NameServer服务地址、消费起始偏移地址并根据参数Topic构建Consumer端的SubscriptionData(订阅关系值); (2)在Consumer端注册消费者监听器,当消息到来时完成消费消息; (3)启动defaultMQPushConsumerImpl实例,主要完成前置校验、复制订阅关系(将defaultMQPushConsumer的订阅关系复制至rebalanceImpl中,包括retryTopic(重试主题)对应的订阅关系)、创建MQClientInstance实例、设置rebalanceImpl的各个属性值、pullAPIWrapper包装类对象的初始化、初始化offsetStore实例并加载消费进度、启动消息消费服务线程以及在MQClientInstance中注册consumer等任务; (4)启动MQClientInstance实例,其中包括完成客户端网络通信线程、拉取消息服务线程、负载均衡服务线程和若干个定时任务的启动; (5)向所有的Broker端发送心跳(采用加锁方式); (6)最后,唤醒负载均衡服务线程在Consumer端开始负载均衡;

四、RocketMQ中Pull和Push两种消费模式流程简析

RocketMQ提供了两种消费模式,Push和Pull,大多数场景使用的是Push模式,在源码中这两种模式分别对应的是DefaultMQPushConsumer类和DefaultMQPullConsumer类。Push模式实际上在内部还是使用的Pull方式实现的,通过Pull不断地轮询Broker获取消息,当不存在新消息时,Broker端会挂起Pull请求,直到有新消息产生才取消挂起,返回新消息。 (1)RocketMQ的Pull消费模式流程简析 RocketMQ的Pull模式相对来得简单,从上面的demo代码中可以看出,业务应用代码通过由Topic获取到的MessageQueue直接拉取消息(最后真正执行的是PullAPIWrapper的pullKernelImpl()方法,通过发送拉取消息的RPC请求给Broker端)。其中,消息消费的偏移量需要Consumer端自己去维护。 (2)RocketMQ的Push消费模式流程简析 在本文前面已经提到过了,从严格意义上说,RocketMQ并没有实现真正的消息消费的Push模式,而是对Pull模式进行了一定的优化,一方面在Consumer端开启后台独立的线程—PullMessageService不断地从阻塞队列—pullRequestQueue中获取PullRequest请求并通过网络通信模块发送Pull消息的RPC请求给Broker端。另外一方面,后台独立线程—rebalanceService根据Topic中消息队列个数和当前消费组内消费者个数进行负载均衡,将产生的对应PullRequest实例放入阻塞队列—pullRequestQueue中。这里算是比较典型的生产者-消费者模型,实现了准实时的自动消息拉取。然后,再根据业务反馈是否成功消费来推动消费进度。 在Broker端,PullMessageProcessor业务处理器收到Pull消息的RPC请求后,通过MessageStore实例从commitLog获取消息。如1.2节内容所述,如果第一次尝试Pull消息失败(比如Broker端没有可以消费的消息),则通过长轮询机制先hold住并且挂起该请求,然后通过Broker端的后台线程PullRequestHoldService重新尝试和后台线程ReputMessageService的二次处理。

思考题 使用RocketMQ的Pull模式进行消息消费时,由上面可知该模式下无需自动拉取消息,这样在DefaultMQPullConsumerImpl启动时,消息拉取线程—PullMessageService和消息队列负载线程—RebalanceService其实也就没必要启动,但实际上却启动了,这里会有问题么?

五、总结

RocketMQ的消息消费(一)(入门篇幅)就先分析到这里了。建议读者可以将作者之前写的三篇文章—“RocketMQ的RPC通信(一)/(二)”以及“RocketMQ消息发送”结合起来读,这样会整体会更加连贯,收获更大。关于RocketMQ消息消费的内容比较多也比较复杂,涉及“Consumer端的负载均衡机制”、“RocketMQ的长轮询机制”和“RocketMQ中Pull和Push消费模式的细节内容”将在后续篇幅进行介绍和分析。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.08.12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
消息中间件—RocketMQ消息消费(二)(push模式实现)
摘要:在RocketMQ中,消息消费都是基于Pull消息方式,那么Push模式中又是如何实现Consumer端准实时消费的呢? 在上一篇—“消息中间件—RocketMQ消息消费(一)”中,已经简要地介绍了下RocketMQ中“Pull和Push两种消费方式的简要流程”以及“Push消费方式的启动流程”(ps:如果不熟悉这几块内容的童鞋,可以自己回顾下上一篇的内容)。本文将详细介绍RocketMQ中Push消费方式下的“Pull消息的长轮询机制”和“Consumer端的负载均衡机制”这两块关键核心内容。 由于RocketMQ系列的技术分享存在一定的连续性,因此希望读者能回顾下往期RocketMQ分享的篇幅: (1)消息中间件—RocketMQ的RPC通信(一) (2)消息中间件—RocketMQ的RPC通信(二) (3)消息中间件—RocketMQ消息发送 (4)消息中间件—RocketMQ消息消费(一)
用户2991389
2018/09/05
2K0
消息中间件—RocketMQ消息消费(二)(push模式实现)
【RocketMq实战第四篇】不同类型消费者DefaultMQPushConsumerDefaultMQPullConsumer
前言 生产者和消费者是消息队列的两个重要角色,生产者向消息队列写人数据, 消费者从消息队列里读取数据。本篇讲解两种类型的消费者,一个是 DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传人的 处理方法来处理;另 一个是 DefaultMQPullConsumer,读取操作中的大部分功 能由使用者自主控制 。
胖虎
2019/06/26
3K0
【RocketMq实战第四篇】不同类型消费者DefaultMQPushConsumerDefaultMQPullConsumer
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
RocketMQ为开发者提供了两种消息的消费模式,分别是Pull和Push,对应的实现是DefaultMQPullConsumer和DefaultMQPushConsumer; 接下来我将带大家通过以下几个方面了解这两种模式:
用户2031163
2023/11/29
7830
RocketMQ之消费者启动与消费流程
RocketMQ是由阿里巴巴开源的分布式消息中间件,支持顺序消息、定时消息、自定义过滤器、负载均衡、pull/push消息等功能。RocketMQ主要由 Producer、Broker、Consumer 、NameServer四部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。NameServer充当名字路由服务,整体架构图如下所示:
2020labs小助手
2022/07/12
1.1K0
消息中间件—RocketMQ消息消费(三)(消息消费重试)
摘要:如果Consumer端消费消息失败,那么RocketMQ是如何对失败的异常情况进行处理? 前面两篇RocketMQ消息消费(一)/(二)篇,主要从Push/Pull两种消费模式的简要流程、长轮询机制和Consumer端负载均衡这几点内容出发,介绍了RocketMQ消息消费的正常流程和细节内容,本篇内容将主要介绍Consumer端消费失败的异常流程。 这里先回顾往期RocketMQ技术分享的篇幅: (1)消息中间件—RocketMQ的RPC通信(一) (2)消息中间件—RocketMQ的RPC通信(二) (3)消息中间件—RocketMQ消息发送 (4)消息中间件—RocketMQ消息消费(一) (5)消息中间件—RocketMQ消息消费(二)(push模式实现)
用户2991389
2018/09/05
3.7K0
消息中间件—RocketMQ消息消费(三)(消息消费重试)
面试系列之-rocketmq长轮询模式
Consumer主动从Broker获取消息,可以设置多久拉取一次、可以设置一次拉取多少条消息等参数;
用户4283147
2022/12/29
6400
面试系列之-rocketmq长轮询模式
详解RocketMQ不同类型的消费者
根据使用者对读取操作的控制情况,分为两种类型。一个是DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;另一个是DefaultMQPullConsumer,读取操作中的大部分功能由使用者自主控制。 1.DefaultMQPushConsumer的使用 使用DefaultMQPushConsumer主要是设置好各种参数和传入处理消息的函数。系统收到消息后自动调用处理函数来处理消息,自动保存Offset,而且加入新的DefaultMQPushConsumer后会自动做负载均衡。下面结合org.apache.rocketmq.example.quickstart包中的源码来介绍。 代码清单1-1 DefaultMQPushConsumer示例 public class QuickStart {
全栈程序员站长
2021/06/10
8270
消费者原理分析-RocketMQ知识体系4
前文了解了 RocketMQ消息存储的相关原理,本文将讲讲消息消费的过程及相关概念。
DougWang
2021/07/21
1.3K0
深入分析 RocketMQ 的 Push 消费方式实现
RocketMQ 是阿里巴巴旗下一款开源的 MQ 框架,经历过双十一考验,由 Java 编程语言实现,有非常完整的生态系统。RocketMQ 作为一款纯 Java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
政采云前端团队
2023/11/09
1.4K0
深入分析 RocketMQ 的 Push 消费方式实现
RocketMQ详解(10)——Consumer详解
DefaultMQPushConsumer主要功能实现在DefaultMQPushConsumerImpl中,消息处理逻辑是在pullMessage()方法的PullCallback回调中。在PullCallback回调中有个switch语句,根据Broker返回的消息类型做响应的处理,具体逻辑看源码:
张申傲
2020/09/03
2.1K0
RocketMq之Consumer原理浅析
当一个业务系统部署多台机器时,每台机器都启动了一个Consumer,并且这些Consumer都在同一个ConsumerGroup也就是消费组中,此时一个消费组中多个Consumer消费一个Topic,而一个Topic中会有多个MessageQueue。 那么就会有一个问题,比如有2个Consumer,3个MessageQueue,那么这3个MessageQueue怎么分配呢?这就涉及到Consumer的负载均衡了。 首先 Consumer 在启动时,会把自己注册给所有 Broker ,并保持心跳,让每一个 Broker 都知道消费组中有哪些 Consumer 。 然后 Consumer 在消费时,会随机链接一台 Broker ,获取消费组中的所有 Consumer 。 主要流程如下:
周同学
2020/06/16
1.9K1
RocketMQ(四):消费前如何拉取消息?(长轮询机制)
上篇文章从Broker接收消息开始,到消息持久化到各种文件结束,分析完消息在Broker持久化的流程与原理
菜菜的后端私房菜
2024/09/26
6960
5 张图带你理解 RocketMQ 消费者启动过程
多数消息队列中,消费者和 Broker 通信的方式有两种,PUSH 模式和 PULL 模式:
jinjunzhu
2022/09/23
5120
5 张图带你理解 RocketMQ 消费者启动过程
RocketMQ(一):推拉消费模型客户端实践
现在的的互联网系统中,mq已经必备基础设施了,我们已明显感觉它的必要性与强大。然而,它的本质是啥?存储转发系统罢了!
烂猪皮
2021/01/28
1.3K0
RocketMQ(一):推拉消费模型客户端实践
RocketMQ(八):轻量级拉取消费原理
这里推荐一篇JVM垃圾回收相关的文章:深入解析Java垃圾回收机制:原理、实现与优化策略 文章阐述了JVM垃圾回收的基本概念、机制以及设计原理,总结垃圾回收的算法与实现细节,以及优化策略
菜菜的后端私房菜
2024/11/15
2040
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
在队列的基础上,加入生产者与消费者模型,使用队列作为载体就能够组成简单的消息队列,在队列中“运输”的数据被称为消息
菜菜的后端私房菜
2024/09/12
9630
云原生中间件RocketMQ-消费者消费模式之广播模式、偏移量offset解析
广播消费: 当使用广播消费模式时, 消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端, 保证消息至少被每台机器消费一次。
共饮一杯无
2022/11/28
1.5K0
云原生中间件RocketMQ-消费者消费模式之广播模式、偏移量offset解析
3分钟白话RocketMQ系列—— 如何消费消息
前面已经介绍了 生产消息、存储消息 两大块内容,那接下来,我们白话一下RocketMQ是如何消费消息的,揭秘消息消费全过程。
阿丸笔记
2023/10/22
1.3K0
3分钟白话RocketMQ系列—— 如何消费消息
RocketMQ原理—5.高可用+高并发+高性能架构
Push同步模式:Producer往Broker主节点写入数据后,Broker主节点会主动把数据推送Push到Broker从节点里。
东阳马生架构
2025/04/06
1320
RocketMQ消息过滤实现原理
RocketMQ消息中间件相比于其他消息中间件提供了更细粒度的消息过滤,相比于Topic做业务维度的区分,Tag,即消息标签,用于对某个Topic下的消息进行进一步分类。消息队列RocketMQ版的生产者在发送消息时,指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。
叔牙
2022/12/18
6230
RocketMQ消息过滤实现原理
推荐阅读
相关推荐
消息中间件—RocketMQ消息消费(二)(push模式实现)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验