专栏首页程序员历小冰Spring Cloud Stream 基础应用实战

Spring Cloud Stream 基础应用实战

本文摘自笔者出版的书籍《Spring Cloud 微服务架构进阶》

SpringCloudStream应用模型下图所示。Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input和output通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。

业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。

通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通过,是的应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。

目前只提供了RabbitMQ和Kafka的Binder实现

本小节主要讲述 SpringCloudStream的编程模型。 SpringCloudStream提供了一系列的预先定义的注解来声明input和output channel。

你可以通过给一个应用的配置类(configuration class)添加 @EnableBinding注解来将一个 Spring应用转变成 SpringCloudStream应用。@EnableBinding注解本身[带了] @Configuration元注解并且触发 SpringCloudStream框架的配置。

...@Import(...)@Configuration@EnableIntegrationpublic @interface EnableBinding {    ...    Class<?>[] value() default {};}

@EnableBinding注解可以[携带]具有提供可绑定组件函数的接口类作为参数(比如说消息信道)。@EnableBinding注解只能使用在你的 Configuration类上,你可以尽可能多的提供你需要的接口作为该注解的参数,比如说 @EnableBinding(value={Order.class,Payment.class}OrderPayment接口都可以声明 @Input@OutputChannel。

SpringCloudStream应用中,一个接口可以通过 @Input@Output函数来声明随意数目的input和output channels。

public interface Barista {
    @Input    SubscribableChannel orders();
    @Output    MessageChannel hotDrinks();
    @Output    MessageChannel coldDrinks();}

使用这个接口当作 @EnableBinding的参数可以触发 SpringCloudStream框架创建三个信道,名字分别为 orders, hotDrinkscoldDrinks

@EnableBinding(Barista.class)public class CafeConfiguration {   ...}

使用 @Input@Output注解,你可以给每个信道一个自定义的名称,就像下面这个例子一样。

public interface Barista {    ...    @Input("inboundOrders")    SubscribableChannel orders();}

在这个例子,信道名称就是 inboundOrders

SpringCloudStream提供了预先定义的三中接口来定义input channel和output channel。

Source用来声明输出型channel。

public interface Source {
  String OUTPUT = "output";
  @Output(Source.OUTPUT)  MessageChannel output();
}

Sink用来声明输入型channel。

public interface Sink {
  String INPUT = "input";
  @Input(Sink.INPUT)  SubscribableChannel input();
}

Processor用来声明既可以输出也可以输入型的channel。

public interface Processor extends Source, Sink {}

对于任何一个bound interface, SpringCloudStream将会生成一个该接口的bean。调用这些bean的被 @Input@Output修饰的方法可以返回相应的bound channel。在下面例子中,当调用 SendingBean对象的 hello方法时会给output channel发送一个信息。它调用注入的 Sourcebean来获取目标target。

@Componentpublic class SendingBean {
    private Source source;
    @Autowired    public SendingBean(Source source) {        this.source = source;    }
    public void sayHello(String name) {         source.output().send(MessageBuilder.withPayload(name).build());    }}
Bound channels也可以直接被注入。
@Componentpublic class SendingBean {
    private MessageChannel output;
    @Autowired    public SendingBean(MessageChannel output) {        this.output = output;    }
    public void sayHello(String name) {         output.send(MessageBuilder.withPayload(name).build());    }}

如果在声明channel时自定义了channel的名称,那么这个名称将会替换方法的名称,在注入时发挥作用。比如下面这个例子:

public interface CustomSource {    ...    @Output("customOutput")    MessageChannel output();}

这个channel可以通过下面这个方式来进行注入。

@Componentpublic class SendingBean {
    private MessageChannel output;
    @Autowired    public SendingBean(@Qualifier("customOutput") MessageChannel output) {        this.output = output;    }
    public void sayHello(String name) {         this.output.send(MessageBuilder.withPayload(name).build());    }}

你可以通过使用 SpringIntegration注解或者 SpringCloudStream@StreamListener注解来编写 SpringCloudStream应用。@StreamListener注解基于 SpringMessaging注解来建模(比如说 @MessageMapping@JmsListener@RabbitListener)。除此之外,该注解添加了content类型管理和类型强制特性。

Spring Integration支持

因为 SpringCloudStream是基于 SpringIntegration,Stream完全继承了Integration的架构和基础组件。比如说,你可以把 Source的output channel绑定到 MessageSource

@EnableBinding(Source.class)public class TimerSource {
  @Value("${format}")  private String format;
  @Bean  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))  public MessageSource<String> timerMessageSource() {    return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));  }}

