前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >@RabbitListener注解你不知道的都在这

@RabbitListener注解你不知道的都在这

作者头像
java进阶架构师
发布2021-01-05 11:10:29
5.1K0
发布2021-01-05 11:10:29
举报
文章被收录于专栏:Java进阶架构师

1. 前言

在消息中间件你该了解的秘密一文中详细介绍了如何使用RabbitMQ发送消息、消费消息;如何保证生产者发送消息的可靠性;如何保证消费消息的可靠性、如何横向扩展消费者以及如何对消费者进行流向削峰。

2.初衷

本文的初衷旨在搞懂为什么使用@Component +@RabbitListener注解就可以完成消息的消费,如何对消费者进行定制化配置?带着这些疑问开启本次的源码分析之路。

3.源码分析

3.1 寻找自动配置类

众所周知,所有与SpringBoot整合的中间件都是以starter的方式引入到项目中,这种情况下SpringBoot会有一个相关的自动配置类帮我们做一些默认配置,从而达到开箱即用的效果。寻找相关的自动配置类方法也很简单,只需要输入与之相关的名称即可,例如输入RabbitAuto就可以搜索RabbitAutoConfiguration这样一个自动配置类

3.2 自动配置类分析

打开RabbitAutoConfiguration自动配置类,可以看到如上内容:

  • @Configuration注解表明这一个配置类,会被框架扫描解析并注入到IOC容器中
  • @ConditionalOnClass注解表明,只有classpath路径下存在RabbitTemplate.classChannel.class这两个类才会去扫描解析当前自动配置类
  • @EnableConfigurationProperties注解实例化RabbitProperties并将yml或properties文件中的相关属性注入到该对象中
  • @Import注解表明要引入RabbitAnnotationDrivenConfiguration配置类

根据RabbitAnnotationDrivenConfiguration名称可以猜出该配置类大概就是我们要分析的入口类了

3.3 入口类分析

3.3.1 初始化消息相关属性
3.3.2 声明监听器容器工厂配置以及监听器容器工厂

进入配置类配置容器工厂的方法中

这里我们可以得出一个重要的信息,那就是可以通过配置文件中的配置对SimpleRabbitListenerContainerFactory进行相关配置操作

代码语言:javascript
复制
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 4
        max-concurrency: 10
        batch-size: 100
        prefetch: 100
        acknowledge-mode: MANUAL

到目前一共出现了几个比较重要的类:

  • SimpleRabbitListenerContainerFactoryConfigurer
  • SimpleRabbitListenerContainerFactory
  • RabbitProperties

它们之间的关系是这样的SimpleRabbitListenerContainerFactoryConfigurer容器工厂配置类持有RabbitProperties属性配置对象,然后对SimpleRabbitListenerContainerFactory进行相关配置,RabbitProperties属性又可以在application.yml文件中进行配置

3.3.3 开启RabbitMQ功能

@EnableRabbit注解引入了两个重要的类RabbitListenerAnnotationBeanPostProcessorRabbitListenerEndpointRegistryRabbitListenerAnnotationBeanPostProcessor用于Bean的后置处理,这里可以想象一下该后置处理器会Bean实例化之后,对含有@RabbitListener注解的类进行特殊代理,从而实现对消息的消费

3.4 后置处理

3.4.1 后置处理方法

RabbitListenerAnnotationBeanPostProcessor后置处理器的后置处理方法

代码语言:javascript
复制
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    Class<?> targetClass = AopUtils.getTargetClass(bean);
    final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
    for (ListenerMethod lm : metadata.listenerMethods) {
        for (RabbitListener rabbitListener : lm.annotations) {
            processAmqpListener(rabbitListener, lm.method, bean, beanName);
        }
    }
    return bean;
}

中会对含有@RabbitListener注解的bean进行处理

代码语言:javascript
复制
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
    endpoint.setMethod(methodToUse);
    processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
   Object target, String beanName) {

  endpoint.setBean(bean);
  endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
  endpoint.setId(getEndpointId(rabbitListener));
  endpoint.setQueueNames(resolveQueues(rabbitListener));
  endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
  endpoint.setBeanFactory(this.beanFactory);
  endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
  // ...省略部分代码
  resolveExecutor(endpoint, rabbitListener, target, beanName);
  resolveAdmin(endpoint, rabbitListener, target);
  resolveAckMode(endpoint, rabbitListener);
  resolvePostProcessor(endpoint, rabbitListener, target, beanName);
  RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);

  this.registrar.registerEndpoint(endpoint, factory);
 }

每个含有@RabbitListener注解的方法对应一个MethodRabbitListenerEndpoint对象,该对象会存储被@RabbitListener 注解方法相关属性以及@RabbitListener注解指定的属性,最终和SimpleRabbitListenerContainerFactory组成AmqpListenerEndpointDescriptor对象放入endpointDescriptors集合中

3.4.3 Bean创建完成后的初始化方法

由于RabbitListenerAnnotationBeanPostProcessor实现了SmartInitializingSingleton接口,因此会回调afterSingletonsInstantiated()方法,在回调方法中会遍历3.4.2endpointDescriptors集合进行SimpleMessageListenerContainer注册,然后使用SimpleRabbitListenerContainerFactoryMethodRabbitListenerEndpoint进行属性配置,最终把创建的SimpleMessageListenerContainer放入RabbitListenerEndpointRegistrylistenerContainers容器中

3.5 应用生命周期

RabbitListenerEndpointRegistry实现了SmartLifecycle接口,在应用启动完成之后会回调start()方法

