前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ 源码分析 —— 集成 Spring Boot

RocketMQ 源码分析 —— 集成 Spring Boot

作者头像
芋道源码
发布2020-05-19 15:24:06
1.6K0
发布2020-05-19 15:24:06
举报
文章被收录于专栏:芋道源码1024芋道源码1024

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/spring-boot-integration/ 「芋道源码」欢迎转载,保留摘要,谢谢!

  • 1. 概述
  • 2. 调试环境搭建
  • 3. 项目结构一览
  • 5. annotation
  • 6. autoconfigure
  • 7. config
  • 8. support
  • 9. core
  • 666. 彩蛋

1. 概述

在开始分享 https://github.com/apache/rocketmq-spring 项目(RocketMQ 集成到 Spring Boot 中),我们先恶趣味的看一段历史:

  • 2014-08 Spring Boot 1 正式发布。
  • 2018-03 Spring Boot 2 正式发布。
  • 2018-12 RocketMQ 团队发布 RocketMQ 集成到 Spring Boot 的解决方案,并且提供了中文文档。

在阅读本文之前,希望胖友能够先熟读 中文文档 。最好呢,当然不强制,可以操练下每个 Demo 。

2. 调试环境搭建

在读源码之前,我们当然是先把调试环境搭建起来。

2.1 依赖工具

  • JDK :1.8+
  • Maven
  • IntelliJ IDEA

2. 源码拉取

从官方仓库 https://github.com/apache/rocketmq-spring Fork 出属于自己的仓库。为什么要 Fork ?既然开始阅读、调试源码,我们可能会写一些注释,有了自己的仓库,可以进行自由的提交。?

使用 IntelliJ IDEA 从 Fork 出来的仓库拉取代码。拉取完成后,Maven 会下载依赖包,可能会花费一些时间,耐心等待下。


在等待的过程中,我来简单说下,搭建调试环境的过程:

  1. 启动 RocketMQ Namesrv
  2. 启动 RocketMQ Broker
  3. 启动 RocketMQ Spring Boot Producer
  4. 启动 RocketMQ Spring Boot Consumer

最小化的 RocketMQ 的环境,暂时不考虑 Namesrv 集群、Broker 集群、Consumer 集群。

? 另外,本文使用的 rocketmq-spring 版本是 2.0.2-SNAPSHOT

2.3 启动 RocketMQ Namesrv

方式一,可以参考 《RocketMQ 源码解析 —— 调试环境搭建》 的 「3. 启动 RocketMQ Namesrv」 的方式,进行启动 RocketMQ Namesrv 。

方式一,可以方便调试 RocketMQ Namesrv 的代码。

方式二,也可以按照 《Apache RocketMQ —— Quick Start》 的 「Start Name Server」 的方式,进行启动 RocketMQ Namesrv 。

方式二,比较方便。

2.4 启动 RocketMQ Broker

方式一,可以参考 《RocketMQ 源码解析 —— 调试环境搭建》 的 「4. 启动 RocketMQ Broker」 的方式,进行启动 RocketMQ Broker 。

  • 需要注意的是,要删除 org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListenerorg.apache.rocketmq.broker.transaction.TransactionalMessageService 两个 SPI 配置文件,否则事务功能,无法正常使用。

方式二,也可以按照 《Apache RocketMQ —— Quick Start》 的 「Start Broker」 的方式,进行启动 RocketMQ Broker 。

2.5 启动 RocketMQ Spring Boot Producer

第一步,打开根目录的 pom.xml 文件,将 rocketmq-spring-boot-samples 示例项目的注释去掉。如下:

代码语言:javascript
复制
<!-- pom -->

<modules>
    <module>rocketmq-spring-boot-parent</module>
    <module>rocketmq-spring-boot</module>
    <module>rocketmq-spring-boot-starter</module>
    <!-- Note: The samples need to mvn compiple in its own directory
            <module>rocketmq-spring-boot-samples</module>
    -->
    <module>rocketmq-spring-boot-samples</module>
</modules>

此时,Maven 又会下载依赖包,可能会花费一些时间,耐心等待下。

第二步,右键运行 rocketmq-produce-demo 的 ProducerApplication 的 #main(String[] args) 方法,Producer 就启动完成了。输出日志如下图:

此时,可能会报 Intellij IDEA 报错:Error : java 不支持发行版本5 。可以参考 《Intellij idea 报错:Error : java 不支持发行版本5》 文章,进行解决。

2.6 启动 RocketMQ Spring Boot Consumer

右键运行 rocketmq-consumer-demo 的 ConsumerApplication 的 #main(String[] args) 方法,Consumer 就启动完成了。输出日志如下图:

? 后面,我们就可以愉快的各种调试玩耍了~

3. 项目结构一览

本文主要分享 rocketmq-spring项目结构。 希望通过本文能让胖友对 rocketmq-spring 的整体项目有个简单的了解。

项目结构一览

3.1 代码统计

这里先分享一个小技巧。笔者在开始源码学习时,会首先了解项目的代码量。

第一种方式,使用 IDEA Statistic 插件,统计整体代码量。

Statistic 统计代码量

我们可以粗略的看到,总的代码量在 1700 行。这其中还包括单元测试,示例等等代码。 所以,不慌,一点不慌~

第二种方式,使用 Shell 脚本命令逐个 Maven 模块统计 。

一般情况下,笔者使用 find . -name "*.java"|xargs cat|grep -v -e ^

当然,考虑到准确性,胖友需要手动 cd 到每个 Maven 项目的 src/main/java 目录下,以达到排除单元测试的代码量。

Shell 脚本统计代码量

统计完后,是不是更加不慌了。哈哈哈哈。

3.2 rocketmq-spring-boot-parent 模块

rocketmq-spring-boot-parent 模块,无具体代码,作为其它项目的 Maven Parent 项目,例如定义了依赖版本号。

3.3 rocketmq-spring-boot-starter 模块

rocketmq-spring-boot-starter 模块,无具体代码,作为 Spring Boot RocketMQ Starter 模块。其 pom.xml 的代码如下:

代码语言:javascript
复制
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-parent</artifactId>
        <version>2.0.2-SNAPSHOT</version>
        <relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
    </parent>

    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <packaging>jar</packaging>

    <name>RocketMQ Spring Boot Starter</name>
    <description>SRocketMQ Spring Boot Starter</description>
    <url>https://github.com/apache/rocketmq-spring</url>

    <dependencies>
        <!-- Spring Boot RocketMQ 具体实现 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot</artifactId>
        </dependency>
        <!-- Spring Boot Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- 提供 Validation 功能 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
    </dependencies>
    
</project>

3.4 rocketmq-spring-boot 模块

rocketmq-spring-boot 模块,1979 行代码,提供了 Spring Boot RocketMQ 的具体实现。其每个 package 包的功能,分别如下:

  • annotation :注解和注解相关的枚举。
  • autoconfigure :自动配置。
  • config :配置类。 有点难解释。等后面直接撸源码。
  • core :核心实现。
  • support :提供支持,例如说工具类。 有点难解释。等后面直接撸源码。

3.5 rocketmq-spring-boot-samples 模块

rocketmq-spring-boot-samples 模块,435 行代码,提供示例。* rocketmq-consume-demo 模块,提供消费者示例。* rocketmq-produce-demo 模块,提供生产者示例。

艿艿:后面的小节,我们开始看具体的源码。

5. annotation

5.1 @RocketMQMessageListener

org.apache.rocketmq.spring.annotation.@RocketMQMessageListener 注解,声明指定 Bean 是 RocketMQ 消费者的 MessageListener 。代码如下:

代码语言:javascript
复制
// RocketMQMessageListener.java

