前面一篇文章分析了broker的启动过程,浏览了broker的基本功能。接下来的几篇文章,准备按照十分钟入门RocketMQ一文中提到的一系列特性,依次进行学习。这篇文章准备分析RocketMQ作为MQ的最基本的功能:消息的发布(publish)和订阅(subscribe)。首先,我参考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控这篇文章完成了一个简单的例子。
一、RocketMQ消息模型
在部署RocketMQ的时候,先启动name server,再启动broker,这时候broker会将自己注册到name server。应用程序中的producer启动的时候,首先连接一台name server,获取broker的地址列表;然后再和broker建立连接,接下来就可以发送消息了。其中:一个producer只与一个name server连接,一个producer会跟所有broker建立连接,每个连接都会有心跳检测机制。
producer会轮询向指定topic的mq集合发送消息。
consumer有两种消费模式:集群消费和广播消费。集群消费:多个consumer平均消费该topic下所有mq的消息,即某个消息在某个message queue中被一个consumer消费后,其他消费者就不会消费到它;广播消费:所有consumer可以消费到发到这个topic下的所有消息。
consumer有两种获取消息的模式:推模式和拉模式,在RocketMQ中,从技术实现角度看,推模式也是在拉模式上做了一层封装。
二、消息发送
生产者Demo
首先给出代码,
生产者中有两个属性:
name server的地址,用于获得broker的相关信息
生产者集合producerGroup,在同一个producer group中有不同的producer实例,如果最早一个producer奔溃,则broker会通知该组内的其他producer实例进行事务提交或回滚。
RocketMQ中的消息,使用Message表示,代码定义如下:
topic:该消息将要往哪个topic发
flag:可以用作消息过滤
properties:暂时没理解【TODO】
body:消息内容
每个消息发送完后,会得到一个SendResult对象,看下该对象的结构:
在这个demo中,我们是将消息内容和消息状态一并打印到控制台。
消息发送源码分析
在RocketMQ中的client模块的包结构如下,可以看出,作者并没有将接口的定义和实现放在一个包下(这在我们的业务应用中是常见的做法,不一定合理)。producer和consumer包下分别定义了生产者和消费者的接口,将具体的实现放在impl包中。
首先关注producer包里的内容,几个主要的类如下:DefaultMQProducer是生产者的默认实现、MQAdmin用于定义一些管理接口、MQProducer用于定义一些生产者特有的接口。
在ProducerDemo中,通过`defaultMQProducer.start();启动生产者,接下来看下start()方法的过程:
根据服务状态决定接下来的动作
对于CREATE_JUST状态
设置服务状态
检查配置
获取或创建MQClientInstance实例
将生产者注册到指定的producerGroup,即producerTable这个数据结构中,是一个map
填充topicPublishInfoTable数据结构
启动生产者
对于RUNNING、STARTFAILED和SHUTDOWNALREADY,抛出异常
顺着 往下跟,可以进一步了解生产者的细节,主要步骤有:
建立请求响应通道
启动各种定时任务,例如:每隔2分钟向name server拉取一次broker集群的地址,这意味着如果某个broker宕机了,生产者在这两分钟之内的消息是投递失败的;定期从name server拉取topic等路由信息;定期清理失效的broker以及向broker发送心跳消息等。
启动拉服务、负载均衡服务、推服务等服务,这三个服务跟消费者有关。这里设计上不太明了,将消费者和生产者的启动逻辑放在一起了。看pullMessageService和rebalanceService和初始化,它们是根据MQClientInstance初始化的,而MQClientInstance又是根据ClientConfig来配置的。
生产者启动后,接下来看下消息的发送过程,如下图所示,DefaultMQProducer提供了很多发送消息的方法,可以实现同步发消息、异步发消息、指定消息队列、OneWay消息、事务消息等。
这里我们只看最简单的 方法,最终在DefaultMQProducerImpl中实现:
发送消息的主要过程如下:
首先检查生产者和消息的合法性
然后获取消息发送的信息,该信息存放在TopicPublishInfo对象中:
选择要发送给该topic下的那个MessageQueue,选择的逻辑分两种情况:(1)默认情况,在上次投递的broker节点上,轮询到下一个message queue来发送;(2)sendLatencyFaultEnable这个值设置为true的时候,这块没太看懂。
投递消息
根据消息队列运行模式,针对投递结果做不同的处理。
二、消息消费
消费者Demo
消费者里有个属性需要看下:
consumerGroup:位于同一个consumerGroup中的consumer实例和producerGroup中的各个produer实例承担的角色类似;consumerGroup中的实例还可以实现负载均衡和容灾。PS:处于同一个consumerGroup里的consumer实例一定是订阅了同一个topic。
nameServer的地址:name server地址,用于获取broker、topic信息
消费者Demo里做了以下几个事情:
设置配置属性
设置订阅的topic,可以指定tag
设置第一次启动的时候,从message queue的哪里开始消费
设置消息处理器
启动消费者
消费者源码分析
前面分析过了,RocketMQ中的client模块统一提供了生产者和消费者客户端,这块我们看下消费者的几个主要的类。前面提到过,RocketMQ实际上都是拉模式,这里的DefaultMQPushConsumer实现了推模式,也只是对拉消息服务做了一层封装,即拉到消息的时候触发业务消费者注册到这里的callback,而具体拉消息的服务是由PullMessageService实现的,这个细节后续再研究。
在ConsumerDemo中,设置好配置信息后,会进行topic订阅,调用了DefaultMQPushConsumer的subscribe方法,源码如下:
第一个参数是topic信息,第二个参数用于用于消息过滤tag字段。真正的订阅发生在DefaultMQPushConsumerImpl中,代码如下:
在ConsumerDemo中,接下里会设置消费者首次启动时消费消息的起始位置,这涉及到DefaultMQPushConsumer中的一个属性——consumeFromWhere,这个值有三个可能的值
CONSUMEFROMLAST_OFFSET,默认值,表示从上次停止时的地方开始消费
CONSUMEFROMFIRST_OFFSET,从队列的头部开始消费
CONSUMEFROMTIMESTAMP,从指定的时间点开始消费
ConsumerDemo接下来会注册一个callback,当消息到达的时候就处理消息(最新的消息监听者支持并发消费):
最后,我们看下ConsumerDemo的启动过程,即DefaultMQPushConsumerImpl的start方法,主要做了下面几件事:
检查配置
将订阅信息拷贝到负载均衡组件(rebalanceImpl)中;
负载均衡组件的几个属性的设置
处理不同消息模式(集群模式或广播模式)的配置
处理顺序消费和并发消费的不同配置
将消费者信息和consumer group注册到MQ客户端实例的consumerTable中
启动消费者客户端
参考资料
分布式开放消息系统(RocketMQ)的原理与实践
买好车提供的rocketmq-spring-boot-starter
Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控
领取专属 10元无门槛券
私享最新 技术干货