首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

RocketMQ学习-消息发布和订阅

前面一篇文章分析了broker的启动过程,浏览了broker的基本功能。接下来的几篇文章,准备按照十分钟入门RocketMQ一文中提到的一系列特性,依次进行学习。这篇文章准备分析RocketMQ作为MQ的最基本的功能:消息的发布(publish)和订阅(subscribe)。首先,我参考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控这篇文章完成了一个简单的例子。

一、RocketMQ消息模型

在部署RocketMQ的时候,先启动name server,再启动broker,这时候broker会将自己注册到name server。应用程序中的producer启动的时候,首先连接一台name server,获取broker的地址列表;然后再和broker建立连接,接下来就可以发送消息了。其中:一个producer只与一个name server连接,一个producer会跟所有broker建立连接,每个连接都会有心跳检测机制。

producer会轮询向指定topic的mq集合发送消息。

consumer有两种消费模式:集群消费和广播消费。集群消费:多个consumer平均消费该topic下所有mq的消息,即某个消息在某个message queue中被一个consumer消费后,其他消费者就不会消费到它;广播消费:所有consumer可以消费到发到这个topic下的所有消息。

consumer有两种获取消息的模式:推模式和拉模式,在RocketMQ中,从技术实现角度看,推模式也是在拉模式上做了一层封装。

二、消息发送

生产者Demo

首先给出代码,

生产者中有两个属性:

name server的地址,用于获得broker的相关信息

生产者集合producerGroup,在同一个producer group中有不同的producer实例,如果最早一个producer奔溃,则broker会通知该组内的其他producer实例进行事务提交或回滚。

RocketMQ中的消息,使用Message表示,代码定义如下:

topic:该消息将要往哪个topic发

flag:可以用作消息过滤

properties:暂时没理解【TODO】

body:消息内容

每个消息发送完后,会得到一个SendResult对象,看下该对象的结构:

在这个demo中,我们是将消息内容和消息状态一并打印到控制台。

消息发送源码分析

在RocketMQ中的client模块的包结构如下,可以看出,作者并没有将接口的定义和实现放在一个包下(这在我们的业务应用中是常见的做法,不一定合理)。producer和consumer包下分别定义了生产者和消费者的接口,将具体的实现放在impl包中。

首先关注producer包里的内容,几个主要的类如下:DefaultMQProducer是生产者的默认实现、MQAdmin用于定义一些管理接口、MQProducer用于定义一些生产者特有的接口。

在ProducerDemo中,通过`defaultMQProducer.start();启动生产者,接下来看下start()方法的过程:

根据服务状态决定接下来的动作

对于CREATE_JUST状态

设置服务状态

检查配置

获取或创建MQClientInstance实例

将生产者注册到指定的producerGroup,即producerTable这个数据结构中,是一个map

填充topicPublishInfoTable数据结构

启动生产者

对于RUNNING、STARTFAILED和SHUTDOWNALREADY,抛出异常

顺着 往下跟,可以进一步了解生产者的细节,主要步骤有:

建立请求响应通道

启动各种定时任务,例如:每隔2分钟向name server拉取一次broker集群的地址,这意味着如果某个broker宕机了,生产者在这两分钟之内的消息是投递失败的;定期从name server拉取topic等路由信息;定期清理失效的broker以及向broker发送心跳消息等。

启动拉服务、负载均衡服务、推服务等服务,这三个服务跟消费者有关。这里设计上不太明了,将消费者和生产者的启动逻辑放在一起了。看pullMessageService和rebalanceService和初始化,它们是根据MQClientInstance初始化的,而MQClientInstance又是根据ClientConfig来配置的。

生产者启动后,接下来看下消息的发送过程,如下图所示,DefaultMQProducer提供了很多发送消息的方法,可以实现同步发消息、异步发消息、指定消息队列、OneWay消息、事务消息等。

这里我们只看最简单的 方法,最终在DefaultMQProducerImpl中实现:

发送消息的主要过程如下:

首先检查生产者和消息的合法性

然后获取消息发送的信息,该信息存放在TopicPublishInfo对象中:

选择要发送给该topic下的那个MessageQueue,选择的逻辑分两种情况:(1)默认情况,在上次投递的broker节点上,轮询到下一个message queue来发送;(2)sendLatencyFaultEnable这个值设置为true的时候,这块没太看懂。

投递消息

根据消息队列运行模式,针对投递结果做不同的处理。

二、消息消费

消费者Demo

消费者里有个属性需要看下:

consumerGroup:位于同一个consumerGroup中的consumer实例和producerGroup中的各个produer实例承担的角色类似;consumerGroup中的实例还可以实现负载均衡和容灾。PS:处于同一个consumerGroup里的consumer实例一定是订阅了同一个topic。

nameServer的地址:name server地址,用于获取broker、topic信息

消费者Demo里做了以下几个事情:

设置配置属性

设置订阅的topic,可以指定tag

设置第一次启动的时候,从message queue的哪里开始消费

设置消息处理器

启动消费者

消费者源码分析

前面分析过了,RocketMQ中的client模块统一提供了生产者和消费者客户端,这块我们看下消费者的几个主要的类。前面提到过,RocketMQ实际上都是拉模式,这里的DefaultMQPushConsumer实现了推模式,也只是对拉消息服务做了一层封装,即拉到消息的时候触发业务消费者注册到这里的callback,而具体拉消息的服务是由PullMessageService实现的,这个细节后续再研究。

在ConsumerDemo中,设置好配置信息后,会进行topic订阅,调用了DefaultMQPushConsumer的subscribe方法,源码如下:

第一个参数是topic信息,第二个参数用于用于消息过滤tag字段。真正的订阅发生在DefaultMQPushConsumerImpl中,代码如下:

在ConsumerDemo中,接下里会设置消费者首次启动时消费消息的起始位置,这涉及到DefaultMQPushConsumer中的一个属性——consumeFromWhere,这个值有三个可能的值

CONSUMEFROMLAST_OFFSET,默认值,表示从上次停止时的地方开始消费

CONSUMEFROMFIRST_OFFSET,从队列的头部开始消费

CONSUMEFROMTIMESTAMP,从指定的时间点开始消费

ConsumerDemo接下来会注册一个callback,当消息到达的时候就处理消息(最新的消息监听者支持并发消费):

最后,我们看下ConsumerDemo的启动过程,即DefaultMQPushConsumerImpl的start方法,主要做了下面几件事:

检查配置

将订阅信息拷贝到负载均衡组件(rebalanceImpl)中;

负载均衡组件的几个属性的设置

处理不同消息模式(集群模式或广播模式)的配置

处理顺序消费和并发消费的不同配置

将消费者信息和consumer group注册到MQ客户端实例的consumerTable中

启动消费者客户端

参考资料

分布式开放消息系统(RocketMQ)的原理与实践

买好车提供的rocketmq-spring-boot-starter

Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180405G17BMY00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券