@Target(ElementType.TYPE) // 表名,注解在类上
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {

    /**
     * 消费分组
     *
     * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
     * load balance. It's required and needs to be globally unique.
     *
     *
     * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
     */
    String consumerGroup();

    /**
     * 消费主体
     *
     * Topic name.
     */
    String topic();

    /**
     * 选择消息的方式
     *
     * Control how to selector message.
     *
     * @see SelectorType
     */
    SelectorType selectorType() default SelectorType.TAG;

    /**
     * 选择消息的表达式
     *
     * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
     */
    String selectorExpression() default "*";

    /**
     * 消费模式
     *
     * Control consume mode, you can choice receive message concurrently or orderly.
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /**
     * 消费模型
     *
     * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /**
     * 消费线程数
     *
     * Max consumer thread number.
     */
    int consumeThreadMax() default 64;

}
  • 具体使用,见示例 OrderPaidEventConsumer 。
  • selectorType 属性,org.apache.rocketmq.spring.annotation.SelectorType 枚举,选择消息的方式。代码如下: // SelectorType.java public enum SelectorType { /** * @see ExpressionType#TAG * * 标签 */ TAG, /** * @see ExpressionType#SQL92 * * SQL */ SQL92 }
  • consumeMode 属性,org.apache.rocketmq.spring.annotation.ConsumeMode ,消费模式。代码如下: // ConsumeMode.java public enum ConsumeMode { /** * Receive asynchronously delivered messages concurrently * * 并发消费 */ CONCURRENTLY, /** * Receive asynchronously delivered messages orderly. one queue, one thread * * 顺序消费 */ ORDERLY }
  • messageModel 属性,org.apache.rocketmq.spring.annotation.MessageModel ,消费模型。代码如下: // MessageModel.java public enum MessageModel { /** * 广播消费 */ BROADCASTING("BROADCASTING"), /** * 集群消费 */ CLUSTERING("CLUSTERING"); private final String modeCN; MessageModel(String modeCN) { this.modeCN = modeCN; } public String getModeCN() { return this.modeCN; } }

5.2 @RocketMQTransactionListener

org.apache.rocketmq.spring.annotatio.@RocketMQTransactionListener 注解,声明指定 Bean 是 RocketMQ 生产者的 RocketMQLocalTransactionListener 。代码如下:

代码语言:javascript
复制
// RocketMQTransactionListener.java

@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})  // 表名,注解在类上
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component // 默认带了 @Component 注解,所以只要添加到了类上,就会注册成 Spring Bean 对象
public @interface RocketMQTransactionListener {

    /**
     * 生产者分组
     *
     * Declare the txProducerGroup that is used to relate callback event to the listener, rocketMQTemplate must send a
     * transactional message with the declared txProducerGroup.
     * <p>
     * <p>It is suggested to use the default txProducerGroup if your system only needs to define a TransactionListener class.
     */
    String txProducerGroup() default RocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME;

    /**
     * Set ExecutorService params -- corePoolSize
     */
    int corePoolSize() default 1;

    /**
     * Set ExecutorService params -- maximumPoolSize
     */
    int maximumPoolSize() default 1;

    /**
     * Set ExecutorService params -- keepAliveTime
     */
    long keepAliveTime() default 1000 * 60; //60ms

    /**
     * Set ExecutorService params -- blockingQueueSize
     */
    int blockingQueueSize() default 2000;

}

// RocketMQConfigUtils.java

public static final String ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME = "rocketmq_transaction_default_global_name";

6. autoconfigure

6.1 RocketMQProperties

org.apache.rocketmq.spring.autoconfigure.RocketMQProperties ,RocketMQ 客户端的 Properties 对象。代码如下:

代码语言:javascript
复制
// RocketMQProperties.java

@ConfigurationProperties(prefix = "rocketmq") // 配置文件中 rocketmq 前缀
public class RocketMQProperties {

    /**
     * The name server for rocketMQ, formats: `host:port;host:port`.
     *
     * Namesrv 地址
     */
    private String nameServer;

    /**
     * Producer 配置
     */
    private Producer producer;

    // ... 省略 setting/getting 方法

    public static class Producer {

        /**
         * Name of producer.
         */
        private String group;

        /**
         * Millis of send message timeout.
         */
        private int sendMessageTimeout = 3000;

        /**
         * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
         */
        private int compressMessageBodyThreshold = 1024 * 4;

        /**
         * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
         * This may potentially cause message duplication which is up to application developers to resolve.
         */
        private int retryTimesWhenSendFailed = 2;

        /**
         * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
         * This may potentially cause message duplication which is up to application developers to resolve.
         */
        private int retryTimesWhenSendAsyncFailed = 2;

        /**
         * Indicate whether to retry another broker on sending failure internally.
         */
        private boolean retryNextServer = false;

        /**
         * Maximum allowed message size in bytes.
         */
        private int maxMessageSize = 1024 * 1024 * 4;

        // ... 省略 setting/getting 方法
    }

}

6.2 RocketMQAutoConfiguration

org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration ,RocketMQ 自动配置类。代码如下:

代码语言:javascript
复制
// RocketMQAutoConfiguration.java

@Configuration // 标识是配置类
@EnableConfigurationProperties(RocketMQProperties.class) // 指定 RocketMQProperties 自动配置
@ConditionalOnClass({ MQAdmin.class, ObjectMapper.class }) // 要求有 MQAdmin、ObjectMapper 类
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server") // 要求有 rocketmq 开头,且 name-server 的配置
@Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class }) // 引入 JacksonFallbackConfiguration 和 ListenerContainerConfiguration 配置类
@AutoConfigureAfter(JacksonAutoConfiguration.class) // 在 JacksonAutoConfiguration 之后初始化
public class RocketMQAutoConfiguration {

    // ... 省略配置方法
    
}

6.2.1 defaultMQProducer

#defaultMQProducer() 方法,创建 DefaultMQProducer Bean 对象。代码如下:

代码语言:javascript
复制
// RocketMQAutoConfiguration.java

@Bean
@ConditionalOnMissingBean(DefaultMQProducer.class) // 不存在 DefaultMQProducer Bean 对象
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"}) // 要求有 rocketmq 开头,且 name-server、producer.group 的配置
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
    // 校验配置
    RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
    String nameServer = rocketMQProperties.getNameServer();
    String groupName = producerConfig.getGroup();
    Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
    Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");

    // 创建 DefaultMQProducer 对象
    DefaultMQProducer producer = new DefaultMQProducer(groupName);
    // 将 RocketMQProperties.Producer 配置,设置到 producer 中
    producer.setNamesrvAddr(nameServer);
    producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
    producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
    producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
    producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
    producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
    producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());

    return producer;
}

6.2.2 rocketMQTemplate

#rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper rocketMQMessageObjectMapper) 方法,创建 RocketMQTemplate Bean 对象。代码如下:

代码语言:javascript
复制
// RocketMQAutoConfiguration.java

@Bean(destroyMethod = "destroy") // 声明了销毁时,调用 destroy 方法
@ConditionalOnBean(DefaultMQProducer.class) // 有 DefaultMQProducer Bean 的情况下
@ConditionalOnMissingBean(RocketMQTemplate.class) // 不存在 RocketMQTemplate Bean 对象
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper rocketMQMessageObjectMapper) {
    // 创建 RocketMQTemplate 对象
    RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
    // 设置其属性
    rocketMQTemplate.setProducer(mqProducer);
    rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper);
    return rocketMQTemplate;
}
  • 关于 RocketMQTemplate 类,详细解析,见 「9.4 RocketMQTemplate」 中。

6.2.3 transactionHandlerRegistry

#transactionHandlerRegistry(RocketMQTemplate template) 方法,创建 TransactionHandlerRegistry Bean 对象。代码如下:

代码语言:javascript
复制
// RocketMQAutoConfiguration.java

@Bean
@ConditionalOnBean(RocketMQTemplate.class) // 有 RocketMQTemplate Bean 的情况下
@ConditionalOnMissingBean(TransactionHandlerRegistry.class) // 不存在 TransactionHandlerRegistry Bean 对象
public TransactionHandlerRegistry transactionHandlerRegistry(RocketMQTemplate template) {
    // 创建 TransactionHandlerRegistry 对象
    return new TransactionHandlerRegistry(template);
}
  • 详细解析,见 「7.2 TransactionHandlerRegistry」 中。

6.2.4 transactionAnnotationProcessor

#transactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) 方法,创建 RocketMQTransactionAnnotationProcessor Bean 对象。代码如下:

代码语言:javascript
复制
// RocketMQAutoConfiguration.java

@Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME) // Bean 的名字
@ConditionalOnBean(TransactionHandlerRegistry.class) // 有 TransactionHandlerRegistry Bean 的情况下
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) {
    // 创建 RocketMQTransactionAnnotationProcessor 对象
    return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry);
}

// RocketMQConfigUtils.java

/**
 * The bean name of the internally managed RocketMQ transaction annotation processor.
 */
public static final String ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME = "org.springframework.rocketmq.spring.starter.internalRocketMQTransAnnotationProcessor";
  • 详细解析,见 「7.3 RocketMQTransactionAnnotationProcessor」 中。

