我们知道,当微服务越来越来多的时候,仅仅是feign的http调用方式已经满足不了我们的使用场景了。这个时候系统就需要接入消息中间件了。相比较于传统的Spring项目、SpringBoot项目使用消息中间件的很多配置不同,SpringCloud Stream抽象了中间件产品的不同,在SpringCloud中你仅仅需要修改几行配置文件就可以灵活的切换中间件产品而不需要修改任何代码。
现在我们以SpringCloud Stream整合RabbitMQ为例来学习一下
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
spring:
cloud:
stream:
binders:
test:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 10.0.20.132
port: 5672
username: root
password: root
virtual-host: /unicode-pay
bindings:
testOutPut:
destination: testRabbit
content-type: application/json
default-binder: test
现在来解释一下这些配置的含义
public interface MqMessageSource {
String TEST_OUT_PUT = "testOutPut";
@Output(TEST_OUT_PUT)
MessageChannel testOutPut();
}
这个通道的名字就是上方binding的名字
@EnableBinding(MqMessageSource.class)
public class MqMessageProducer {
@Autowired
@Output(MqMessageSource.TEST_OUT_PUT)
private MessageChannel channel;
public void sendMsg(String msg) {
channel.send(MessageBuilder.withPayload(msg).build());
System.err.println("消息发送成功:"+msg);
}
}
这里就是使用上方的通道来发送到指定的交换机了。需要注意的是withPayload方法你可以传入任何类型的对象,但是需要实现序列化接口
EnableBinding注解绑定的类默认是被Spring管理的,我们可以在controller中注入它
@Autowired
private MqMessageProducer mqMessageProducer;
@GetMapping(value = "/testMq")
public String testMq(@RequestParam("msg")String msg){
mqMessageProducer.sendMsg(msg);
return "发送成功";
}
生产者的代码到此已经完成了。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
spring:
cloud:
stream:
binders:
test:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 10.0.20.132
port: 5672
username: root
password: root
virtual-host: /unicode-pay
bindings:
testInPut:
destination: testRabbit
content-type: application/json
default-binder: test
这里与生产者唯一不同的地方就是testIntPut了,相信你已经明白了,它是binding的名字,也是通道与交换机绑定的关键
public interface MqMessageSource {
String TEST_IN_PUT = "testInPut";
@Input(TEST_IN_PUT)
SubscribableChannel testInPut();
}
@EnableBinding(MqMessageSource.class)
public class MqMessageConsumer {
@StreamListener(MqMessageSource.TEST_IN_PUT)
public void messageInPut(Message<String> message) {
System.err.println(" 消息接收成功:" + message.getPayload());
}
}
这个时候启动Eureka、消息生产者和消费者,然后调用生产者的接口应该就可以接受到来自mq的消息了。