3.6 小结

  • RabbitListenerAnnotationBeanPostProcessor后置处理方法会处理含有@RabbitListener注解的方法,创建MethodRabbitListenerEndpoint对象与SimpleRabbitListenerContainerFactory组成一对放入集合中
  • RabbitListenerAnnotationBeanPostProcessor初始化方法会遍历上一步的集合根据SimpleRabbitListenerContainerFactory创建SimpleMessageListenerContainer对象,然后使用SimpleRabbitListenerContainerFactoryMethodRabbitListenerEndpointSimpleMessageListenerContainer进行配置,最终将SimpleMessageListenerContainer放入map容器中
  • RabbitListenerEndpointRegistry启动方法会遍历上一步的map容器,启动SimpleMessageListenerContainer

4. 启动监听容器

4.1 启动
代码语言:javascript
复制
protected void doStart() {
    synchronized (this.consumersMonitor) {
        // 1.根据并发数来初始化消费者
        int newConsumers = initializeConsumers();
        Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
        for (BlockingQueueConsumer consumer : this.consumers) {
            // 2.每个消费者创建一个AsyncMessageProcessingConsumer对象
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            // 3.使用一个线程去执行
            getTaskExecutor().execute(processor);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        waitForConsumersToStart(processors);
    }
}
4.2 执行任务
代码语言:javascript
复制
@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
    try {
        // 1.启动消费者
        initialize();
        // 2.消费消息
        while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
            mainLoop();
        }
    }
}
4.2 启动消费者

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run--->org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#initialize--->org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#start--->org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#setQosAndreateConsumers--->org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#consumeFromQueue

代码语言:javascript
复制
private void consumeFromQueue(String queue) throws IOException {
    // 1.设置消息回调处理
    InternalConsumer consumer = new InternalConsumer(this.channel, queue);
    // 2.告诉消息服务器当前channel消费的队列名称,消息ack模式以及当消息服务器推送消息后要执行的回调consumer
    String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
                                                   (this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
                                                   this.exclusive, this.consumerArgs,
                                                   consumer);

    if (consumerTag != null) {
        this.consumers.put(queue, consumer);
        if (logger.isDebugEnabled()) {
            logger.debug("Started on queue '" + queue + "' with tag " + consumerTag + ": " + this);
        }
    }
    else {
        logger.error("Null consumer tag received for queue " + queue);
    }
}
4.3 消息回调处理

消息回调处理由InternalConsumer负责

代码语言:javascript
复制
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                           byte[] body) {
    try {
        if (BlockingQueueConsumer.this.abortStarted > 0) {
            // 1.当消息服务器将消息派发给消费者时由回调将消息放入到队列中
            if (!BlockingQueueConsumer.this.queue.offer(
                new Delivery(consumerTag, envelope, properties, body, this.queueName),
                BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
            }
        }
        else {
            BlockingQueueConsumer.this.queue
                .put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
        }
    }
}
4.4 消费消息

回到4.2部分的代码,进入mainLoop()查看消费消息的逻辑

代码语言:javascript
复制
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR

    Channel channel = consumer.getChannel();

    List<Message> messages = null;
    long deliveryTag = 0;

    for (int i = 0; i < this.batchSize; i++) {

        logger.trace("Waiting for message from consumer.");
        // 1.从队列中取消息
        Message message = consumer.nextMessage(this.receiveTimeout);
        if (this.consumerBatchEnabled) {
        }
        else {
            try {
                // 2.调用监听器方法进行执行业务逻辑
                executeListener(channel, message);
            }
        }
    }
    if (this.consumerBatchEnabled && messages != null) {
        executeWithList(channel, messages, deliveryTag, consumer);
    }
 // 3.向消息服务器发送ack
    return consumer.commitIfNecessary(isChannelLocallyTransacted());

}
4.5 小结
  • 启动SimpleMessageListenerContainer,根据并发数创建消费者
  • 告诉消息服务器要消费的队列、ack模式,指定处理消息的回调
  • 消息服务器推送消息给消费者,执行回调,回调将消息放入队列中
  • 消费者线程无限循环从队列中取消息,消费消息执行业务逻辑
  • 执行完业务逻辑后向消息服务器发送ack
  • 之前,给大家发过三份Java面试宝典,这次新增了一份,目前总共是四份面试宝典,相信在跳槽前一个月按照面试宝典准备准备,基本没大问题。《java面试宝典5.0》(初中级)《350道Java面试题:整理自100+公司》(中高级)《资深java面试宝典-视频版》(资深)《Java[BAT]面试必备》(资深)分别适用于初中级,中高级,资深级工程师的面试复习。内容包含java基础、javaweb、mysql性能优化、JVM、锁、百万并发、消息队列,高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper、数据结构、限流熔断降级等等。获取方式:点“在看”,V信关注上述Java最全面试题库号并回复 【面试】即可领取,更多精彩陆续奉上。 看到这里,证明有所收获必须点个在看支持呀,喵
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java进阶架构师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 前言
  • 2.初衷
  • 3.源码分析
    • 3.1 寻找自动配置类
      • 3.2 自动配置类分析
        • 3.3 入口类分析
          • 3.3.1 初始化消息相关属性
          • 3.3.2 声明监听器容器工厂配置以及监听器容器工厂
          • 3.3.3 开启RabbitMQ功能
        • 3.4 后置处理
          • 3.4.1 后置处理方法
          • 3.4.3 Bean创建完成后的初始化方法
        • 3.5 应用生命周期
          • 3.6 小结
            • 4.1 启动
            • 4.2 执行任务
            • 4.2 启动消费者
            • 4.3 消息回调处理
            • 4.4 消费消息
            • 4.5 小结
        • 4. 启动监听容器
        相关产品与服务
        容器服务
        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档