6.3 JacksonFallbackConfiguration

org.apache.rocketmq.spring.autoconfigure.JacksonFallbackConfiguration ,创建 ObjectMapper Bean 对象的配置类。代码如下:

代码语言:javascript
复制
// JacksonFallbackConfiguration.java

import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
@ConditionalOnMissingBean(ObjectMapper.class) // 不存在 ObjectMapper Bean 时
class JacksonFallbackConfiguration {

    @Bean
    public ObjectMapper rocketMQMessageObjectMapper() {
        return new ObjectMapper();
    }

}
  • com.fasterxml.jackson.databind.ObjectMapper ,是 Jackson 提供的 JSON 序列化工具类。
    • 生产者发送消息时,将消息使用 Jackson 进行序列化。
    • 消费者拉取消息时,将消息使用 Jackson 进行反序列化。

6.4 ListenerContainerConfiguration

org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration ,实现 ApplicationContextAware、SmartInitializingSingleton 接口,给每个带有注解的 @RocketMQMessageListener Bean 对象,生成对应的 DefaultRocketMQListenerContainer Bean 对象。

DefaultRocketMQListenerContainer 类,正如其名,是 DefaultRocketMQListener(RocketMQ 消费者的监听器)容器,负责创建 DefaultRocketMQListener 对象,并启动其对应的 DefaultMQPushConsumer(消费者),从而消费消息。

6.4.1 构造方法

代码语言:javascript
复制
// ListenerContainerConfiguration.java

@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {

    private static final Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    /**
     * 计数器,用于在 {@link #registerContainer(String, Object)} 方法中,创建 DefaultRocketMQListenerContainer Bean 时,生成 Bean 的名字。
     */
    private AtomicLong counter = new AtomicLong(0);

    private StandardEnvironment environment;

    private RocketMQProperties rocketMQProperties;

    private ObjectMapper objectMapper;

    public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper, StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
        this.objectMapper = rocketMQMessageObjectMapper;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override // 实现自 ApplicationContextAware 接口
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }
    
}
  • 严格来说,ListenerContainerConfiguration 并不能说是一个 Configuration 类。这么写的原因,猜测是为了提供给 RocketMQAutoConfiguration 类,进行引入。
  • 当然,如果我们将 @Configuration 注解,修改成 @Component 注解,也是能良好的运行。并且 @Configuration 注解,本身自带 @Component 注解。

6.4.2 afterSingletonsInstantiated

#afterSingletonsInstantiated() 方法,给每个带有注解的 @RocketMQMessageListener Bean 对象,生成对应的 DefaultRocketMQListenerContainer Bean 对象。代码如下:

代码语言:javascript
复制
// ListenerContainerConfiguration.java

@Override // 实现自 SmartInitializingSingleton 接口
public void afterSingletonsInstantiated() {
    // <1> 获得所有 @RocketMQMessageListener 注解的 Bean 们
    Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
    // 遍历 beans 数组,生成(注册)对应的 DefaultRocketMQListenerContainer Bean 对象。
    if (Objects.nonNull(beans)) {
        beans.forEach(this::registerContainer);
    }
}
  • <1> 处,获得所有 @RocketMQMessageListener 注解的 Bean 们。
  • <2> 处,遍历 beans 数组,调用 #registerContainer(String beanName, Object bean) 方法,生成(注册)对应的 DefaultRocketMQListenerContainer Bean 对象。详细解析,见 「6.4.3 registerContainer」 中。

6.4.3 registerContainer

#registerContainer(String beanName, Object bean) 方法,生成(注册)对应的 DefaultRocketMQListenerContainer Bean 对象。代码如下:

代码语言:javascript
复制
// ListenerContainerConfiguration.java

private void registerContainer(String beanName, Object bean) {
    // <1.1> 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
    Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
    // <1.2> 如果未实现 RocketMQListener 接口,直接抛出 IllegalStateException 异常。
    if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
        throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
    }
    // <1.3> 获得 @RocketMQMessageListener 注解
    RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
    // <1.4> 校验注解配置
    validate(annotation);

    // <2.1> 生成 DefaultRocketMQListenerContainer Bean 的名字
    String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
    GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
    // <2.2> 创建 DefaultRocketMQListenerContainer Bean 对象,并注册到 Spring 容器中。
    genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(bean, annotation));

    // <3.1> 从 Spring 容器中,获得刚注册的 DefaultRocketMQListenerContainer Bean 对象
    DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);
    // <3.2> 如果未启动,则进行启动
    if (!container.isRunning()) {
        try {
            container.start();
        } catch (Exception e) {
            log.error("Started container failed. {}", container, e);
            throw new RuntimeException(e);
        }
    }

    // 打印日志
    log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
  • <1.1> 处,获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
  • <1.2> 处,如果未实现 RocketMQListener 接口,直接抛出 IllegalStateException 异常。
  • <1.3> 处,获得 @RocketMQMessageListener 注解。
  • <1.4> 处,调用 #validate(RocketMQMessageListener annotation) 方法,校验注解配置。代码如下: // ListenerContainerConfiguration.java private void validate(RocketMQMessageListener annotation) { // 禁止顺序消费 + 广播消费 if (annotation.consumeMode() == ConsumeMode.ORDERLY && annotation.messageModel() == MessageModel.BROADCASTING) { throw new BeanDefinitionValidationException("Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!"); } }
  • <2.1> 处,生成 DefaultRocketMQListenerContainer Bean 的名字。此处,就可以看到 counter 计数器。
  • <2.2> 处,调用 #createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) 方法,创建 DefaultRocketMQListenerContainer Bean 对象,然后注册到 Spring 容器中。代码如下: // ListenerContainerConfiguration.java private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) { // 创建 DefaultRocketMQListenerContainer 对象 DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); // 设置其属性 container.setNameServer(rocketMQProperties.getNameServer()); container.setTopic(environment.resolvePlaceholders(annotation.topic())); container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener) bean); container.setObjectMapper(objectMapper); return container; }
  • <3.1> 处,从 Spring 容器中,获得刚注册的 DefaultRocketMQListenerContainer Bean 对象。? 具体为什么这里要重新 get 一次,艿艿暂时不是很明白。不过呢,先暂时不纠结~继续往下看。
  • <3.2> 处,调用 DefaultRocketMQListenerContainer#isRunning() 方法,判断到未启动,则调用 DefaultRocketMQListenerContainer#start() 方法,则进行启动。
  • ? 详细的,后续我们结合 「8.5 DefaultRocketMQListenerContainer」 一起瞅瞅。

7. config

7.1 TransactionHandler

org.apache.rocketmq.spring.config.TransactionHandler ,解析 @RocketMQTransactionListener 注解后,封装的对象。代码如下:

代码语言:javascript
复制
// TransactionHandler.java

class TransactionHandler {

    /**
     * {@link RocketMQTransactionListener#txProducerGroup()}
     */
    private String name;
    /**
     * {@link RocketMQTransactionListener#corePoolSize()}
     * {@link RocketMQTransactionListener#maximumPoolSize()}
     * {@link RocketMQTransactionListener#maximumPoolSize()}
     * {@link RocketMQTransactionListener#keepAliveTime()}
     * {@link RocketMQTransactionListener#blockingQueueSize()}
     */
    private ThreadPoolExecutor checkExecutor;

    /**
     * Bean 的名字
     */
    private String beanName;
    /**
     * 对应的 RocketMQLocalTransactionListener 对象
     */
    private RocketMQLocalTransactionListener bean;
    private BeanFactory beanFactory;
    
    public void setCheckExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, int blockingQueueSize) {
        // 创建 ThreadPoolExecutor 对象
        this.checkExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
            keepAliveTime, TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(blockingQueueSize));
    }

    // ... 省略 setting/getting 方法
}

7.2 TransactionHandlerRegistry

TransactionHandlerRegistry 对应的 Bean 对象,在 「6.2.3 transactionHandlerRegistry」 中被创建。

org.apache.rocketmq.spring.config.TransactionHandlerRegistry ,实现 DisposableBean 接口,是 TransactionHandler 的注册表。代码如下:

代码语言:javascript
复制
// TransactionHandlerRegistry.java

public class TransactionHandlerRegistry implements DisposableBean {


    private RocketMQTemplate rocketMQTemplate;
    /**
     * {@link TransactionHandler#name} 的 集合
     */
    private final Set<String> listenerContainers = new ConcurrentSet<>();

