本文参考<RocketMQ技术内幕>
消息存储
不会永久保存消息文件,而是启用文件过期策略,在磁盘空间不足或在凌晨4点删除过期文件,文件默认保存72小时,删除时不会判断该文件上的消息是否被消费
数据文件
- 文件目录
- commitlog
真正的消息体及元数据就存储在这个目录文件下,该目录下,单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
- consumequeue
消息消费队列,消息消费是针对主题的,如果要通过遍历commitlog文件来检索对应topic的消息是非常低效的,所以引入了consumequeue的概念。consumequeue文件可以看成是基于topic的commitlog索引文件,存储路径为$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个consumequeue文件大小约5.72M;
- indexFile
索引文件,IndexFile提供了一种可以通过key或时间区间来查询消息的方法,IndexFile的存储位置是
{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引
IndexFileHeader(40) + hash槽(500W*4) + 具体索引(2000W * 20)
- IndexFileHeader: 该索引文件消息最小存储时间(8) + 该索引文件消息最大存储时间(8) + 该索引文件消息最小偏移量(8) + 该索引文件消息最大偏移量(8) + hash槽个数(4) + 已有索引个数(4)
- hash槽:500W个, 每个4字节,每个槽存储的是:具有相同hashcode具体索引 最新那条的下标。2000W条具体索引,放到500W个桶里,平均每个桶对应4个具体索引,这个4个具体索引组装成链式结构,槽里存最新的那个具体索引的位置,而具体索引里面又村了上一个具体索引的位置。所以查找的时候,根据key的hashcode找到对应的hash槽,然后根据传进来的key和物理偏移量找到消息进行比较,就可以找到对应的消息了
- 具体索引:2000W个,每个20个字节 key的hashcode(4) + 消息对应的物理偏移量(4) + 该消息存储时间与第一条消息存储时间的差值,小于0该消息无效(4) + 该索引的前一个索引的位置,0表示没有(4)
写入时机
- commitlog
文件对应的是MappedFile,每个文件大小1G
1. 获取消息类型 事务消息,非事务消息,Commit消息
2. 获取一个 MappedFile 对象,内存映射的具体实现
3. 追加消息需要加锁,串行化处理
4. 验证3中的 MappedFile 对象,获取一个可用的 MappedFile ,如果没有,则创建一个
5. 通过MappedFile对象写入文件
6. 根据刷盘策略刷盘
7. 主从同步
- consumequeue
通过定时任务
ReputMessageService
每毫秒执行一次 ,将 准发commitLog文件更新,用于更新 consumequeue文件 和 index 文件
consumequeue文件 都是异步刷盘, 会不会丢数据? - indexFile
和
consumequeue
一样,通过定时任务ReputMessageService
每毫秒执行一次
consumequeue和indexFile恢复
Broker接到消息之后,先将消息写到commitlog
文件,然后通过异步任务更新到consumequeue
和 indexFile
,加入消息写到commitlog
文件之后Broker宕机了,那consumequeue
和 indexFile
和commitlog
文件
中的消息就无法对应,导致有一部分消息永远无法被消费和查找,所以需要有一种恢复机制
- Broker在启动的时候会创建
${ROCKET_HOME}/store/abort
文件,Broker进程注册了JVM钩子函数,在退出时将abort文件删除,如果下次启动时abort文件存在,说明时异常退出,说明需要修复 - 加载
commitlog
文件,加载commitlog
consumequeue
indexFile
的刷盘点,有点复杂,自己看吧
刷盘机制
- 同步刷盘: 消息加到内存后,调用
MappedByteBuffer#force
方法,即消息刷到磁盘 - 异步刷盘: 消息加到内存之后立即返回客户端,默认是异步刷盘,默认异步
负载均衡
主要包括两方面: 发送消息时,均匀点发送到不同的Broker; 消费消息时候,消费者实例分配均匀的消费队列
发送端负载
路由信息中会细化为message queue
的路由信息,每个Broker上的Topic默认有4个消息队列和4个消费队列,每个Topic会分布在不同Broker上,Producer将消息发送到不同的消息队列,也就是说压力会分摊到不同的Broker上了,这样消息的存储和转发均起到了负载均衡的作用.
Producer发消息通过轮询来达到负载均衡的效果, 但有两种模式
- 不启动Broker故障延迟机制, 默认的.
- 启动Broker故障延迟机制, 第一次发送失败之后,暂时将该Broker排除在消息队列选择范围之内
消费端负载
- 一个消费组订阅了某个Topic
- 这个Topic在每个Broker上有4个消费队列
- 一个消费组有多个消费组实例
- 将该Topic所有的消费队列均匀的分给这几个消费者实例
- 一个消费者可以消费多个消费队列;同一时刻,一个消费队列只能被一个消费者消费; 当然在重新负载之后可以分配给别的消费者
有以下几种负载均衡策略
- 均匀分布: 6个queue, 分给A B. 先分A3条, 再分B3条
- 均匀分布: 6个queue, 分给A B. 先分A 1条, 再分B 1条,再分A 1条,再分B 1条,再分A 1条,再分B 1条
- 一致性Hash
- 根据配置,为每个消费者配置固定的消费队列
- 根据Broker部署机房名,对每个消费者负责不同Broker上的队列
Producer
- 构建与namesrv通信的netty客户端
- 默认每30s与namesrv交换获取broker相关信息
- 默认每30s去掉失效的broker信息以及发送心跳到所有broker上面
发送消息
- 同步发送: Producer向broker发送消息,阻塞当前线程等待broker响应结果
- 异步发送: Producer首先构建一个向broker发送消息的任务,把该任务提交给线程池,等执行完该任务时回调用户自定义的回调函数
- 单向: 发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
- 批量消息发送: 将同一个主题的多条消息一起打包发送到消息服务端,减少网络调用次数,有长度限制,如果打包的消息太长,会影响其他线程发送消息的响应时间
- 默认以同步的方式发送消息
- 查找主题的路由信息,这样才知道要发送到哪个Broker,有缓存就去缓存中拿,没缓存就去NameServer中查
- 1个Broker默认有4个消息队列,4个消费队列,发消息的时候怎么知道发送到哪个队列?
Consumer
消费者中保存了哪些信息
- 消息消费模式,集群或者广播
- 消息业务监听器
- 消息消费进度存储器
- 并发消费时,处理队列的最大跨度,默认2000,表示如果队列中最大偏移量的消息和最小偏移量的消息跨度超过2000,则延迟50毫秒后再拉取消息
- 每次消息拉取条数,默认32条
- 消费者最小线程数,默认20,因为线程池使用了无界最大,所以最大线程数只有20
消费者启动
- 构建主题订阅信息缓存起来,主要有两个主题:一个是正常订阅的主题 一个是重试主题
%RETRY%+消费组名
- 初始化消息进度,集群模式消息进度保存在Broker上;广播模式,消息进度存储在消费端
- 根据是否顺序消费,创建消费端消费线程服务. 线程池
- 向
MQClientINstance
注册消费者,并启动MQClientINstance
,在一个JVM中的所有消费者和生产者公用一个MQClientINstance
,MQClientINstance
只会启动一次
消息拉取
Pull模式
应用程序直接调API拉消息即可
消息拉取Push模式
每次消息拉取操作可以看成是一个任务,该任务被抽象成PullRequest
对象,拉取到的消息先存放在PullRequest
对象的ProcessQueue
中,然后提交到消费者线程池消费,消息成功消费后从ProcessQueue
中移除,
消息在其内部为TreeMap结构,key表示消息在ConsumerQueue中的偏移量
- 从
PullRequest
对象中获取ProcessQueue
中,并更新ProcessQueue
的最后更新时间为当前时间 - 进行消息拉取流控,主要包括两方面: 如果
ProcessQueue
当前的消息条数超过了1000,将触发流控,放弃本次拉取,并且该队列的下一次拉取任务将在50毫秒后才加入到拉取队列中; 对ProcessQueue
中最大偏移量和最小偏移量的限制 - 拉取该订阅主题的消息,如果为空,结束本次拉取,关于该队列的下一次拉取任务延迟3s
- 与服务端交互: 从哪个消费队列拉取?消息拉取偏移量?消息过滤表达式?本次拉取最大消息条数,默认32条
- 根据brokerName 和 brokerId 从
MQClientINstance
中获取Broker地址,在整个集群中,相同名称的Broker构成主从结构,brokerId会不一样 - broker端根据topic 和 队列编号获取消息消费队列
- 根据拉取偏移量拉取消息
- 更新消费进度
- 返回消息給消息拉取客户端,即consumer
- consumer收到消息后先更新下次拉取偏移量,然后将拉取到的消息存到
ProcessQueue
中,然后将消息提交到消费者线程消费,等待pullInterval毫秒后,将PullRequest
对象重新放到阻塞队列中,达到持续拉取消息的目的
消息拉取长轮询机制
consumer和broker之间通过长轮询的方式交互,默认超时时间为15s,broker在没有消息的时候,每5s检查一次消息是否可达,同时一有消息到达立马通知挂起线程校验消息是否是自己感兴趣的消息,减轻broker端的压力
消息队列负载与重新分配
PullRequest
对象可以看成是一次拉取任务的抽象,那它是什么时候被创建的?
- 从主题订阅信息缓存表中获取主题的队列信息. 发送请求到broker获取consumer的客户端ID.
Broker中为什么会持有该消费组的所有消费者信息?因为消息者在启动的时候会向
MQClientINstance
中注册自己,然后MQClientINstance
会向所有的broker发送心跳包,所以broker持有这些信息 - 对消息队列排序,然后分配队列给消费者
- 将
PullRequest
对象添加到pullRequestQueue中,唤醒消费消息的线程
集群内多个消费者如何负载主题下的多个消费队列?如果有新的消费者加入,消费队列如何重新分配?
由于每次进行队列重新负载时,会从Broker实时查出当前消费组内的所有消费者,并且对消息队列和消费者进行排序,这样新加入的消费者就会在队列重新分布时分配到消息消费队列
那我又想到一个问题,如果一个消费队列之前属于消费者A,现在被分给了消费者B,这部分过程该怎么处理?
每次进行队列重新负载时,如果一个消费队列被分配给其他的消费者,会设置dropped属性的值为true,会阻止之前的消费者消费该队列的消息
消息消费过程
先区分两个概念:
- 消费者每次去Broker拉取数据时默认时拉取32条数据
- consumerMessageBatchSize: 消息批次,表示从broker拉取到数据后,每次提交给线程池的消息条数,即
MessageListener
中每次接收的消息条数,默认为1. 小于32条就分页,大于32条就直接放到ConsumerRequest
中
所谓的消息消费过程,就是指从broker拉取消息并保存到ProcessQueue
中后,怎么将这些信息提交给工作线程. 支持并发消费和顺序消费
并发消费
并发消费指线程池中的线程可以并发的对同一个消费队列进行消费
- 处理consumerMessageBatchSize,也就是一次消息消费任务
ConsumerRequest
中包含的消息条数,将消息按consumerMessageBatchSize
放到ConsumerRequest
中,然后将ConsumerRequest
中提交到消费者线程池 - 检查
ProcessQueue
的状态,主要是它的dropped
属性,如果该值为true,则停止该队列的消费,在消息队列重新负载时会用到 - 恢复重试消息主题名
- 执行具体的消息消费,调用应用程序的
MessageListener
相关方法 - 根据
MessageListener
返回的结果,计算ackIndex. 如果返回 CONSUMER_SUCCESS, ackIndex=msgs.size()-1 ; 如果返回 CONSUMER_LATER, ackIndex=-1 - 如果时集群模式,业务方返回CONSUMER_LATER,消息不会被重新消费;
- 业务方返回CONSUMER_SUCCESS时,无需ACK确认; 业务方返回CONSUMER_LATER时,该批消息都需要发ACK消息,如果发ACK失败,直接将该批消息再次封装成
ConsumerRequest
,然后延迟5s再重新消;如果ACK发送成功,则该消息会延迟消费 - 从
ProcessQueue
中移除这批次消息,然后更新消息消费进度,以便消费者重启后能从上一次消费进度开始消费,避免消费重复消费. 需要注意的时候就算返回CONSUMER_LATER,也会更新消费进度,这是因为当返回
CONSUMER_LATER时,RockerMQ会创建一条与原先消息属性相同的消息,拥有一个唯一的新msgId,并存储原消息ID,该消息会存入commitlog文件中与原消息没有任务关联
业务方返回CONSUMER_LATER时,需要ACK确认
- 重试队列个数默认为1,每个broker上一个重试队列
- 创建重试主题,重试主题名称
%RETRY+消费组名称
,并从重试队列中随机选择一个队列,构建TopicConfig主题配置信息 - 根据消息偏移量从commitlog获取消息,并将消息的主题存入属性中
- 设置消息重试次数,如果超过最大重试次数,则改变主题为
%DLQ%
, 即死信队列,该主题的权限为只写,说明消息一旦进入DLQ队列,将不能被消费,需要人工干预 - 根据原先的消息创建一个新的消息,并存入到commitlog文件中,该消息的主题名称为重试主题,其他属性与原先的消息保持相同
- 在存入commitlog文件之前,如果消息的延迟级别大于0,替换消息的主题为定时任务主题
SCHEDULE_TOPIC_XXX
,队列ID为延迟级别减1
顺序消费
ROCKETMQ可以保证局部消息顺序消费,即可以保证同一消费队列中的消息被顺序消费,如果要做到全局顺序消费可以将主题配置成一个队列
- 顺序消费与并发消费的一个关键区别是: 顺序消息在创建消息队列拉取任务时,需要在Broker服务器锁定该消费队列
- 顺序消费指消费者内线程池中的线程对消费队列只能串行消费
消费进度
集群模式: 消费进度存在Broker. Broker默认每10s持久化一次
广播模式: 保存在消费者客户端. MQClientINstance
中有一个定时任务,每5s持久化一次
延迟消息
- 只支持特定级别的延迟,因为如果要支持任意精度的延迟,需要做消息排序,消耗太大
- 每一个延迟级别对应一个消息消费队列
- 每一个延迟界别对应一个定时任务,该定时任务根据延迟时间进行延迟调度
- 延迟任务执行: 先从延迟队列中找到消息;然清除消息的延迟属性;然后恢复消息原先的主题与队列;然后创建一条新消息再次写入commitlog;然后消息被正常的消费
- RockerMQ将消息存入commitlog文件时,如果发现消息的延迟级别大于0,会首先将重试出题存入消息的属性中,然后设置主题名称为SCHEDULE_TOPIC,以便时间到后重新参与消息消费
消息过滤机制
- 通过tag过滤: 基于tag的hashcode,hashcode冲突时过滤得不精确
- 通过FilterServer过滤: 用户可以自定义过滤逻辑
主从同步
主从同步有两种模式
- 同步双写: producerd发送消息后,阻塞发送者线程,等待消息同步到slave成功
- 异步复制: slave每5s拉一次数据
数据同步过程
- master启动之后,在特定端口监听slave服务器的连接
- slave主动连接master,master接收客户端的连接,并建立相关TCP连接,这部分没有使用netty,使用Java原生NIO
- slave主动向master发送待拉取消息偏移量,master解析请求并返回消息给slave
- slave保存消息并继续发送新的消息同步请求
- 主从同步不具备主从切换功能,即当master宕机后,slave不会接管消息发送,但可以提供消息读取
master和slave都在运行过程中,消费者是从master拉取消息还是从slave拉取?
默认情况下消息消费者从master拉取,当master积压的消息超过了物理内存的40%,则建议从slave拉取. 但如果slaveReadEnable为false,表示slave不可读,slave也不会接管消息拉取
当消息消费者向slave拉取消息后,会一直从slave拉取
- 如果slave的slaveReadEnable设置为false,则下次拉取从master拉取
- 如果slave允许读取并且slave积压的消息未超过其物理内存的40%,下次拉取使用的Broker为订阅组的brokerId指定的Broker服务器,该值默认为0,代表master
- 如果slave允许读取并且slave积压的消息超过了其物理内存的40%,下次拉取使用的Broker为订阅组的whichBrokerWhenConsumeSlowly指定的Broker服务器,该值默认为1,代表slave
主从服务消息消费进是如何同步的
- 消息消费进度的同步时单向的,slave开启一个定时任务,定时从master同步消息消费进度
- 无论消息消费者是从master拉的消息还是从slave拉取的消息,在向Broker反馈消息消费进度时,优先向master汇报
- 消息消费者向master拉取消息时,如果消息消费者内存中存在消息消费进度时,master会尝试跟新消息消费进度
读写分离
- master负责读写,slave可以为读,也可以什么都不做
- RocketMQ有属于自己的一套读写分离逻辑,它会判断master的消息堆积量来决定消费者是否向master拉取消息消费
事务消息
- Producer通过同步的方式先向Broker发送一个Half消息
- 根据发送消息的结果(Broker返回的ACK结果),设置本地事务状态
如果发送失败,设置本地事务状态为 ROLLBACK_MESSAGE
如果发送成功,则执行本地事务,根据本地事务的执行结果,设置本地事务状态
1. 如果本地事务执行成功,设置本地事务状态为 commit
2. 如果本地事务执行失败,设置本地事务状态为 rollback
- Producer根据本地事务状态执行提交,即向Brocker再发一条确认消息
- 如果Broker收到确认消息
如果收到的结果是 commit 则Broker认为整个事务过程执行成功,将消息下发给Conusmer端消费,即通过定时任务的手段将消息从 RMQ_SYS_TRANS_HALF_TOPIC 主题 恢复到原来的主题
如果收到的结果是 rollback 则broker视为本地事务执行失败,broker删除Half消息,不下发给consumer
- 如果Broker没收到确认消息
broker定时回查本地事务的执行结果;这部分逻辑由用户实现,如果本地事务已经执行则返回commit;如果未执行,则返回rollback
broker接收到的如果是commit,则broker视为整个事务过程执行成功,将消息下发给Conusmer端消费;如果是rollback,则broker视为本地事务执行失败,broker删除Half消息,不下发给consumer。如果broker未接收到回查的结果(或者查到的是unknow),则broker会定时进行重复回查,以确保查到最终的事务结果