或者你也可以通过transformer来使用一个processor channel。

@EnableBinding(Processor.class)public class TransformProcessor {  @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)  public Object transform(String message) {    return message.toUpperCase();  }}
使用@StreamListener

作为Spring Integration的补充, SpringCloudStream提供了它自己的 @StreamListener注解,该注解基于Spring Messaging注解(比如说 @MessageMapping@JmsListener@RabbitListener)。@StreamListener注解提供了处理inbound message的更加简便的模型。  SpringCloudStream提供了可扩展的 MessageConverter机制来处理数据转化,并将转化后的数据分配给相应的被 @StreamListener修饰的方法。下面这个例子展示了一个处理外部 Vote事件的应用。

@EnableBinding(Sink.class)public class VoteHandler {
  @Autowired  VotingService votingService;
  @StreamListener(Sink.INPUT)  public void handle(Vote vote) {    votingService.record(vote);  }}

@StreamListener和Spring Integration的 @ServiceActivator的区别可以在下面这个例子中展现。一个inbound的 Message对象有一个string类型的payload和一个值为 application/jsoncontentType。在使用 @StreamListener时, MessageConverter原理会使用 contentType来解析 Stringpayload并赋值给 Vote对象。 就像其他的Spring Messaging方法一样,被 @StreamListener注解的方法的参数可以使用 @Payload@Headers@Header进行注解。 对于会返回数据的方法,你必须使用 @SendTo注解来指定该返回数据发送到哪个output channel。

@EnableBinding(Processor.class)public class TransformProcessor {
  @Autowired  VotingService votingService;
  @StreamListener(Processor.INPUT)  @SendTo(Processor.OUTPUT)  public VoteResult handle(Vote vote) {    return votingService.record(vote);  }}
使用@StreamListener来分配消息

SpringCloudStream支持将消息分配到多个 @StreamListener修饰的方法。 为了能使用该分配机制,一个方法必须首先满足下列条件:

  • 方法不能有返回值。
  • 方法必须是单独一类消息的处理函数(响应式编程的方法并不支持)

使用注解的 condition属性中的SpEL表达式可以首先上述的消息分配机制。所有匹配了该 condition的方法都会在同一个线程中被调用,但是方法调用相对顺序不能保证。 下面就是一个 @StreamListener分配消息的例子。在这个例子中,所有携带值为 footype头部的消息都会被分配给 receiveFoo方法,所有携带值为 bartype头部的消息都会被分配给 receiveBar方法。