    public TransactionHandlerRegistry(RocketMQTemplate template) {
        this.rocketMQTemplate = template;
    }

    @Override
    public void destroy() throws Exception {
        listenerContainers.clear();
    }

    public void registerTransactionHandler(TransactionHandler handler) throws MQClientException {
        // <1.1> 不能声明重复的 TransactionHandler
        if (listenerContainers.contains(handler.getName())) {
            throw new MQClientException(-1,
                String.format("The transaction name [%s] has been defined in TransactionListener [%s]", handler.getName(), handler.getBeanName()));
        }
        // <1.2> 添加到 listenerContainers 中
        listenerContainers.add(handler.getName());

        // <2> 创建并启动 TransactionMQProducer
        rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor());
    }

}
  • 重心在于 #registerTransactionHandler(TransactionHandler handler) 方法,注册 TransactionHandler 对象。而正如我们在 「7.1 TransactionHandler」 所说,TransactionHandler 目前仅来自 @RocketMQTransactionListener 注解。
  • <1.1> 处,通过 listenerContainers 里来判断是否存在, 不能声明有重复名字的 TransactionHandler 。
  • <1.2> 处,添加到 listenerContainers 中。
  • <2> 处,调用 RocketMQTemplate#createAndStartTransactionMQProducer(String txProducerGroup, RocketMQLocalTransactionListener transactionListener, ExecutorService executorService) 方法,创建并启动 TransactionMQProducer 。后续的逻辑,关于 RocketMQTemplate 类,详细解析,见 「9.4 RocketMQTemplate」 中。

7.3 RocketMQTransactionAnnotationProcessor

TransactionHandlerRegistry 对应的 Bean 对象,在 「6.2.3 transactionHandlerRegistry」 中被创建。

org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor ,实现 BeanPostProcessor、BeanFactoryAware、Ordered 接口,扫描 @RocketMQTransactionListener 注解的 Bean 对象,注册到 TransactionHandlerRegistry 中。

7.3.1 构造方法

代码语言:javascript
复制
// RocketMQTransactionAnnotationProcessor.java

public class RocketMQTransactionAnnotationProcessor implements BeanPostProcessor, Ordered, BeanFactoryAware {

    private static final Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class);

    private BeanFactory beanFactory;
    /**
     * 不处理的类的集合
     */
    private final Set<Class<?>> nonProcessedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

    private TransactionHandlerRegistry transactionHandlerRegistry;

    public RocketMQTransactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) {
        this.transactionHandlerRegistry = transactionHandlerRegistry;
    }

    @Override // 实现自 BeanFactoryAware 接口
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    @Override // 实现自 BeanPostProcessor 接口
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }
    
    @Override // 实现自 Ordered 接口
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }

}

7.3.2 postProcessAfterInitialization

实现 #postProcessAfterInitialization(Object bean, String beanName) 方法,扫描 @RocketMQTransactionListener 注解的 Bean 对象,注册到 TransactionHandlerRegistry 中。代码如下:

代码语言:javascript
复制
// RocketMQTransactionAnnotationProcessor.java

@Override // 实现自 BeanPostProcessor 接口
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    // <1.1> 如果 nonProcessedClasses 不存在
    if (!this.nonProcessedClasses.contains(bean.getClass())) {
        // <1.2> 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        // <1.3> 添加到 nonProcessedClasses 中,表示后面不处理。
        this.nonProcessedClasses.add(bean.getClass());
        // <2.1> 获得 @RocketMQTransactionListener 注解
        RocketMQTransactionListener listener = AnnotationUtils.findAnnotation(targetClass, RocketMQTransactionListener.class);
        // <2.2> 如果无注解,则不进行任何逻辑
        if (listener == null) { // for quick search
            log.trace("No @RocketMQTransactionListener annotations found on bean type: {}", bean.getClass());
        } else {
            // <2.3> 如果有注解,则注册到 TransactionHandlerRegistry 中
            try {
                processTransactionListenerAnnotation(listener, bean);
            } catch (MQClientException e) {
                log.error("Failed to process annotation " + listener, e);
                throw new BeanCreationException("Failed to process annotation " + listener, e);
            }
        }
    }

    return bean;
}
  • <1.1> 处,如果 nonProcessedClasses 不存在。
  • <1.2> 处,获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
  • <1.3> 处,添加到 nonProcessedClasses 中,表示后面不处理。从本质上来说,通过使用 nonProcessedClasses ,来保证一个 @RocketMQTransactionListener 注解的 Bean 对象,只会被处理一次。? 不过貌似,也没碰到会处理多次的情况呀~
  • <2.1> 处,获得 @RocketMQTransactionListener 注解。
  • <2.2> 处,如果无注解,则不进行任何逻辑。
  • <2.3> 处,如果有注解,则调用 #processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean) 方法,注册 @RocketMQTransactionListener 到 TransactionHandlerRegistry 中。

7.3.3 processTransactionListenerAnnotation

#processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean) 方法,注册 @RocketMQTransactionListener 到 TransactionHandlerRegistry 中。代码如下:

代码语言:javascript
复制
// RocketMQTransactionAnnotationProcessor.java

private void processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean) throws MQClientException {
    // <1.1> 校验 @RocketMQTransactionListener 非空
    if (transactionHandlerRegistry == null) {
        throw new MQClientException("Bad usage of @RocketMQTransactionListener, " + "the class must work with RocketMQTemplate", null);
    }
    // <1.2> 如果未实现 RocketMQLocalTransactionListener 接口,直接抛出 IllegalStateException 异常。
    if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
        throw new MQClientException("Bad usage of @RocketMQTransactionListener, " + "the class must implement interface RocketMQLocalTransactionListener", null);
    }

    // <2> 将 @RocketMQTransactionListener 注解,创建成 TransactionHandler 对象
    TransactionHandler transactionHandler = new TransactionHandler();
    transactionHandler.setBeanFactory(this.beanFactory);
    transactionHandler.setName(listener.txProducerGroup());
    transactionHandler.setBeanName(bean.getClass().getName());
    transactionHandler.setListener((RocketMQLocalTransactionListener) bean);
    transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(), listener.keepAliveTime(), listener.blockingQueueSize());

    // <3> 注册 TransactionHandler 到 transactionHandlerRegistry 中
    transactionHandlerRegistry.registerTransactionHandler(transactionHandler);
}
  • <1.1> 处,校验 @RocketMQTransactionListener 注解非空。
  • <1.2> 处,如果未实现 RocketMQLocalTransactionListener 接口,直接抛出 IllegalStateException 异常。
  • <2> 处,将 @RocketMQTransactionListener 注解,创建成 TransactionHandler 对象。
  • <3> 处,调用 TransactionHandlerRegistry#registerTransactionHandler(TransactionHandler handler) 方法,注册 TransactionHandler 对象。

8. support

8.1 RocketMQHeaders

org.apache.rocketmq.spring.support.RocketMQHeaders ,RocketMQ Message Header 属性名的枚举。代码如下:

代码语言:javascript
复制
// RocketMQHeaders.java

/**
 * Represents the RocketMQ message protocol that is used during the data exchange.
 */
public class RocketMQHeaders {

    public static final String KEYS = "KEYS";
    public static final String TAGS = "TAGS";
    public static final String TOPIC = "TOPIC";
    public static final String MESSAGE_ID = "MESSAGE_ID";
    public static final String BORN_TIMESTAMP = "BORN_TIMESTAMP";
    public static final String BORN_HOST = "BORN_HOST";
    public static final String FLAG = "FLAG";
    public static final String QUEUE_ID = "QUEUE_ID";
    public static final String SYS_FLAG = "SYS_FLAG";
    public static final String TRANSACTION_ID = "TRANSACTION_ID";
    public static final String PROPERTIES = "PROPERTIES";

}

8.2 RocketMQUtil

org.apache.rocketmq.spring.support.RocketMQUtil ,RocketMQ 工具类。我们先不看这里的代码解析,等到需要看的时候,艿艿会专门提到。

Let's Go ~我们先跳到 「8.5 DefaultRocketMQListenerContainer」 中。

8.2.1 convertToRocketMessage

#convertToRocketMessage(objectMapper, charset, destination, message) 方法,将 message 转换成 RocketMQ Message 对象。代码如下:

代码语言:javascript
复制
// RocketMQUtil.java

