前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从RocketMQ的Broker源码层面验证一下这两个点

从RocketMQ的Broker源码层面验证一下这两个点

作者头像
冬夜先生
修改2021-10-08 15:36:34
3720
修改2021-10-08 15:36:34
举报
文章被收录于专栏:csico

之前的文章中,我们知道了RocketMQ中的一些核心概念,例如Broker、NameServer、TopicTag等等。Producer从启动到发送消息的整个过程,从源码级别分析了Producer在发送消息到Broker的时候,是如何拿到Broker的数据的,如何从多个MessageQueue中选择对应的Queue发送消息。

但是由于篇幅原因,文章开头提到的两个已知结论在上篇博客里并没没有对其进行验证,这次就从源码层面来验证一下。

一开头就看到Broker主从架构相关的源码

在上篇博客中提到过,Broker为了保证自身的高可用,会采取一主一从的架构。即使Master Broker因为意外原因挂了,Slave Broker上还有一份完整的数据,Broker可以继续提供服务。

isEnableDLegerCommitLog中提到的DLeger可以先不管,我们目前只需要知道其默认返回的结果是false。所以Broker首次启动的时候,就会执行被If包裹住的逻辑。

RocketMQ本身是有主从架构的,但是功能不够完善,如果Master Broker出现了故障,需要人工的将Slave Broker切换成Master。

就有点类似于手动的将一台Redis设置成另一台Redis的Slave节点,如果此时Redis的Master挂了,还需要手动的进行切换一样。为了解决这个问题,Redis搞出了Sentinel,可以在发生故障的时候自动的实现故障转移。所以RocketMQ在4.5版本之后推出的Dleger差不多也是这么个东西,除此之外,Dleger还可以实现多副本。

不使用Dleger时,主从数据如何进行同步

先给出结论,在RocketMQ的主从架构下,主从同步采取的是Slave主动拉取的方式。

如果当前执行注册的Broker角色是Slave,那就会使用ScheduledExecutorService启动一个周期性的定时任务,每隔10秒就会去Master同步一次,同步的数据包括Topic的相关配置、Consumer的消费偏移量、延迟消息的Offset、订阅组的相关数据和配置。

ScheduledExecutorService的作用和原理下面会做简单介绍。

首次启动时强制进行Broker注册

因为是首次启动,所以参数forceRegister被直接设置成了true。

使用ScheduledExecutorService启动定时任务

通过入口进来之后,Broker会启动一个定时任务,周期性的去注册。ScheduledExecutorService底层就是一个newSingleThreadScheduledExecutor,只有一个线程的线程池,其关键的参数corePoolSize值为1,然后按照指定的频率周期性的执行某个任务。

ScheduledExecutorService主要的功能有两个,分别是:

  • ScheduledExecutorService 以固定的频率执行任务
  • ScheduledExecutorService 执行完之后,间隔制定的时间后再执行下一个任务

使用scheduleAtFixedRate实现心跳机制

此处我们使用的是scheduleAtFixedRate,如下图。

至于执行的频率,我们能够配置的范围最大不能超过一分钟,也就是说这个范围是在10-60秒之间,默认30秒执行一次,这也就验证了每30秒,Broker会向NameServer发送一次心跳。

获取执行频率的这个判断有点意思,甚至看起来有那么一丝丝简洁,但是理解其具体可配置的时间范围可能需要花点时间。在实际业务性代码中,个人建议还是不要这么写,业务中代码的可读性可维护性我认为是需要放在首位的。

值得注意的是,此处启动心跳,给了一个10秒的延迟,因为在不使用Dleger的情况下,在之前的逻辑中已经执行过一次注册了。如果不做延迟,那么几乎是同一个时间就会有两次注册操作,而这明显是不符合预期的;同时forceRegister也从true变成了通过函数isForceRegister来进行获取。

调用registerBrokerAll注册

定时任务注册完成之后,之后的每次触发都会执行registerBrokerAll方法来执行注册,你可能会有疑问,我当前不就是一个Broker吗,怎么名字有个后缀All?那是因为NameServer会有多个,Broker启动的时候会将自己注册到所有的NameServer上去。当然,口说无凭,我们继续看下去。

继续往里走,如果当前满足注册条件,则会实际的执行注册操作。那具体满足什么条件呢?由变量forceRegister和一个needRegister方法来决定,forceRegister默认是true,所以当第一执行这个逻辑的时候是一定会执行注册操作的。

通过对比数据版本判断当前Broker是否需要进行注册

感兴趣的话,可以继续跟随文章了解一下,needRegister是根据什么来判断是否需要注册的。

