Spring Cloud Stream应用与自定义RocketMQ Binder:实现RocketMQ绑定器

前言: 本文作者张天,节选自笔者与其合著的《Spring Cloud微服务架构进阶》,即将在八月出版问世。本文将其中Spring Cloud Stream应用与自定义Rocketmq Binder的内容抽取出来,主要介绍实现Spring Cloud Stream 的RocketMQ绑定器。

Stream的Binder机制

上一篇中,介绍了Spring Cloud Stream基本的概念及其编程模型。除此之外,Spring Cloud Stream提供了Binder接口来用于和外部消息队列进行绑定。本文将讲述Binder SPI的基本概念,主要组件和实现细节。 Binder SPI通过一系列的接口,工具类和检测机制提供了与外部消息队列绑定的绑定器机制。SPI的关键点是Binder接口,这个接口负责提供和外部消息队列进行绑定的具体实现。

1public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
2    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
3    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
4}

一个典型的自定义Binder组件实现应该包括以下几点:

  • 一个实现Binder接口的类。
  • 一个Spring的@Configuration类来创建上述类型的实例。
  • 在classpath上一个包含自定义Binder相关配置类的META-INF/spring.binders文件,比如说:
1kafka:\
2org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration

Spring Cloud Stream基于Binder SPI的实现来进行channel和消息队列的绑定任务。不同类型的消息队列中间件实现了不同的绑定器Binder。比如说:Spring-Cloud-Stream-Binder-Kafka是针对Kafka的Binder实现,而Spring-Cloud-Stream-Binder-Rabbit则是针对RabbitMQ的Binder实现。

Spring Cloud Stream依赖于Spring Boot的自动配置机制来配置Binder。如果一个Binder实现在项目的classpath中被发现,Spring Cloud Stream将会自动使用它。比如说,一个Spring Cloud Stream项目需要绑定RabbitMQ中间件的Binder,在pom文件中加入下面的依赖来轻松实现。

1<dependency>
2  <groupId>org.springframework.cloud</groupId>
3  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
4</dependency>

Binder For RocketMQ

Spring Cloud Stream为接入不同的消息队列提供了一整套的自定义机制,通过为每个消息队里开发一个Binder来接入该消息队列。目前官方认定的Binder为rabbitmq binder和kafka binder。但是开发人员可以基于Stream Binder的机制来制定自己的Binder。下面我们就构建一个简单的RocketMQ的Binder。

配置类

需要在resources/META-INF/spring.binders文件中配置有关RocketMQ的Configuration类,该配置类会使用@Import来导入为RocketMQ制定的RocketMessageChannelBinderConfiguration

1rocket:\
2org.springframework.cloud.stream.binder.rocket.config.RocketServiceAutoConfiguration

RocketMessageChannelBinderConfiguration将会提供两个极其重要的bean实例,分别为RocketMessageChannelBinderRocketExchangeQueueProvisionerRocketMessageChannelBinder主要是用于channel和消息队列的绑定,而RocketExchangeQueueProvisioner则封装了RocketMQ的相关API,可以用于创建消息队列的基础组件,比如说队列,交换器等。

 1@Configuration
 2public class RocketMessageChannelBinderConfiguration {
 3    @Autowired
 4    private ConnectionFactory rocketConnectionFactory;
 5    @Autowired
 6    private RocketProperties  rocketProperties;
 7    @Bean
 8    RocketMessageChannelBinder rocketMessageChannelBinder() throws Exception {
 9        RocketMessageChannelBinder binder = new RocketMessageChannelBinder(this.rocketConnectionFactory,
10                this.rocketProperties, provisioningProvider());
11        return binder;
12    }
13    @Bean
14    RocketExchangeQueueProvisioner provisioningProvider() {
15        return new RocketExchangeQueueProvisioner(this.rocketConnectionFactory);
16    }
17}

RocketMessageChannelBinder继承了抽象类AbstractMessageChannelBinder,并实现了#producerMessageHandler和#createConsumerEndpoint函数。

MessageHandler有向消息队列发送消息的能力,#createProducerMessageHandler函数就是为了创建MessageHandler对象,来将输出型Channel的消息发送到消息队列上。

1protected MessageHandler createProducerMessageHandler(
2        ProducerDestination destination,             
3        ExtendedProducerProperties<RocketProducerProperties> producerProperties,                    
4        MessageChannel errorChannel) 
5        throws Exception {
6    final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(
7            buildRocketTemplate(producerProperties.getExtension(), errorChannel != null));
8    return endpoint;
9}

MessageProducer能够从消息队列接收消息,并将该消息发送输入型Channel。