public static org.apache.rocketmq.common.message.Message convertToRocketMessage(ObjectMapper objectMapper, String charset,
    String destination, org.springframework.messaging.Message<?> message) {
    // 生成消息的 bytes 数组
    Object payloadObj = message.getPayload();
    byte[] payloads;
    if (payloadObj instanceof String) {
        payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
    } else if (payloadObj instanceof byte[]) {
        payloads = (byte[]) message.getPayload();
    } else {
        try {
            // <X>
            String jsonObj = objectMapper.writeValueAsString(payloadObj);
            payloads = jsonObj.getBytes(Charset.forName(charset));
        } catch (Exception e) {
            throw new RuntimeException("convert to RocketMQ message failed.", e);
        }
    }

    // 获得 topic、tag 属性
    String[] tempArr = destination.split(":", 2);
    String topic = tempArr[0];
    String tags = "";
    if (tempArr.length > 1) {
        tags = tempArr[1];
    }

    // 创建 Message 对象,传入上述变量到其构造方法
    org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message(topic, tags, payloads);

    // 如果 MessageHeaders 非空,逐个处理
    MessageHeaders headers = message.getHeaders();
    if (Objects.nonNull(headers) && !headers.isEmpty()) {
        // 设置 KEYS 属性
        Object keys = headers.get(RocketMQHeaders.KEYS);
        if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key
            rocketMsg.setKeys(keys.toString());
        }

        // 设置 FLAG 属性
        Object flagObj = headers.getOrDefault("FLAG", "0");
        int flag = 0;
        try {
            flag = Integer.parseInt(flagObj.toString());
        } catch (NumberFormatException e) {
            // Ignore it
            log.info("flag must be integer, flagObj:{}", flagObj);
        }
        rocketMsg.setFlag(flag);

        // 设置 WAIT 属性
        Object waitStoreMsgOkObj = headers.getOrDefault("WAIT_STORE_MSG_OK", "true");
        boolean waitStoreMsgOK = Boolean.TRUE.equals(waitStoreMsgOkObj);
        rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK);

        // 设置 USERS_ 属性
        headers.entrySet().stream()
            .filter(entry -> !Objects.equals(entry.getKey(), RocketMQHeaders.KEYS)
                && !Objects.equals(entry.getKey(), "FLAG")
                && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "KEYS", "FLAG", "WAIT_STORE_MSG_OK"
            .forEach(entry -> {
                rocketMsg.putUserProperty("USERS_" + entry.getKey(), String.valueOf(entry.getValue())); // add other properties with prefix "USERS_"
            });

    }

    return rocketMsg;
}
  • 比较简单,就是创建 RocketMQ Message 对象的过程。
  • <X> 处,我们会看到使用 objectMapper 写入,使用 JSON 序列化,将 messageType 转换成 String 类型。

8.2.2 convert

#convert(RocketMQLocalTransactionListener listener) 方法,将 Spring Boot RocketMQ RocketMQLocalTransactionListener 监听器,转换成 RocketMQ TransactionListener 监听器。代码如下:

代码语言:javascript
复制
// RocketMQUtil.java

public static TransactionListener convert(RocketMQLocalTransactionListener listener) {
    return new TransactionListener() {

        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object obj) {
            // <1> 转换 RocketMQ Message 转换成 Spring Message 对象
            // <2> 回调 RocketMQLocalTransactionListener 监听器
            RocketMQLocalTransactionState state = listener.executeLocalTransaction(convertToSpringMessage(message), obj);
            // <3> 转换 Spring Boot RocketMQ RocketMQLocalTransactionState 事务状态,成 RocketMQ  LocalTransactionState 事务状态
            return convertLocalTransactionState(state);
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            // <1> 转换 RocketMQ Message 转换成 Spring Message 对象
            // <2> 回调 RocketMQLocalTransactionListener 监听器
            RocketMQLocalTransactionState state = listener.checkLocalTransaction(convertToSpringMessage(messageExt));
            // <3> 转换 Spring Boot RocketMQ RocketMQLocalTransactionState 事务状态,成 RocketMQ  LocalTransactionState 事务状态
            return convertLocalTransactionState(state);
        }

    };
}
  • <1> 处,调用 #convertToSpringMessage(Message message) 方法,转换 RocketMQ Message 转换成 Spring Message 对象。代码如下: // RocketMQUtil.java // checkLocalTransaction 调用 public static org.springframework.messaging.Message convertToSpringMessage(org.apache.rocketmq.common.message.MessageExt message) { org.springframework.messaging.Message retMessage = MessageBuilder.withPayload(message.getBody()). setHeader(RocketMQHeaders.KEYS, message.getKeys()). setHeader(RocketMQHeaders.TAGS, message.getTags()). setHeader(RocketMQHeaders.TOPIC, message.getTopic()). setHeader(RocketMQHeaders.MESSAGE_ID, message.getMsgId()). setHeader(RocketMQHeaders.BORN_TIMESTAMP, message.getBornTimestamp()). setHeader(RocketMQHeaders.BORN_HOST, message.getBornHostString()). setHeader(RocketMQHeaders.FLAG, message.getFlag()). setHeader(RocketMQHeaders.QUEUE_ID, message.getQueueId()). setHeader(RocketMQHeaders.SYS_FLAG, message.getSysFlag()). setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()). setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()). build(); return retMessage; } // executeLocalTransaction 调用 public static org.springframework.messaging.Message convertToSpringMessage(org.apache.rocketmq.common.message.Message message) { org.springframework.messaging.Message retMessage = MessageBuilder.withPayload(message.getBody()). setHeader(RocketMQHeaders.KEYS, message.getKeys()). setHeader(RocketMQHeaders.TAGS, message.getTags()). setHeader(RocketMQHeaders.TOPIC, message.getTopic()). setHeader(RocketMQHeaders.FLAG, message.getFlag()). setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()). setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()). build(); return retMessage; }
  • <2> 处,回调 RocketMQLocalTransactionListener 监听器的对应的方法。
  • <3> 处,调用 #convertLocalTransactionState(RocketMQLocalTransactionState state) 方法,转换 Spring Boot RocketMQ RocketMQLocalTransactionState 事务状态,成 RocketMQ LocalTransactionState 事务状态。代码如下: // RocketMQUtil.java private static LocalTransactionState convertLocalTransactionState(RocketMQLocalTransactionState state) { switch (state) { case UNKNOWN: return LocalTransactionState.UNKNOW; case COMMIT: return LocalTransactionState.COMMIT_MESSAGE; case ROLLBACK: return LocalTransactionState.ROLLBACK_MESSAGE; } // Never happen log.warn("Failed to covert enum type RocketMQLocalTransactionState.%s", state); return LocalTransactionState.UNKNOW; }

8.4 RocketMQListenerContainer

org.apache.rocketmq.spring.support.RocketMQListenerContainer ,实现 DisposableBean 接口,RocketMQ 消费者 Listener 容器(Container)接口。代码如下:

代码语言:javascript
复制
// RocketMQListenerContainer.java

public interface RocketMQListenerContainer extends DisposableBean {

    /**
     * Setup the message listener to use. Throws an {@link IllegalArgumentException} if that message listener type is
     * not supported.
     */
    void setupMessageListener(RocketMQListener<?> messageListener);

}
  • 关于 #setupMessageListener(RocketMQListener<?> messageListener) 的实现方法,我们会在 DefaultRocketMQListenerContainer 中看到。

8.5 DefaultRocketMQListenerContainer

org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer ,实现 InitializingBean、RocketMQListenerContainer、SmartLifecycle 接口,DefaultRocketMQListener(RocketMQ 消费者的监听器)容器,负责创建 DefaultRocketMQListener 对象,并启动其对应的 DefaultMQPushConsumer(消费者),从而消费消息。

8.5.1 构造方法

代码语言:javascript
复制
// DefaultRocketMQListenerContainer.java

public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {

    private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);

    private long suspendCurrentQueueTimeMillis = 1000;

    /**
     * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
     * >0,client control retry frequency.
     */
    private int delayLevelWhenNextConsume = 0;

    private String nameServer;

    private String consumerGroup;

    private String topic;

    private int consumeThreadMax = 64;

    private String charset = "UTF-8";

    private ObjectMapper objectMapper;

    private RocketMQListener rocketMQListener;

    private RocketMQMessageListener rocketMQMessageListener;

    /**
     * DefaultMQPushConsumer 对象。
     *
     * 在 {@link #initRocketMQPushConsumer()} 方法中,进行创建
     */
    private DefaultMQPushConsumer consumer;

    /**
     * 消息类型
     */
    private Class messageType;

    /**
     * 是否在运行中
     */
    private boolean running;

    // The following properties came from @RocketMQMessageListener.
    private ConsumeMode consumeMode;
    private SelectorType selectorType;
    private String selectorExpression;
    private MessageModel messageModel;
    
    // 省略各种 setting/getting 方法
}