@EnableBinding(Sink.class)@EnableAutoConfigurationpublic static class TestPojoWithAnnotatedArguments {
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='foo'")    public void receiveFoo(@Payload FooPojo fooPojo) {       // handle the message    }
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bar'")    public void receiveBar(@Payload BarPojo barPojo) {       // handle the message    }}

代码实例

Stream可以声明自定义的channel,通过使用 @Input@Output来声明channel的名称和方向。@Input是声明输入方向的channel,而 @Output是声明输出方向的channel。

public interface MessageInput {    String INPUT_MESSAGE = "input";    @Input(INPUT_MESSAGE)    SubscribableChannel inputMessagefunction();}
public interface MessageOutput {    String OUTPUT_MESSAGE = "output";    @Output(OUTPUT_MESSAGE)    MessageChannel outputMessagefunction();}

@EnableBinding注解用于声明某个类所需要绑定的channel实例,其 value属性值是需要绑定的channel的定义类。

使用者首先需要使用@EnableBinding注解实现对消息通道的绑定,该注解中还传入了一个参数 MessageInput.classMessageInput是一个接口,该接口是对输入消息通道绑定的定义。然后在 InputController类中定义了 listener方法,并在该方法上添加了 @StreamListener注解,该注解表示该方法为消息中间件上数据流的事件监听器, MessageInput.INPUT_MESSAGE参数表示这是input消息通道上的监听处理器。

@EnableBinding(value = MessageInput.class)public class InputController {    @StreamListener(MessageInput.INPUT_MESSAGE)    public void listener(Message message) {}}

而输出型channel只需要使用 @Autowired注解进行自动注入。然后就可以使用实例的函数进行消息的发送。

@EnableBinding(value = MessageOutput.class)public class OutputController {    @Autowired    private MessageOutput messageOutput;    public void sendMessage() {        messageOutput.outputMessagefunction().send(MessageBuilder.withPayload("Message"));    }}

stream通过 application.yml进行配置,比如说上述代码中 MessageInputMessageOutput接口分别使用 @Input@Output定义了输入和输出的消息通道的绑定信息。一个是 input,一个是 output。配置文件中的bindings字段就对应上述的绑定信息,比如说下面的配置文件中,bindings字段下一共有两个binding配置,分别是input和output,与代码中的名称一致。content-type表明binding接受或者发送消息的类型, binder则声明该binding所对应的绑定器。

binders字段声明了项目中所有的绑定器信息,由于 stream支持多种消息队列,所以将与消息队列交互的实现抽象成 Binder,不同的 Binder对应不同的消息队列。type就是指明绑定器的类型,比如说rabbit或者kafka。environment中是配置了与绑定器交互的消息队列的基本信息,比如说网络信息,认证信息,分区信息等。

// application.ymlcloud:  stream:    bindings:      output:        content-type: application/x-java-object;type=com.example.demo.entity.Message        destination: msg        binder: rabbit1      input:        content-type: application/x-java-object;type=com.example.demo.entity.Message        destination: msg        binder: rabbit1    binders:      rabbit1:        type: rabbit        environment:          spring:            rabbitmq:              host: 127.0.0.1              port: 5672              username: stream              password: stream              virtual-host: /sc

本文分享自微信公众号 - 程序员历小冰(gh_bcc90a2a52c5)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • SpringBoot系列之@Conditional注解用法简介

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    用户1208223
  • 从零搭建一个 Spring Boot 开发环境!Spring Boot+Mybatis+Swagger2 环境搭建

    Spring Boot 相对于传统的SSM框架的优点是提供了默认的样板化配置,简化了Spring应用的初始搭建过程,如果你不想被众多的xml配置文件困扰,可以考...

    Java团长
  • 源码学习系列之SpringBoot自动配置(篇二)

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    用户1208223
  • 外行人都能看懂的WebFlux,错过了血亏

    如果有关注我公众号文章的同学就会发现,最近我不定时转发了一些比较好的WebFlux的文章,因为我最近在学。

    Rude3Knife的公众号
  • IDEA Debug 无法进入断点的解决方法

    某个多模块项目中使用多个版本的 Spring,如 Spring 4,Spring 5,在使用 IDEA Debug 过程中发现,Spring 部分 jar 如 ...

    andyxh
  • 原创 | Filter、Interceptor和Aspect对请求的拦截,有什么不同?

    注意:关于filterChain.doFilter(request,response,filterChain),执行filterChain.doFilter的意...

    黄泽杰
  • Spring Boot Redis 入门(上)

    摘要: 原创出处 http://www.iocoder.cn/Spring-Boot/Redis/ 「芋道源码」欢迎转载,保留摘要,谢谢!

    芋道源码
  • 用 RSocket 解决响应式服务之间的的通讯-Part 3:基于 RSocket 进行抽象

    如果你看过本系列的前两篇文章,应该已经已经发现 RSocket 提供了一些底层的 API。可以直接使用交互模型中的方法进行操作,而且可以没有任何限制来回发送帧。...

    涤生
  • SpringBoot系列之集成logback实现日志打印(篇二)

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    用户1208223
  • Spring-boot+Mybatis+Maven+MySql搭建实例

    我习惯于先创建好maven项目,构建目录再导入到编译器中,这样的好处就是搭建好一个脚手架模板,后面改改参数就可以用到各个工程里面。

    掌上编程

扫码关注云+社区

领取腾讯云代金券