本文将带您深入了解 RocketMQ 4.X 的核心知识体系,从架构设计到关键机制,一探这款高可用消息中间件的底层逻辑。
RocketMQ 4.X 架构中包含四种角色 :
1、NameServer
名字服务是是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。它是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper ,支持 Broker 的动态注册与发现。
2、BrokerServer
Broker 主要负责消息的存储、投递和查询以及服务高可用保证 。
3、Producer
消息发布的角色,Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
4、Consumer
消息消费的角色,支持以 push 推,pull 拉两种模式对消息进行消费。
RocketMQ 集群工作流程:
1、启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer 、Consumer 连上来,相当于一个路由控制中心。
2、Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker信息( IP+端口等 )以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
3、收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
4、Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
5、Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。
传统的消息队列 ActiveMQ 是典型的点对点模式。
图片来自公众号武哥漫谈IT
RocketMQ 和 Kafka 是发布订阅模式。
图片来自公众号武哥漫谈IT
传输内容分为以下四个部分:
1、消息长度:
总长度,四个字节存储,占用一个 int 类型;
2、序列化类型 & 消息头长度:
占用一个 int 类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
3、消息头数据:
经过序列化后的消息头数据;
4、消息主体数据:
消息主体的二进制字节数据内容。
消息头数据序列化默认是 JSON 格式 ,示例如下:
Reactor 线程模型抽象出三种组件:
Remoting 通讯框架采用了典型的主从多线程模型 ,但还是有变化,即:独立的业务线程池对应不同的请求业务类型。
一个 Reactor 主线程 ( eventLoopGroupBoss
)责监听 TCP网络连接请求,建立好连接,创建 SocketChannel , 并注册到 selector 上。
RocketMQ 源码会自动根据 OS 的类型选择 NIO 和 Epoll ,也可以通过参数配置 ), 然后监听真正的网络数据。
拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector ),再真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作都交给 defaultEventExecutorGroup 去做。
而业务操作由业务线程池中处理,根据 RemotingCommand 的业务请求编号 requestCode , 从处理器表 processorTable 这个本地缓存中找到对应的处理器 , 然后封装成 task 任务后,提交到对应的业务处理器的线程池执行。
RocketMQ 的线程模型如下所示 :
线程数 | 线程名 | 线程具体说明 |
---|---|---|
1 | NettyBoss_%d | Reactor 主线程 |
N | NettyServerEPOLLSelector_%d_%d | Reactor 线程池 |
M1 | NettyServerCodecThread_%d | Worker线程池 |
M2 | RemotingExecutorThread_%d | 业务 processor 处理线程池 |
我们先进入 broker 的文件存储目录 。
消息存储和下面三个文件关系非常紧密:
RocketMQ 采用的是混合型的存储结构,Broker 单个实例下所有的队列共用一个数据文件(commitlog)来存储。
生产者发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 commitlog 文件中。只要消息被刷盘持久化至磁盘文件 commitlog 中,那么生产者发送的消息就不会丢失。
Broker 端的后台服务线程会不停地分发请求并异步构建 consumequeue(消费文件)和 indexfile(索引文件)。
首先消息是一条一条写入到文件,每条消息的格式是固定的,这种设计对于文件读写来讲有两点优势:
磁盘的存取速度相对内存来讲并不快,一次磁盘 IO 的耗时主要取决于:寻道时间和盘片旋转时间,提高磁盘 IO 性能最有效的方法就是:减少随机 IO,增加顺序 IO 。
《 The Pathologies of Big Data 》这篇文章指出:内存随机读写的速度远远低于磁盘顺序读写的速度。磁盘顺序写入速度可以达到几百兆/s,而随机写入速度只有几百 KB /s,相差上千倍。
因为消息是一条一条写入到 commitlog 文件 ,写入完成后,我们可以得到这条消息的物理偏移量。
每条消息的物理偏移量是唯一的, commitlog 文件名是递增的,可以根据消息的物理偏移量通过二分查找,定位消息位于那个文件中,并获取到消息实体数据。
mmap 是 Linux 提供的一种内存映射文件的机制,它实现了将内核中读缓冲区地址与用户空间缓冲区地址进行映射,从而实现内核缓冲区与用户缓冲区的共享。
基于 mmap + write 系统调用的零拷贝方式,整个拷贝过程会发生 4 次上下文切换,1 次 CPU 拷贝和 2 次 DMA 拷贝。
用户程序读写数据的流程如下:
拷贝方式 | CPU拷贝 | DMA拷贝 | 系统调用 | 上下文切换 |
---|---|---|---|---|
传统方式(read + write) | 2 | 2 | read / write | 4 |
内存映射(mmap + write) | 1 | 2 | mmap / write | 4 |
sendfile | 1 | 2 | sendfile | 2 |
sendfile + DMA gather copy | 0 | 2 | sendfile | 2 |
RocketMQ 选择了 mmap + write 这种零拷贝方式,适用于业务级消息这种小块文件的数据持久化和传输;
而 Kafka 采用的是 sendfile 这种零拷贝方式,适用于系统日志消息这种高吞吐量的大块文件的数据持久化和传输。
核心流程如下:
pullRequest
, 拉取请求保存一个处理队列 processQueue
,内部是红黑树(TreeMap
),用来保存拉取到的消息 ;pullRequestQueue
中弹出拉取消息,执行拉取任务 ,拉取请求是异步回调模式,将拉取到的消息放入到处理队列;pullRequestQueue
中 ;consumeMessageService
的 submitConsumeRequest
方法 ,消费消息服务内部有一个消费线程池;listener.consumeMessage
;updateOffset
,先更新到内存 offsetTable
,定时上报到 Broker ;若消费失败,则将失败消费发送到 Broker 。commitOffset
方法修改内存的消费进度,定时刷盘到 consumerOffset.json
。所有节点都是 master 主节点(比如 2 个或 3 个主节点),没有 slave 从节点的模式。
该模式的优缺点如下:
配置简单 , 性能极高。一个 master 节点的宕机或者重启(维护)对应用程序没有影响。
当磁盘配置为 RAID10 时,消息不会丢失,因为 RAID10 磁盘非常可靠,即使机器不可恢复(消息异步刷盘模式的情况下,会丢失少量消息;如果消息是同步刷盘模式,不会丢失任何消息)。
单台机器宕机时,本机未消费的消息,直到机器恢复后才会订阅,影响消息实时性。
每个主节点配置多个从节点,多对主从。HA 采用异步复制,主节点和从节点之间有短消息延迟(毫秒)。
所谓异步复制,是指消息发送到的 master 后直接返回,不必等待主从复制,而是内部通过异步的方式进行复制。
这种模式的优缺点如下:
即使磁盘损坏,也只会丢失极少的消息,不影响消息的实时性能。
同时,当主节点宕机时,消费者仍然可以消费从节点的消息,这个过程对应用本身是透明的,不需要人为干预。
性能几乎与多 Master 模式一样高。
主节点宕机、磁盘损坏时,会丢失少量消息。
每个 master 节点配置多个 slave 节点,有多对 Master-Slave 。
HA 采用同步双写,即只有消息成功写入到主节点并复制到多个从节点,才会返回成功响应给应用程序。
异步复制指 producer 发送一条消息给 broker 的主节点,只有主节点将数据同步到从节点才会返回结果。
这种模式的优缺点如下:
数据和服务都没有单点故障。在 master 节点关闭的情况下,消息也没有延迟。同时服务可用性和数据可用性非常高。
这种模式下的性能略低于异步复制模式(大约低 10%)。发送单条消息的 RT 略高,目前版本,master 节点宕机后,slave 节点无法自动切换到 master 。
在 RocketMQ 4.5 版本之前,RocketMQ 只有一种 Master/Slave 的部署方式。在这种模式下,一组 broker 包含一个 Master 和零到多个 Slave,Slave 通过同步或异步复制的方式与 Master 保持数据一致。
但这种部署模式提供了一定程度的高可用性,但也存在一些缺陷。例如,在故障转移方面,如果主节点发生故障,仍然需要手动重启或切换,无法自动将一个从节点转换为主节点。
因此,核心问题是:多副本架构需要解决自动故障转移的问题,也就是自动选主。
这个问题的解决方案基本可以分为两种:
1、第三方协调服务
我们利用第三方协调服务集群(如 Zookeeper 或 etcd)进行选主,但这样会引入额外的外部组件,增加了部署、运维和故障诊断的成本,我们不仅需要维护 RocketMQ 集群,还需要维护 Zookeeper 集群。
所以,我们看到 Kafka 的新版本已经摈弃了 Zookeeper 而是选择了第二种方案。
2、不需要引入外部组件,使用 Raft 协议进行自动选主
自动选主逻辑集成在各个节点的进程中,节点之间通过通信即可完成选主。
因此,最终选择 Raft 协议来解决这个问题,而 DLedger 就是基于 Raft 协议的 commitlog 存储库,是 RocketMQ 实现新的高可用多副本架构的关键。
如图,我们定义了两个 DLedger Group ,分别是:RaftNode00 和 RaftNode01。
每个 DLedger Group 要求包含 至少 3 台机器 部署,每台机器部署 Broker 服务 , 机器数量为奇数。
通过 Raft 自动选举出一个 Leader,其余节点作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。
RocketMQ 的 DLedger 模式能自动容灾切换,并保证数据一致,同时支持水平扩展的,即:部署任意多个 RocketMQ Group 同时对外提供服务。
RocketMQ 事务消息是支持在分布式场景下保障消息生产和本地事务的最终一致性。交互流程如下图所示:
1、生产者将消息发送至 Broker 。
2、Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
3、生产者开始执行本地事务逻辑。
4、生产者根据本地事务执行结果向服务端提交二次确认结果( Commit 或是 Rollback ),Broker 收到确认结果后处理逻辑如下:
5、在断网或者是生产者应用重启的特殊情况下,若 Broker 未收到发送者提交的二次确认结果,或 Broker 收到的二次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
当使用 RocketMQ 广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。
广播消费主要用于两种场景:消息推送和缓存同步。
笔者第一次接触广播消费的业务场景是神州专车司机端的消息推送。
用户下单之后,订单系统生成专车订单,派单系统会根据相关算法将订单派给某司机,司机端就会收到派单推送。
推送服务是一个 TCP 服务(自定义协议),同时也是一个消费者服务,消息模式是广播消费。
司机打开司机端 APP 后,APP 会通过负载均衡和推送服务创建长连接,推送服务会保存 TCP 连接引用 (比如司机编号和 TCP channel 的引用)。
派单服务是生产者,将派单数据发送到 MetaQ , 每个推送服务都会消费到该消息,推送服务判断本地内存中是否存在该司机的 TCP channel , 若存在,则通过 TCP 连接将数据推送给司机端。
肯定有同学会问:假如网络原因,推送失败怎么处理 ?有两个要点:
高并发场景下,很多应用使用本地缓存,提升系统性能 。
如上图,应用A启动后,作为一个 RocketMQ 消费者,消息模式设置为广播消费。为了提升接口性能,每个应用节点都会将字典表加载到本地缓存里。
当字典表数据变更时,可以通过业务系统发送一条消息到 RocketMQ ,每个应用节点都会消费消息,刷新本地缓存。
顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
顺序消息分为分区顺序消息和全局顺序消息。
1、分区顺序消息
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
2、全局顺序消息
对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同。 因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。
消息的顺序需要由两个阶段保证:
生产者顺序发送消息封装
消费者消费消息时,需要保证单线程消费每个队列的消息数据,从而实现消费顺序和发布顺序的一致。
顺序消费服务的类是 ConsumeMessageOrderlyService ,在负载均衡阶段,并发消费和顺序消费并没有什么大的差别。
最大的差别在于:顺序消费会向 Borker 申请锁 。消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔20秒会重新尝试。
RocketMQ 包含两种部署架构: Master-Slave 架构 和 Deleger 架构 。
首先是 Master-Slave 架构,它的问题很明显,由于组内没有 failover 能力,所以
然后是 Deleger 架构 ,通过 Master 故障后短时间内重新选出新的 Master 来解决上述问题,但是由于 Raft 选主和复制能力在复制链路上,因此存在以下问题:
同时,我们提到了 RocketMQ 4.X 的消费流程,它的消费逻辑有两个非常明显的特点:
RocketMQ 5.0 引入了全新的弹性无状态代理模式,将当前的Broker职责进行拆分,对于客户端协议适配、权限管理、消费管理等计算逻辑进行抽离,独立无状态的代理角色提供服务,Broker则继续专注于存储能力的持续优化。这套模式可以更好地实现在云环境的资源弹性调度。 值得注意的是RocketMQ 5.0的全新模式是和4.0的极简架构模式相容相通的,5.0的代理架构完全可以以Local模式运行,实现与4.0架构完全一致的效果。开发者可以根据自身的业务场景自由选择架构部署。