8.5.2 afterPropertiesSet

实现 #afterPropertiesSet() 方法,代码如下:

代码语言:javascript
复制
// DefaultRocketMQListenerContainer.java

@Override
public void afterPropertiesSet() throws Exception {
    // 初始化 DefaultMQPushConsumer 对象
    initRocketMQPushConsumer();
    
    // 获得 messageType 属性
    this.messageType = getMessageType();
    log.debug("RocketMQ messageType: {}", messageType.getName());
}
  • <1> 处,初始化 DefaultMQPushConsumer 对象。详细解析,见 「8.5.2 initRocketMQPushConsumer」 。
  • <2> 处,调用 #getMessageType() 方法,获得 messageType 属性。代码如下:// DefaultRocketMQListenerContainer.java private Class getMessageType() { // 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); // 获得接口的 Type 数组 Type[] interfaces = targetClass.getGenericInterfaces(); Class<?> superclass = targetClass.getSuperclass(); while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准 interfaces = superclass.getGenericInterfaces(); superclass = targetClass.getSuperclass(); } if (Objects.nonNull(interfaces)) { // 遍历 interfaces 数组 for (Type type : interfaces) { // 要求 type 是泛型参数 if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; // 要求是 RocketMQListener 接口 if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); // 取首个元素 if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return (Class) actualTypeArguments[0]; } else { return Object.class; } } } } return Object.class; } else { return Object.class; } }
    • 没啥好说,不理解的胖友,可以看看 《Java 中的 Type 类型详解》 。
    • 当然,也可以不看,知道意思就好列。
8.5.2.1 initRocketMQPushConsumer

#initRocketMQPushConsumer() 方法,初始化 DefaultMQPushConsumer 对象。代码如下:

代码语言:javascript
复制
// DefaultRocketMQListenerContainer.java

private void initRocketMQPushConsumer() throws MQClientException {
    Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
    Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
    Assert.notNull(nameServer, "Property 'nameServer' is required");
    Assert.notNull(topic, "Property 'topic' is required");

    // <1> 创建 DefaultMQPushConsumer 对象
    consumer = new DefaultMQPushConsumer(consumerGroup);
    // <2.1> 设置其属性
    consumer.setNamesrvAddr(nameServer);
    consumer.setConsumeThreadMax(consumeThreadMax);
    if (consumeThreadMax < consumer.getConsumeThreadMin()) {
        consumer.setConsumeThreadMin(consumeThreadMax);
    }

    // <2.2> 设置 messageModel 属性
    switch (messageModel) {
        case BROADCASTING:
            consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
            break;
        case CLUSTERING:
            consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
            break;
        default:
            throw new IllegalArgumentException("Property 'messageModel' was wrong.");
    }

    // <2.3> 设置 selectorType 属性
    switch (selectorType) {
        case TAG:
            consumer.subscribe(topic, selectorExpression);
            break;
        case SQL92:
            consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
            break;
        default:
            throw new IllegalArgumentException("Property 'selectorType' was wrong.");
    }

    // <2.4> 设置 messageListener 属性
    switch (consumeMode) {
        case ORDERLY:
            consumer.setMessageListener(new DefaultMessageListenerOrderly());
            break;
        case CONCURRENTLY:
            consumer.setMessageListener(new DefaultMessageListenerConcurrently());
            break;
        default:
            throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
    }

    // <3> 如果实现了 RocketMQPushConsumerLifecycleListener 接口,则调用 prepareStart 方法,执行准备初始化的方法
    if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
        ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
    }

}
  • <1> 处,创建 DefaultMQPushConsumer 对象。
  • <2.1> 处,设置其属性。
  • <2.2> 处,设置 messageModel 属性。
  • <2.3> 处,设置 selectorType 属性。
  • <2.4> 处,设置 messageListener 属性。其中,我们会看到,根据不同的 consumeMode 值,创建不同的 MessageListener 对象。所以,我们放在 「8.5.2.2 DefaultMessageListenerOrderly」 和 「8.5.2.3 DefaultMessageListenerConcurrently」 中解析。
  • <3> 处,如果实现了 RocketMQPushConsumerLifecycleListener 接口,则调用 RocketMQPushConsumerLifecycleListener#prepareStart(consumer) 方法,执行准备初始化的方法。
8.5.2.2 DefaultMessageListenerConcurrently

DefaultMessageListenerConcurrently ,是 DefaultRocketMQListenerContainer 的内部类,实现 org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently 接口,提供并发消费。代码如下:

代码语言:javascript
复制
// DefaultRocketMQListenerContainer#DefaultMessageListenerConcurrently.java

public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

    @SuppressWarnings("unchecked")
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt messageExt : msgs) {
            log.debug("received msg: {}", messageExt);
            try {
                long now = System.currentTimeMillis();
                // <1> 转换消息
                // <2> 执行消费
                rocketMQListener.onMessage(doConvertMessage(messageExt));
                // 打印日志
                long costTime = System.currentTimeMillis() - now;
                log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
            } catch (Exception e) {
                // <3.1> 发生异常,返回稍后再消费
                log.warn("consume message failed. messageExt:{}", messageExt, e);
                context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }

        // <3.2> 返回消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
  • <1> 处,调用 #doConvertMessage(MessageExt messageExt) 方法,将 RocketMQ MessageExt 消息,转换成对应的 messageType 类型的消息。代码如下: // DefaultRocketMQListenerContainer.java private Object doConvertMessage(MessageExt messageExt) { // 如果是 MessageExt 类型,直接返回 if (Objects.equals(messageType, MessageExt.class)) { return messageExt; } else { // 先将 byte 数组(body),转换成 String 类型 String str = new String(messageExt.getBody(), Charset.forName(charset)); // 如果是 String 类型,直接返回 if (Objects.equals(messageType, String.class)) { return str; } else { // If msgType not string, use objectMapper change it. // <X> 使用 objectMapper 读取,使用 JSON 反序列化,将 String 转换成 messageType 类型 try { return objectMapper.readValue(str, messageType); } catch (Exception e) { log.info("convert failed. str:{}, msgType:{}", str, messageType); throw new RuntimeException("cannot convert message to " + messageType, e); } } } }
    • 总体代码比较简单,胖友自己瞅瞅。
    • <X> 处,我们会看到使用 objectMapper 读取,使用 JSON 反序列化,将 String 转换成 messageType 类型。
  • <2> 处,调用 RocketMQListener#onMessage(T message) 方法,消费消息。
  • <3.1> 处,发生异常,返回稍后再消费。
  • <3.2> 处,返回消费成功。
  • 并发消费的逻辑,是 RocketMQ Consumer 内部逻辑所处理的。
8.5.2.3 DefaultMessageListenerOrderly

DefaultMessageListenerOrderly ,是 DefaultRocketMQListenerContainer 的内部类,实现 org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly 接口,提供顺序消费。代码如下:

代码语言:javascript
复制
// DefaultRocketMQListenerContainer#DefaultMessageListenerOrderly.java

public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

    @SuppressWarnings("unchecked")
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // 遍历 MessageExt 消息
        for (MessageExt messageExt : msgs) {
            log.debug("received msg: {}", messageExt);
            try {
                long now = System.currentTimeMillis();
                // 转换消息
                // 执行消费
                rocketMQListener.onMessage(doConvertMessage(messageExt));
                // 打印日志
                long costTime = System.currentTimeMillis() - now;
                log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
            } catch (Exception e) {
                // 发生异常,设置中断消费一会
                log.warn("consume message failed. messageExt:{}", messageExt, e);
                context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }

        // 返回消费成功
        return ConsumeOrderlyStatus.SUCCESS;
    }
}
  • 总体逻辑,和 「8.5.2.2 DefaultMessageListenerConcurrently」 类似,就不重复解析了。

8.5.3 start

实现 #start() 方法,启动 DefaultMQPushConsumer 。代码如下:

代码语言:javascript
复制
// DefaultRocketMQListenerContainer.java