1@Override
2protected MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String group,
3                                                    ExtendedConsumerProperties<RocketConsumerProperties> properties) throws Exception {
4    SimpleRocketMessageListenerContainer listenerContainer = new SimpleRocketMessageListenerContainer();
5    RocketInboundChannelAdapter rocketInboundChannelAdapter = new RocketInboundChannelAdapter(listenerContainer);
6    return rocketInboundChannelAdapter;
7}

消息接收功能的实现流程

类似于RabbitMQ的Binder,需要实现下面一系列的类来实现从RocketMQ到对应MessageChannel的消息传递。

  1. RocketBlockingQueueConsumer.InnerConsumer实现了MessageListenerConcurrently来接收RocketMQ传递的消息。
  2. RocketBlockingQueueConsumer将InnerConsumer注册给RocketMQ的DefaultMQPushConsumer来接收RocketMQ传递过来的消息,并存储在自身的阻塞队列中。供SimpleRocketMessageListenerContainer获取。
  3. SimpleRocketMessageListenerContainer,启动一个线程来不停从RocketBlockingQueueConsumer获取消息,然后调用RocketInboundChannelAdapter.Listener的回调函数,将消息传递给RocketInboundChannelAdapter。
  4. RocketInboundChannelAdapter.Listener供SimpleRocketMessageListenerContainer回调,将消息发送给RocketInboundChannelAdapter。
  5. RocketInboundChannelAdapter,接受SimpleRocketMessageListenerContainer传递过来的消息,然后通过MessageTemplate发送给相应的MessageChannel。从而传递给由@StreamListener的修饰的函数。

InnerConsumer接收RocketMQ消息

InnerConsumer实现的MessageListenerConcurrently接口是RocketMQ中用于并发接受异步消息的接口,该接口可以接收到RocketMQ发送过来的异步消息。而InnerConsumer在接受到消息之后,会将消息封装成RocketDelivery加入到阻塞队列中。

RocketBlockingQueueConsumer有一个阻塞队列来存储RocketMQ传递给RocketBlockingQueueConsumer.InnerConsumer的消息,而nextMessage函数可以从阻塞队列中拉取一个消息并返回。

AsyncMessageProcessingConsumer获取消息

SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer是实现了Runnable接口,在run()接口中会无限循环地调用SimpleRocketMessageListenerContainer本身的receiveAndExecute。

 1@Override
 2public void run() {
 3    if (!isActive()) {
 4        return;
 5    }
 6    try {
 7        //只要consumer的状态正常,就会一直循环
 8        while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
 9            try {
10                boolean receivedOk = receiveAndExecute(this.consumer);
11            }
12            catch (ListenerExecutionFailedException ex) {
13                if (ex.getCause() instanceof NoSuchMethodException) {
14                    throw new FatalListenerExecutionException("Invalid listener", ex);
15                }
16            }
17            catch (AmqpRejectAndDontRequeueException rejectEx) {
18            } catch (Throwable e) {
19            }
20        }
21    } catch (Exception e) {
22    }
23    finally {
24        if (getTransactionManager() != null) {
25            ConsumerChannelRegistry.unRegisterConsumerChannel();
26        }
27    }
28    this.start.countDown();
29    if (!isActive(this.consumer) || aborted) {
30        this.consumer.stop();
31    }
32    else {
33        restart(this.consumer);
34    }
35}

函数#receiveAndExecute最终的作用就是调用RocketBlockingQueueConsumer的nextMessage,然后再将消息调用messageListener.onMessage函数将消息传递出去。

初始化RocketBlockingQueueConsumer和AsyncMessageProcessingConsumer

SimpleRocketMessageListenerContainer的doStart函数会初始化RocketBlockingQueueConsumer并且启动SimpleRocketMessageListenerContainer的AsyncMessageProcessingConsumer会无限循环地从RocketBlockingQueueConsumer中获取RocketMQ传递过来的消息。

 1private void doStart() {
 2    synchronized (this.lifecycleMonitor) {
 3        this.active = true;
 4        this.running = true;
 5        this.lifecycleMonitor.notifyAll();
 6    }
 7    synchronized (this.consumersMonitor) {
 8        if (this.consumers != null) {
 9            throw new IllegalStateException("A stopped container should not have consumers");
10        }
11        //初始化Consumer
12        int newConsumers = initializeConsumers();
13        if (this.consumers == null) {
14            return;
15        }
16        if (newConsumers <= 0) {
17            return;
18        }
19        Set<SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer> processors =
20                new HashSet<>();
21        //对于每个RocketBlockingQueueConsumer启动一个
22        //AsyncMessageProcessingConsumer来执行任务
23        for (RocketBlockingQueueConsumer consumer : this.consumers) {
24            SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer
25                    processor = new SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
26            processors.add(processor);
27            getTaskExecutor().execute(processor);
28        }
29    }
30}

