架构图里面包含了四个主要部分:
担任路由消息的提供者。生产者或消费者能够通过NameServer查找各Topic相应的Broker IP列表分别进行发送消息和消费消息。nameServer由多个无状态的节点构成,节点之间无任何信息同步。 broker会定期向NameServer以发送心跳包的方式,轮询向所有NameServer注册以下元数据信息:
也就是说,每个NameServer注册的信息都是一样的,而且是当前系统中的所有broker的元数据信息。
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
消息中转角色,负责存储消息、转发消息。在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
NameServer 是专为 RocketMQ 设计的轻量级名称服务,为 producer 和 consumer 提供路由信息。具有简单、可集群横吐扩展、无状态,节点之间互不通信等特点。而 RocketMQ 的架构设计决定了只需要一个轻量级的元数据服务器就足够了,只需要保持最终一致,而不需要 Zookeeper 这样的强一致性解决方案,不需要再依赖另一个中间件,从而减少整体维护成本。
RocketMq采用「文件系统进行消息的存储」,相对于ActiveMq采用关系型数据库进行存储的方式就更直接,性能更高了。 RocketMq与Kafka在写消息与发送消息上,继续沿用了Kafka的这两个方面:「顺序写和零拷贝」
操作系统每次从磁盘读写数据的时候,都需要找到数据在磁盘上的地址,再进行读写。而如果是机械硬盘,寻址需要的时间往往会比较长,而一般来说,如果把数据存储在内存上面,少了寻址的过程,性能会好很多;但Kafka 的数据存储在磁盘上面,依然性能很好,这是为什么呢? 这是因为,Kafka采用的是顺序写,直接追加数据到末尾。实际上,磁盘顺序写的性能极高,在磁盘个数一定,转数一定的情况下,基本和内存速度一致,因此,磁盘的顺序写这一机制,极大地保证了Kafka本身的性能。
比如:读取文件,再用socket发送出去这一过程 传统方式实现: 先读取、再发送,实际会经过以下四次复制
传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的。那么这里使用了零拷贝,直接由内核缓冲区Read Buffer将数据复制到网卡,省去第二步和第三步的复制。
那么采用零拷贝的方式发送消息,必定会大大减少读取的开销,使得RocketMq读取消息的性能有一个质的提升,零拷贝技术采用了MappedByteBuffer内存映射技术,采用这种技术有一些限制,其中有一条就是传输的文件不能超过2G,这也就是为什么RocketMq的存储消息的文件CommitLog的大小规定为1G。
RocketMq采用文件系统存储消息,并采用顺序写写入消息,使用零拷贝发送消息,极大得保证了RocketMq的性能。
消息生产者发送消息到broker,都是会按照顺序存储在CommitLog文件中,每个commitLog文件的大小为1G
RocketMQ丢消息的场景
在本地事务执行之前发送给RocketMQ,half消息的作用就是确认RocketMQ 服务的可用性。
最终,如果 Broker 收到二次 commit 确认,就可以把消息投递给消费者,反之如果是 rollback,消息会保存下来并且在 3 天后被删除。
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入。
在返回应用写成功状态前,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。
Dledger搭建的RocketMQ集群,会通过两阶段提交的方式保证文件在主从之间成功同步。 一个是uncommitted阶段,一个是commited阶段:
临时存储+定时任务。
Broker收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX)的相应时间段的Message Queue中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标Topic的队列中,然后消费者就可以正常消费这些消息。
不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。
commitlog文件存在一个过期时间,默认为72小时,即三天。除了用户手动清理外,在以下情况下也会被自动清理,无论文件中的消息是否被消费过:
死信队列用于处理无法被正常消费的消息,即死信消息。
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列。
RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式。
如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。所以采取了pull的方式。
所谓的长轮询,就是Consumer 拉取消息,如果对应的 Queue 如果没有数据,Broker 不会立即返回,而是把 PullReuqest hold起来,等待 queue 有了消息后,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。
只要通过网络交换数据,就无法避免因为网络不可靠而造成的消息重复这个问题。比如说RocketMq中,当consumer消费完消息后,因为网络问题未及时发送ack到broker,broker就不会删掉当前已经消费过的消息,那么,该消息将会被重复投递给消费者去消费。
虽然 RocketMq 保证了同一个消费组只能消费一次,但会被不同的消费组重复消费,因此这种重复消费的情况不可避免。
RocketMq本身并不保证消息不重复,这样肯定会因为每次的判断,导致性能打折扣,所以它将去重操作直接放在了消费端:
NameServer 因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。 RocketMQ 的高可用主要是在体现在Broker的读和写的高可用,Broker的高可用是通过「集群」和「主从」实现的。
Broker可以配置两种角色:Master和Slave,Master角色的Broker支持读和写,Slave角色的Broker只支持读,Master会向Slave同步消息。 也就是说Producer只能向Master角色的Broker写入消息,Cosumer可以从Master和Slave角色的Broker读取消息。 Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。
在创建 Topic 的时候,把 Topic 的多个Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,其他组Master 仍然可用, Producer 仍然可以发送消息
RocketMQ 目前还不支持把Slave自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 的 Broker ,更改配置文件,用新的配置文件启动 Broker。
在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的 Broker。如果开启了容错策略,会通过 RocketMQ 的预测机制来预测一个 Broker 是否可用:
RocketMq采用
图片及部分相关技术知识点来源于网络搜索,侵权删!
参考资料:
https://blog.csdn.net/m0_37900506/article/details/118978969
https://blog.csdn.net/xueguchen/article/details/115607892
https://blog.csdn.net/adminpd/article/details/123977683
https://mp.weixin.qq.com/s/2TeUYAodDKG0gvuuVzAdPw