@Override
public void start() {
    // 如果已经启动,则抛出 IllegalStateException 异常
    if (this.isRunning()) {
        throw new IllegalStateException("container already running. " + this.toString());
    }

    // 启动 DefaultMQPushConsumer
    try {
        consumer.start();
    } catch (MQClientException e) {
        throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
    }

    // 标记已经启动
    this.setRunning(true);

    log.info("running container: {}", this.toString());
}

@Override // 实现自 SmartLifecycle->Lifecycle 接口
public boolean isRunning() {
    return running;
}

private void setRunning(boolean running) {
    this.running = running;
}

@Override // 实现自 SmartLifecycle 接口
public boolean isAutoStartup() {
    return true;
}

@Override // 实现自 SmartLifecycle->Phased 接口
public int getPhase() {
    // Returning Integer.MAX_VALUE only suggests that
    // we will be the first bean to shutdown and last bean to start
    return Integer.MAX_VALUE;
}

8.5.4 stop

实现 #stop() 方法,关闭 DefaultMQPushConsumer 。代码如下:

代码语言:javascript
复制
// DefaultRocketMQListenerContainer.java

@Override
public void stop() {
    // 必须在运行中
    if (this.isRunning()) {
        // 关闭 DefaultMQPushConsumer
        if (Objects.nonNull(consumer)) {
            consumer.shutdown();
        }
        // 标记不在启动
        setRunning(false);
    }
}

@Override
public void stop(Runnable callback) {
    stop();
    callback.run();
}

8.5.5 destroy

实现 #destroy() 方法,关闭 DefaultMQPushConsumer 。代码如下:

代码语言:javascript
复制
// DefaultRocketMQListenerContainer.java

@Override
public void destroy() {
    // 标记已经停止
    this.setRunning(false);
    // 关闭 DefaultMQPushConsumer
    if (Objects.nonNull(consumer)) {
        consumer.shutdown();
    }
    log.info("container destroyed, {}", this.toString());
}

9. core

9.1 RocketMQListener

org.apache.rocketmq.spring.core.RocketMQListener ,RocketMQ 消费者的消费消息监听器。代码如下:

代码语言:javascript
复制
// RocketMQListener.java

public interface RocketMQListener<T> { // <T> 泛型,声明消费的消息类型

    /**
     * 消费消息
     *
     * @param message 消息
     */
    void onMessage(T message);

}

9.2 RocketMQPushConsumerLifecycleListener

org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener ,继承 RocketMQConsumerLifecycleListener 接口,Push 模式的消费者的 RocketMQConsumerLifecycleListener 接口。代码如下:

代码语言:javascript
复制
// RocketMQPushConsumerLifecycleListener.java

public interface RocketMQPushConsumerLifecycleListener extends RocketMQConsumerLifecycleListener<DefaultMQPushConsumer> {
}
  • 目前暂无任何方法。使用时,建议实现该接口。
  • 未来来说,估计会新增 RocketMQPullConsumerLifecycleListener 接口,Pull 模式的消费者的 RocketMQConsumerLifecycleListener 接口。

9.3 RocketMQLocalTransactionListener

org.springframework.messaging.Message.RocketMQLocalTransactionListener ,RocketMQ 本地事务监听器接口。代码如下:

代码语言:javascript
复制
// RocketMQLocalTransactionListener.java

public interface RocketMQLocalTransactionListener {

    /**
     * 执行本地事务
     *
     * @param msg 消息
     * @param arg 方法参数
     * @return 本地事务状态
     */
    RocketMQLocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * 检查本地事务状态
     *
     * @param msg 消息
     * @return 本地事务状态
     */
    RocketMQLocalTransactionState checkLocalTransaction(final Message msg);

}
  • 其中,org.apache.rocketmq.spring.core.RocketMQLocalTransactionState ,RocketMQ 本地事务状态。代码如下:// RocketMQLocalTransactionState.java public enum RocketMQLocalTransactionState { /** * 已提交 */ COMMIT, /** * 已回滚 */ ROLLBACK, /** * 未知 */ UNKNOWN }

9.4 RocketMQTemplate

org.apache.rocketmq.spring.core.RocketMQTemplate ,继承 AbstractMessageSendingTemplate 抽象类,实现 InitializingBean、DisposableBean 接口,RocketMQ 客户端的方法模板类。

9.4.1 构造方法

代码语言:javascript
复制
// RocketMQTemplate.java

private static final  Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);

/**
 * 消息生产者
 */
private DefaultMQProducer producer;

private ObjectMapper objectMapper;

private String charset = "UTF-8";

/**
 * 消息队列选择器
 */
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();

/**
 * TransactionMQProducer 的映射
 *
 * KEY:{@link TransactionMQProducer#getProducerGroup()} 事务生产者对应的分组
 */
private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!

9.4.2 afterPropertiesSet

实现 #afterPropertiesSet() 方法,启动 DefaultMQProducer 。代码如下:

代码语言:javascript
复制
// RocketMQTemplate.java

@Override
public void afterPropertiesSet() throws Exception {
    Assert.notNull(producer, "Property 'producer' is required");
    producer.start();
}

9.4.3 发送消息

发送消息的方法非常多,如下:

  • syncSend
  • syncSendOrderly
  • asyncSend
  • asyncSendOrderly
  • sendOneWay
  • sendOneWayOrderly

#syncSend(String destination, Message<?> message)方法,同步发送消息。代码如下:

代码语言:javascript
复制
// RocketMQTemplate.java

/**
 * <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
 * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
 * notification, SMS marketing system, etc.. </p>
 * <p>
 * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
 * {@link DefaultMQProducer#getRetryTimesWhenSendFailed} times before claiming failure. As a result, multiple
 * messages may potentially delivered to broker(s). It's up to the application developers to resolve potential
 * duplication issue.
 *
 * @param destination formats: `topicName:tags`
 * @param message     {@link org.springframework.messaging.Message}
 * @return {@link SendResult}
 */
public SendResult syncSend(String destination, Message<?> message) {
    return syncSend(destination, message, producer.getSendMsgTimeout());
}

public SendResult syncSend(String destination, Message<?> message, long timeout) {
    return syncSend(destination, message, timeout, 0);
}

public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
    // <1> 校验消息
    if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
        log.error("syncSend failed. destination:{}, message is null ", destination);
        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
    }

    try {
        long now = System.currentTimeMillis();
        // <2> 将 message 转换成 RocketMQ Message 对象
        org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message);
        // <3> 设置 delayLevel 属性
        if (delayLevel > 0) {
            rocketMsg.setDelayTimeLevel(delayLevel);
        }
        // <4> 同步发送消息
        SendResult sendResult = producer.send(rocketMsg, timeout);
        // 打印日志
        long costTime = System.currentTimeMillis() - now;
        log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
        return sendResult;
    } catch (Exception e) {
        log.error("syncSend failed. destination:{}, message:{} ", destination, message);
        throw new MessagingException(e.getMessage(), e);
    }
}
  • <1> 处,校验消息。
  • <2> 处,调用 RocketMQUtil#convertToRocketMessage(objectMapper, charset, destination, message) 方法,将 message 转换成 RocketMQ Message 对象。此时,我们可以跳转到 「8.2.1 convertToRocketMessage」 。当然,也可以不看。嘿嘿~
  • <3> 处,设置 delayLevel 属性。
  • <4> 处,调用 DefaultMQProducer#send(rocketMsg, timeout) 方法,同步发送消息。
  • 另外,有一点要注意,传入的方法 message 参数,是 org.apache.rocketmq.common.message.Message 对象。所以说,Spring Boot RocketMQ 库,还有一个作用,是屏蔽底层的 RocketMQ 的存在,这样呢,未来如果我们希望替换掉 RocketMQ ,也是非常方便的。? 算是一个“彩蛋”。感兴趣的胖友,可以再看看其它地方,也会发现有这样一个设计目的。

9.4.4 destroy

实现 #destroy() 方法,关闭生产者们。代码如下:

代码语言:javascript
复制
// RocketMQTemplate.java

@Override // 实现自 DisposableBean 接口
public void destroy() {
    // 关闭 producer
    if (Objects.nonNull(producer)) {
        producer.shutdown();
    }

    // 关闭 cache
    for (Map.Entry<String, TransactionMQProducer> kv : cache.entrySet()) {
        if (Objects.nonNull(kv.getValue())) {
            kv.getValue().shutdown();
        }
    }
    cache.clear();
}