发送消息给MessageChannel

RocketInboundChannelAdapter实现了MessageProducer接口。它主要将SimpleRocketMessageListenerContainer传递过来的消息经过MessageTemplate传递给MessageChannel。

接下来则是RocketInboundChannelAdapter.Listener的实现,它就是RocketBlockingQueueConsumer.nextMessage函数中的messageListener。

 1public class Listener implements ChannelAwareMessageListener, RetryListener {
 2    public void onMessage(Message message, Channel channel) throws Exception {
 3        try {
 4            this.createAndSend(message, channel);
 5        } catch (RuntimeException var7) {
 6            if (RocketInboundChannelAdapter.this.getErrorChannel() == null) {
 7                throw var7;
 8            }
 9       RocketInboundChannelAdapter.this.getMessagingTemplate().send(RocketInboundChannelAdapter.this.getErrorChannel(), RocketInboundChannelAdapter.this.buildErrorMessage((org.springframework.messaging.Message)null, new ListenerExecutionFailedException("Message conversion failed", var7, message)));
10        }
11    }
12    private void createAndSend(Message message, Channel channel) {
13        org.springframework.messaging.Message<Object> messagingMessage = this.createMessage(message, channel);
14        RocketInboundChannelAdapter.this.sendMessage(messagingMessage);
15    }
16    private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
17        Object payload = RocketInboundChannelAdapter.this.messageConverter.fromMessage(message);
18        org.springframework.messaging.Message<Object> messagingMessage = RocketInboundChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).build();
19        return messagingMessage;
20    }
21}

RocketMQ的管理器

RocketProvisioningProvider实现了ProvisioningProvider接口,它有两个函数:provisionProducerDestination和provisionConsumerDestination,分别用于创建ProducerDestination和ConsumerDestination。RocketProvisioningProvider的实现类似于RabbitProvisioningProvider。只不过在声明队列,交换器和绑定时使用了RocketAdmin所实现的RocketMQ的相关API。

总结

本文概要介绍了Spring Cloud Stream的Rocketmq绑定器的实现,限于篇幅不展开具体的代码讲解。读者感兴趣,可以关注GitHub上的代码。根据Spring Cloud Stream抽象的接口,我们可以自由地实现各种消息队列的绑定器。

项目GitHub地址:https://github.com/ztelur/spring-cloud-stream-binder-rocket

原文发布于微信公众号 - aoho求索(aohoBlog)

原文发表时间:2018-07-01

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏开发与安全

linux网络编程之System V 消息队列(一):消息队列内核结构和msgget、msgctl 函数

一、消息队列 1、消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法 2、每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值 ...

2290
来自专栏Java技术栈

Java 必须掌握的 12 种 Spring 常用注解!

@Configuration 声明当前类为配置类,相当于xml形式的Spring配置(类上)

1472
来自专栏帅小子的日常

使用redis做缓存

7677
来自专栏IT 指南者专栏

Spring框架系列(二)之Bean的注解管理

微信公众号:compassblog 欢迎关注、转发,互相学习,共同进步! 有任何问题,请后台留言联系! 1、Spring中的两种容器 在系列(一)中我们已经知道...

3406
来自专栏用户2442861的专栏

Python日志输出——logging模块

http://blog.csdn.net/chosen0ne/article/details/7319306

1381
来自专栏Java3y

SpringMVC入门就这么简单

什么是SpringMVC? SpringMVC是Spring家族的一员,Spring是将现在开发中流行的组件进行组合而成的一个框架!它用在基于MVC的表现层开发...

6206
来自专栏java学习

Spring常用注解(收藏大全)

如果你是初学者,或者是自学者!你可以加小编微信(xxf960513)!小编可以给你学习上,工作上的一些建议以及可以给你(免费)提供学习资料!最重要我们还可以交个...

1022
来自专栏代码拾遗

深入理解Spring MVC

使用Spring Boot和web,thymeleaf的starter来设置初始工程。xml配置如下:

1082
来自专栏斑斓

Spray中的Authentication和JMeter测试

Spray Authentication 在Spray中,如果需要对REST API添加认证,可以使用Spray提供的Authenticate功能。本质上,Au...

3739
来自专栏Linyb极客之路

Spring Cloud开发注意事项

如果provider中需要引入其他feign client的接口,需在 provider的启动类添加注解 @EnableFeignClients(basePac...

3473

扫码关注云+社区

领取腾讯云代金券