首先,Broker一旦注册到了NameServer之后,由于Producer不停的在写入数据,Consumer也在不停的消费数据,Broker也可能因为故障导致MessageQueue等关键路由信息发生变动,NameServer中的数据和Broker中实际的数据就会不一致,如果不及时更新,Producer拉取到的路由数据就可能有误。

所以每次定时任务触发的时候会去对比NameServer和Broker的数据,如果发现数据版本不一致,Broker会重新进行注册,将最新的数据更新到NameServer。说直白一点,就是做一个数据定时更新。以下红框中的代码就是数据对比的核心代码。

当Broker和所有的NameServer节点一一完成数据对比之后,就会进行结果判定,但凡有一个节点数据不一致,都需要进行重新注册,把最新的数据更新到NameServer,核心判断逻辑同样用红框标出。

至此,其实我们就已经完成了 Broker在启动的时候会向所有NameServer进行注册 的验证。但是由于后续仍然有值得关注发光点,我们继续后续的源码阅读。

使用CountDownLatch获取所有注册异步任务的返回结果

除此之外,还值得注意的是在needRegister中,对于和多个NameServer的交互,RocketMQ是通过线程池异步实现的,同时使用了CountDownLatch来等待所有的请求结束,返回结果给主线程。

既然聊到了CountDownLatch,就顺带提一下。假设我们有5个互不依赖的计算任务,如果快速的计算出结果并返回呢?那当然是5个任务并发执行,这就需要通过新开线程实现,结果就无法一起返回了。

而CountDownLatch可以让主线程等待,等待这5个计算任务全部结束之后,唤醒主线程再继续后面的逻辑。这就是CountDownLatch的作用,如果平时只是单纯的CRUD功能的话,可能连CountDownLatch是什么都做不知道,这也是为什么大厂面试会问这些问题,因为在大厂的复杂业务背景下,你必须要会使用它们。

指定需要注册之后,接下来就是核心的注册方法了,核心逻辑由registerBrokerAll来实现。Broker同样会去每一个NameServer节点上注册自己,并且为了提前执行的效率,同样开线程采用了异步的方式。在获取所有结果时,同样的使用了CountDownLatch。

使用CopyOnWriteArrayList存储注册请求的返回

除此之外,用于保存注册结果的列表,使用的是CopyOnWriteArrayList,被面试虐过的同学应该熟悉。我们知道此处开启了多线程去不同的NameServer注册,写入注册结果的时候,多线程对同一个列表进行写入,会产生线程安全的问题。

而我们知道ArrayList是非线程安全的,这也是为什么此处要使用CopyOnWriteArrayList来保存注册结果。为什么CopyOnWriteArrayList能够保证线程安全?

这归功于COW(Copy On Write),读请求时共用同一个List,涉及到写请求时,会复制出一个List,并在写入数据的时候加入独占锁。比起直接对所有操作加锁,读写锁的形式分离了读、写请求,使其互不影响,只对写请求加锁,降低了加锁的消耗,提升了整体操作的并发。

上面并发执行的注册操作,具体做了哪些事情呢?先看代码。

上面就是单个注册的所有逻辑,可以看到在构建完请求之后,有一个oneway的判断。

oneway值为false,表示单向通信,Broker不关心NameServer的返回,也不会触发任何回调函数。接下来Broker就会把已经写进request body的所有数据发送给NameServer。请求数据统一由一个叫TopicConfigSerializeWrapper的Wrapper给包裹住。

其可以看为两部分:

  • 存在该Broker节点上的所有Topic的数据
  • 数据版本

然后带着这些数据,Broker会同步的调用invokeSync发送请求给NameServe,并且在执行之后触发实现特定功能的回调函数。

EOF

至此,我们完成了对开篇所提结论的验证,同时也发现了RocketMQ的主从架构、Master和Slave同步数据的方式、心跳机制的实现等等,也基本从源码中看完了Broker启动的所有流程。看这些老哥写的源码还是挺有意思的,之后有时间随缘再看看NameServer端相关的源码吧。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一开头就看到Broker主从架构相关的源码
  • 不使用Dleger时,主从数据如何进行同步
  • 首次启动时强制进行Broker注册
  • 使用ScheduledExecutorService启动定时任务
  • 使用scheduleAtFixedRate实现心跳机制
  • 调用registerBrokerAll注册
  • 通过对比数据版本判断当前Broker是否需要进行注册
  • 使用CountDownLatch获取所有注册异步任务的返回结果
  • 使用CopyOnWriteArrayList存储注册请求的返回
  • EOF
相关产品与服务
云数据库 Redis®
腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档