SpringCloudStream
应用模型下图所示。Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input和output通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。
业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。
通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通过,是的应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。
目前只提供了RabbitMQ和Kafka的Binder实现
SpringCloudStream
的编程模型。 SpringCloudStream
提供了一系列的预先定义的注解来声明input和output channel。
你可以通过给一个应用的配置类(configuration class)添加 @EnableBinding
注解来将一个 Spring
应用转变成 SpringCloudStream
应用。@EnableBinding
注解本身[带了] @Configuration
元注解并且触发 SpringCloudStream
框架的配置。
...
@Import(...)
@Configuration
@EnableIntegration
public @interface EnableBinding {
...
Class<?>[] value() default {};
}
@EnableBinding
注解可以[携带]具有提供可绑定组件函数的接口类作为参数(比如说消息信道)。@EnableBinding
注解只能使用在你的 Configuration
类上,你可以尽可能多的提供你需要的接口作为该注解的参数,比如说 @EnableBinding(value={Order.class,Payment.class}
。Order
和 Payment
接口都可以声明 @Input
和 @Output
Channel。
SpringCloudStream
应用中,一个接口可以通过 @Input
和 @Output
函数来声明随意数目的input和output channels。
public interface Barista {
@Input
SubscribableChannel orders();
@Output
MessageChannel hotDrinks();
@Output
MessageChannel coldDrinks();
}
使用这个接口当作 @EnableBinding
的参数可以触发 SpringCloudStream
框架创建三个信道,名字分别为 orders
, hotDrinks
和 coldDrinks
。
@EnableBinding(Barista.class)
public class CafeConfiguration {
...
}
@Input
和 @Output
注解,你可以给每个信道一个自定义的名称,就像下面这个例子一样。
public interface Barista {
...
@Input("inboundOrders")
SubscribableChannel orders();
}
在这个例子,信道名称就是 inboundOrders
。
SpringCloudStream
提供了预先定义的三中接口来定义input channel和output channel。
Source
用来声明输出型channel。
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
Sink
用来声明输入型channel。
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
Processor
用来声明既可以输出也可以输入型的channel。
public interface Processor extends Source, Sink {
}
对于任何一个bound interface, SpringCloudStream
将会生成一个该接口的bean。调用这些bean的被 @Input
和 @Output
修饰的方法可以返回相应的bound channel。在下面例子中,当调用 SendingBean
对象的 hello
方法时会给output channel发送一个信息。它调用注入的 Source
bean来获取目标target。
@Component
public class SendingBean {
private Source source;
@Autowired
public SendingBean(Source source) {
this.source = source;
}
public void sayHello(String name) {
source.output().send(MessageBuilder.withPayload(name).build());
}
}
@Component
public class SendingBean {
private MessageChannel output;
@Autowired
public SendingBean(MessageChannel output) {
this.output = output;
}
public void sayHello(String name) {
output.send(MessageBuilder.withPayload(name).build());
}
}
如果在声明channel时自定义了channel的名称,那么这个名称将会替换方法的名称,在注入时发挥作用。比如下面这个例子:
public interface CustomSource {
...
@Output("customOutput")
MessageChannel output();
}
这个channel可以通过下面这个方式来进行注入。
@Component
public class SendingBean {
private MessageChannel output;
@Autowired
public SendingBean(@Qualifier("customOutput") MessageChannel output) {
this.output = output;
}
public void sayHello(String name) {
this.output.send(MessageBuilder.withPayload(name).build());
}
}
SpringIntegration
注解或者 SpringCloudStream
的 @StreamListener
注解来编写 SpringCloudStream
应用。@StreamListener
注解基于 SpringMessaging
注解来建模(比如说 @MessageMapping
, @JmsListener
, @RabbitListener
)。除此之外,该注解添加了content类型管理和类型强制特性。
因为 SpringCloudStream
是基于 SpringIntegration
,Stream完全继承了Integration的架构和基础组件。比如说,你可以把 Source
的output channel绑定到 MessageSource
:
@EnableBinding(Source.class)
public class TimerSource {
@Value("${format}")
private String format;
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
}
}
或者你也可以通过transformer来使用一个processor channel。
@EnableBinding(Processor.class)
public class TransformProcessor {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String message) {
return message.toUpperCase();
}
}
作为Spring Integration的补充, SpringCloudStream
提供了它自己的 @StreamListener
注解,该注解基于Spring Messaging注解(比如说 @MessageMapping
, @JmsListener
, @RabbitListener
)。@StreamListener
注解提供了处理inbound message的更加简便的模型。 SpringCloudStream
提供了可扩展的 MessageConverter
机制来处理数据转化,并将转化后的数据分配给相应的被 @StreamListener
修饰的方法。下面这个例子展示了一个处理外部 Vote
事件的应用。
@EnableBinding(Sink.class)
public class VoteHandler {
@Autowired
VotingService votingService;
@StreamListener(Sink.INPUT)
public void handle(Vote vote) {
votingService.record(vote);
}
}
@StreamListener
和Spring Integration的 @ServiceActivator
的区别可以在下面这个例子中展现。一个inbound的 Message
对象有一个string类型的payload和一个值为 application/json
的 contentType
。在使用 @StreamListener
时, MessageConverter
原理会使用 contentType
来解析 String
payload并赋值给 Vote
对象。 就像其他的Spring Messaging方法一样,被 @StreamListener
注解的方法的参数可以使用 @Payload
和 @Headers
和 @Header
进行注解。 对于会返回数据的方法,你必须使用 @SendTo
注解来指定该返回数据发送到哪个output channel。
@EnableBinding(Processor.class)
public class TransformProcessor {
@Autowired
VotingService votingService;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public VoteResult handle(Vote vote) {
return votingService.record(vote);
}
}
SpringCloudStream
支持将消息分配到多个 @StreamListener
修饰的方法。 为了能使用该分配机制,一个方法必须首先满足下列条件:
使用注解的 condition
属性中的SpEL表达式可以首先上述的消息分配机制。所有匹配了该 condition
的方法都会在同一个线程中被调用,但是方法调用相对顺序不能保证。 下面就是一个 @StreamListener
分配消息的例子。在这个例子中,所有携带值为 foo
的 type
头部的消息都会被分配给 receiveFoo
方法,所有携带值为 bar
的 type
头部的消息都会被分配给 receiveBar
方法。
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='foo'")
public void receiveFoo(@Payload FooPojo fooPojo) {
// handle the message
}
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bar'")
public void receiveBar(@Payload BarPojo barPojo) {
// handle the message
}
}
Stream
可以声明自定义的channel,通过使用 @Input
和 @Output
来声明channel的名称和方向。@Input
是声明输入方向的channel,而 @Output
是声明输出方向的channel。
public interface MessageInput {
String INPUT_MESSAGE = "input";
@Input(INPUT_MESSAGE)
SubscribableChannel inputMessagefunction();
}
public interface MessageOutput {
String OUTPUT_MESSAGE = "output";
@Output(OUTPUT_MESSAGE)
MessageChannel outputMessagefunction();
}
@EnableBinding
注解用于声明某个类所需要绑定的channel实例,其 value
属性值是需要绑定的channel的定义类。
使用者首先需要使用@EnableBinding注解实现对消息通道的绑定,该注解中还传入了一个参数 MessageInput.class
, MessageInput
是一个接口,该接口是对输入消息通道绑定的定义。然后在 InputController
类中定义了 listener
方法,并在该方法上添加了 @StreamListener
注解,该注解表示该方法为消息中间件上数据流的事件监听器, MessageInput.INPUT_MESSAGE
参数表示这是input消息通道上的监听处理器。
@EnableBinding(value = MessageInput.class)
public class InputController {
@StreamListener(MessageInput.INPUT_MESSAGE)
public void listener(Message message) {}
}
而输出型channel只需要使用 @Autowired
注解进行自动注入。然后就可以使用实例的函数进行消息的发送。
@EnableBinding(value = MessageOutput.class)
public class OutputController {
@Autowired
private MessageOutput messageOutput;
public void sendMessage() {
messageOutput.outputMessagefunction().send(MessageBuilder.withPayload("Message"));
}
}
stream
通过 application.yml
进行配置,比如说上述代码中 MessageInput
和 MessageOutput
接口分别使用 @Input
和 @Output
定义了输入和输出的消息通道的绑定信息。一个是 input
,一个是 output
。配置文件中的bindings字段就对应上述的绑定信息,比如说下面的配置文件中,bindings字段下一共有两个binding配置,分别是input和output,与代码中的名称一致。content-type
表明binding接受或者发送消息的类型, binder
则声明该binding所对应的绑定器。
binders
字段声明了项目中所有的绑定器信息,由于 stream
支持多种消息队列,所以将与消息队列交互的实现抽象成 Binder
,不同的 Binder
对应不同的消息队列。type
就是指明绑定器的类型,比如说rabbit或者kafka。environment
中是配置了与绑定器交互的消息队列的基本信息,比如说网络信息,认证信息,分区信息等。
// application.yml
cloud:
stream:
bindings:
output:
content-type: application/x-java-object;type=com.example.demo.entity.Message
destination: msg
binder: rabbit1
input:
content-type: application/x-java-object;type=com.example.demo.entity.Message
destination: msg
binder: rabbit1
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: stream
password: stream
virtual-host: /sc