9.4.5 createAndStartTransactionMQProducer

#createAndStartTransactionMQProducer(String txProducerGroup, RocketMQLocalTransactionListener transactionListener, ExecutorService executorService) 方法,创建并启动 TransactionMQProducer 对象。代码如下:

代码语言:javascript
复制
// RocketMQTemplate.java

/**
 * Create and start a transaction MQProducer, this new producer is cached in memory.
 * <p>Note: This method is invoked internally when processing {@code @RocketMQLocalTransactionListener}, it is not
 * recommended to directly use this method by user.
 *
 * @param txProducerGroup     Producer (group) name, unique for each producer
 * @param transactionListener TransactoinListener impl class
 * @param executorService     Nullable.
 * @return true if producer is created and started; false if the named producer already exists in cache.
 * @throws MessagingException
 */
public boolean createAndStartTransactionMQProducer(String txProducerGroup, RocketMQLocalTransactionListener transactionListener,  ExecutorService executorService) throws MessagingException {
    // <1> 如果已经存在,则直接返回
    txProducerGroup = getTxProducerGroupName(txProducerGroup);
    if (cache.containsKey(txProducerGroup)) {
        log.info(String.format("get TransactionMQProducer '%s' from cache", txProducerGroup));
        return false;
    }

    // <2> 创建 TransactionMQProducer 对象
    TransactionMQProducer txProducer = createTransactionMQProducer(txProducerGroup, transactionListener, executorService);
    try {
        // <3> 启动 TransactionMQProducer 对象
        txProducer.start();
        // <4> 添加到 cache 中
        cache.put(txProducerGroup, txProducer);
    } catch (MQClientException e) {
        throw RocketMQUtil.convert(e);
    }

    return true;
}
  • <1> 处,如果已经存在,则直接返回。
  • <2> 处,调用 #createAndStartTransactionMQProducer(String name, RocketMQLocalTransactionListener transactionListener, ExecutorService executorService) 方法,创建 TransactionMQProducer 对象。代码如下: // RocketMQTemplate.java private TransactionMQProducer createTransactionMQProducer(String name, RocketMQLocalTransactionListener transactionListener, ExecutorService executorService) { Assert.notNull(producer, "Property 'producer' is required"); Assert.notNull(transactionListener, "Parameter 'transactionListener' is required"); // 创建 TransactionMQProducer 对象 TransactionMQProducer txProducer = new TransactionMQProducer(name); // <X> 转换监听器,并设置到 txProducer 中 txProducer.setTransactionListener(RocketMQUtil.convert(transactionListener)); // 设置其它属性 txProducer.setNamesrvAddr(producer.getNamesrvAddr()); if (executorService != null) { txProducer.setExecutorService(executorService); } txProducer.setSendMsgTimeout(producer.getSendMsgTimeout()); txProducer.setRetryTimesWhenSendFailed(producer.getRetryTimesWhenSendFailed()); txProducer.setRetryTimesWhenSendAsyncFailed(producer.getRetryTimesWhenSendAsyncFailed()); txProducer.setMaxMessageSize(producer.getMaxMessageSize()); txProducer.setCompressMsgBodyOverHowmuch(producer.getCompressMsgBodyOverHowmuch()); txProducer.setRetryAnotherBrokerWhenNotStoreOK(producer.isRetryAnotherBrokerWhenNotStoreOK()); return txProducer; }
    • <X> 处,会调用 RocketMQUtil#convert(RocketMQLocalTransactionListener listener) 方法,将 RocketMQLocalTransactionListener 转换成 RocketMQ TransactionListener 的监听器。详细解析,见 「8.5.3 convert」 。当然,也可以不看,哈哈哈哈。
  • <3> 处,启动 TransactionMQProducer 对象。
  • <4> 处,添加到 cache 中。

9.4.6 sendMessageInTransaction

#sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) 方法,发送事务消息。代码如下:

代码语言:javascript
复制
// RocketMQTemplate.java

/**
 * Send Spring Message in Transaction
 *
 * @param txProducerGroup the validate txProducerGroup name, set null if using the default name
 * @param destination     destination formats: `topicName:tags`
 * @param message         message {@link org.springframework.messaging.Message}
 * @param arg             ext arg
 * @return TransactionSendResult
 * @throws MessagingException
 */
public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException {
    try {
        // <1> 获得 TransactionMQProducer 对象
        TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
        // <2> 将 message 转换成 RocketMQ Message 对象
        org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message);
        // <3> 发送事务消息
        return txProducer.sendMessageInTransaction(rocketMsg, arg);
    } catch (MQClientException e) {
        throw RocketMQUtil.convert(e);
    }
}
  • <1> 处,调用 #stageMQProducer(String name) 方法,获得 TransactionMQProducer 对象。代码如下: // RocketMQTemplate.java private TransactionMQProducer stageMQProducer(String name) throws MessagingException { name = getTxProducerGroupName(name); // 获得 TransactionMQProducer 对象 TransactionMQProducer cachedProducer = cache.get(name); if (cachedProducer == null) { throw new MessagingException(String.format("Can not found MQProducer '%s' in cache! please define @RocketMQLocalTransactionListener class or invoke createOrGetStartedTransactionMQProducer() to create it firstly", name)); } return cachedProducer; } private String getTxProducerGroupName(String name) { return name == null ? RocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME : name; }
  • <2> 处,将 message 转换成 RocketMQ Message 对象。
  • <3> 处,调用 TransactionMQProducer#sendMessageInTransaction(Message msg, Object arg) 方法,发送事务消息。
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 芋道源码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. 调试环境搭建
    • 2.1 依赖工具
      • 2. 源码拉取
        • 2.3 启动 RocketMQ Namesrv
          • 2.4 启动 RocketMQ Broker
            • 2.5 启动 RocketMQ Spring Boot Producer
              • 2.6 启动 RocketMQ Spring Boot Consumer
              • 3. 项目结构一览
                • 3.1 代码统计
                  • 3.2 rocketmq-spring-boot-parent 模块
                    • 3.3 rocketmq-spring-boot-starter 模块
                      • 3.4 rocketmq-spring-boot 模块
                        • 3.5 rocketmq-spring-boot-samples 模块
                        • 5. annotation 包
                          • 5.1 @RocketMQMessageListener
                            • 5.2 @RocketMQTransactionListener
                            • 6. autoconfigure 包
                              • 6.1 RocketMQProperties
                                • 6.2 RocketMQAutoConfiguration
                                  • 6.2.1 defaultMQProducer
                                  • 6.2.2 rocketMQTemplate
                                  • 6.2.3 transactionHandlerRegistry
                                  • 6.2.4 transactionAnnotationProcessor
                                • 6.3 JacksonFallbackConfiguration
                                  • 6.4 ListenerContainerConfiguration
                                    • 6.4.1 构造方法
                                    • 6.4.2 afterSingletonsInstantiated
                                    • 6.4.3 registerContainer
                                • 7. config 包
                                  • 7.1 TransactionHandler
                                    • 7.2 TransactionHandlerRegistry
                                      • 7.3 RocketMQTransactionAnnotationProcessor
                                        • 7.3.1 构造方法
                                        • 7.3.2 postProcessAfterInitialization
                                        • 7.3.3 processTransactionListenerAnnotation
                                    • 8. support 包
                                      • 8.1 RocketMQHeaders
                                        • 8.2 RocketMQUtil
                                          • 8.2.1 convertToRocketMessage
                                          • 8.2.2 convert
                                        • 8.4 RocketMQListenerContainer
                                          • 8.5 DefaultRocketMQListenerContainer
                                            • 8.5.1 构造方法
                                            • 8.5.2 afterPropertiesSet
                                            • 8.5.3 start
                                            • 8.5.4 stop
                                            • 8.5.5 destroy
                                        • 9. core 包
                                          • 9.1 RocketMQListener
                                            • 9.2 RocketMQPushConsumerLifecycleListener
                                              • 9.3 RocketMQLocalTransactionListener
                                                • 9.4 RocketMQTemplate
                                                  • 9.4.1 构造方法
                                                  • 9.4.2 afterPropertiesSet
                                                  • 9.4.3 发送消息
                                                  • 9.4.4 destroy
                                                  • 9.4.5 createAndStartTransactionMQProducer
                                                  • 9.4.6 sendMessageInTransaction
                                              相关产品与服务
                                              容器服务
                                              腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                                              领券
                                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档