前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMq之Consumer原理浅析

RocketMq之Consumer原理浅析

作者头像
炳臣
发布2020-06-16 17:44:34
1.9K1
发布2020-06-16 17:44:34
举报
文章被收录于专栏:一块自留地一块自留地

Consumer是怎么启动的

源码很长,这里就不仔细看了,其实主要就是初始化了三个组件,然后启动后台定时任务

  • RebalanceImpl 均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列
  • PullAPIWrapper 拉取消息组件
  • offsetStore 消费进度组件

  • PullMessageService 从阻塞队列pullRequestQueue中获取consumer的pull请求
  • RebalanceService 负载均衡定时任务,给 Consumer 分配可消费的 MessageQueue
  • fetchNameServerAddr 定时获取 NameSever 地址
  • updateTopicRouteInfoFromNameServer 定时更新Topic路由信息
  • cleanOfflineBroker 定时清理下线Broker
  • sendHeartbeatToAllBrokerWithLock 发送心跳
  • persistAllConsumerOffset 持久化消费进度 ConsumerOffset

启动流程图如下:

ConsumerGroup是怎么分配MessageQueue的

当一个业务系统部署多台机器时,每台机器都启动了一个Consumer,并且这些Consumer都在同一个ConsumerGroup也就是消费组中,此时一个消费组中多个Consumer消费一个Topic,而一个Topic中会有多个MessageQueue。 那么就会有一个问题,比如有2个Consumer,3个MessageQueue,那么这3个MessageQueue怎么分配呢?这就涉及到Consumer的负载均衡了。 首先 Consumer 在启动时,会把自己注册给所有 Broker ,并保持心跳,让每一个 Broker 都知道消费组中有哪些 Consumer 。 然后 Consumer 在消费时,会随机链接一台 Broker ,获取消费组中的所有 Consumer 。 主要流程如下:

注意这里会对Consumer集合做一个排序,为什么要这样做呢?因为每个 consumer 都是在本地负载均衡,所以要排序,否则多个Consumer之间会有冲突。

Consumer是怎么从Broker获取消息的

消费方式

对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:

  • Pull 即消费者每隔一定时间主动去 Broker 拉取消息
    • 优点 消费速度、数量可控
    • 缺点 如果间隔时间短,可能会拉空,并且频繁 RPC 请求增加网络开销 如果间隔时间长,则可能会有消息延迟 消费进度offset需要consumer自己来维护
  • Push 即 Broker 主动实时推送消息给消费者
    • 优点 消息实时,保持长链接,不会频繁建立链接
    • 缺点 如果消息数量过大,消费者吞吐量小,肯能会造成消费者缓冲区溢出

Push 本质上也是基于消费者主动拉取实现的,只不过名字叫push,意思是 Broker 会尽可能实时的把消息给消费者处理。

Push消费模式流程简析
  • 后台独立线程RebalanceServic根据Topic中消息队列个数和当前消费组内消费者个数进行负载均衡,将产生的对应PullRequest实例放入阻塞队列pullRequestQueue中。
  • Consumer端开启后台独立的线程PullMessageService不断地从阻塞队列pullRequestQueue中获取PullRequest请求并通过网络通信模块异步发送Pull消息的RPC请求给Broker端。这里算是比较典型的生产者-消费者模型,实现了准实时的自动消息拉取。
  • PullMessageService异步拉取到消息后,通过PullCallback进行回调处理,如果拉取成功,则更新消费进度,putPullRequest到阻塞队列pullRequestQueue中,接着立即进行拉取
  • PullCallback会把拉取到的消息交给Consumerrequest进行处理,Consumerrequest会调用消费者业务方实现的consumeMessage()接口处理具体业务,消费者业务方处理完成后返回ACK给Consumerrequest,如果消费者ACK返回的失败,则在集群模式下把消息发回 Broker 进行重试(广播模型重试的成本太高),最后更新消费进度offsetTable
  • 在Broker端,PullMessageProcessor业务处理器收到Pull消息的RPC请求后,通过MessageStore实例从commitLog获取消息。如果第一次尝试Pull消息失败(比如Broker端没有可以消费的消息),则通过长轮询机制先hold住并且挂起该请求,然后通过Broker端的后台线程PullRequestHoldService重新尝试和后台线程ReputMessageService进行二次处理。
Push消息流程图:

RocketMQ消息消费的长轮询机制

  • 普通轮询比较简单,就是定时发起请求,服务端收到请求后不论数据有没有更新都立即返回 优点就是实现简单,容易理解。 缺点就是服务端是被动的,服务端要不断的处理客户端连接,并且服务端无法控制客户端pull的频率以及客户端数量
  • 长轮询是对普通轮询的优化,依然由客户端发起请求,服务端收到后并不立即响应而是hold住客户端连接,等待数据产生变更后(或者超过指定时间还未产生变更)才回复客户端 说白了,就是对普通轮询加了个控制,你客户端可以随时请求我,但是回不回复我说了算,这就保证了服务端不会被客户端带节奏,导致自己的压力不可控

在 RocketMq 中消费者主动发起pull请求,broker在处理消息拉取请求时,如果没有查询到消息,将不返回消费者任何信息,而是先hold住并且挂起请求,使其不会立即发起下一次拉取请求,会将请求信息pullRequest添加到pullRequestTable中,等待触发通知消费者的事件。 (pullRequestTable表示待处理的消息拉取请求集合,它的key是Topic+queueId,value中包含了消费者信息(与该消费者的长连接channel),以及其想要拉取的消息位置,后面需要根据这些信息来将对应的新消息返回给对应的消费者)。

然后在Broker端,通过后台独立线程PullRequestHoldService遍历所有挂起的请求pullRequestTable,如果有消息,则返回响应给消费者。 同时,另外一个ReputMessageService线程不断地构建ConsumeQueue/IndexFile数据,不断的检测是否有新消息产生,如果有新消息,则从pullRequestTable通过Topic+queueId的key获取对应hold住的请求pullRequest,再根据其中的长链接channel进行通信响应。 通过这种长轮询机制,即可解决Consumer端需要通过不断地发送无效的轮询Pull请求,而导致整个RocketMQ集群中Broker端负载很高的问题。

资料引用: www.jianshu.com/p/fac642f3c… blog.csdn.net/wb_snail/ar… segmentfault.com/a/119000002…

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020年06月15日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Consumer是怎么启动的
  • ConsumerGroup是怎么分配MessageQueue的
  • Consumer是怎么从Broker获取消息的
    • 消费方式
      • Push消费模式流程简析
        • Push消息流程图:
        • RocketMQ消息消费的长轮询机制
        相关产品与服务
        负载均衡
        负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档