前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >「 从0到1学习微服务SpringCloud 」08 构建消息驱动微服务的框架 Spring Cloud Stream

「 从0到1学习微服务SpringCloud 」08 构建消息驱动微服务的框架 Spring Cloud Stream

作者头像
KEN DO EVERTHING
发布2019-04-24 15:30:48
4890
发布2019-04-24 15:30:48
举报
文章被收录于专栏:KEN DO EVERTHING

简介

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。 简单来说,它就是用来与消息中间件进行交互的,我们不需要直接对消息中间件进行操作,而是通过Spring Cloud Stream,从而简化了对中间件的操作,并进行了解耦(想要更换消息中间件时,无需更改代码)。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,而 Spring Cloud Stream 的 binder 负责与中间件交互。 所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。最大的好处莫过于对中间件的再次封装,可以做到代码层面对消息中间件的无感知,甚至于动态的切换中间件。

目前Stream只提供了RabiitMq和Kafka的binder,若要使用其他的消息中间件,需要自己自定义binder。

基本使用

消费者

1.新建一个项目, micro-service1用于接收消息,作为eureka client,增加mq,stream-mq的maven,修改相关配置等不再累述,与之前一样

2.定义一个接口,将input绑定名为"input"的消息通道

代码语言:javascript
复制
public interface Receiver {
    //消息通道名称
    String INPUT = "input";

    //绑定可订阅的通道
    @Input(INPUT)
    SubscribableChannel input();
}

3.定义Stream接收类

代码语言:javascript
复制
@Component
//@EnableBinding注解可以接收一个或多个接口类作为对象
// 声明绑定的消息通到,实现与消息代理的连接
@EnableBinding(Receiver.class)
@Log4j2
public class StreamReceiver {

    //监听binding的input
    @StreamListener(Receiver.INPUT)
    //message为接收到信息消息
    public void input(Message<String> message){
        log.info("StreamReceiver: {}", message.getPayload());
    }
}

启动,默认是会创建一个临时队列,临时队列绑定的exchange为 “input” 所有发送 exchange 为“input” 的MQ消息都会被投递到这个临时队列,并通过上述方法接收。

以上代码就完成了最基本的消费者部分。

生产者

1.新建一个项目, micro-service2用于发送消息,具体步骤步骤累述

2.定义一个接口,,将output绑定名为"input"的消息通道

代码语言:javascript
复制
public interface Sender {
    //消息通道名称
    String OUTPUT = "input";

    @Output(OUTPUT)
    MessageChannel output();
}

3.定义Stream发送类Controller

代码语言:javascript
复制
@RestController
@EnableBinding(Sender.class)
@Log4j2
public class SendController {
    @Autowired
    @Qualifier(Sender.OUTPUT)
    MessageChannel output;

    @GetMapping("send")
    public void send(){
        String message = "Hello! I am Stream Message!";
        log.info("发送Stream消息: {}",message);
        output.send(MessageBuilder.withPayload(message).build());
    }
}

以上代码就完成了最基本的消费者部分。

启动后,调用/send接口,可看到收发消息成功的日志

消息分组

当消费者集群部署时,它们当中应当只有一个能接受到消息。但按照现在的配置,每个消费者都能收到消息,我们来看看。

1.启动两个micro-service1,设置不同接口 2.调用/send接口,两个应用均能收到消息

显然这是不合理,这里就需要用到消息分组

3.在micro-service1应用中添加Stream分组配置

代码语言:javascript
复制
cloud:
    stream:
      bindings:
        #为input消息通道添加分组
        input:
          group: testGroup

4.启动两个micro-service1,调用/send接口。现在,发送一条信息,只能在其中一个应用中接收到消息,两个应用轮训接收。

Spring Cloud Stream的简单使用讲解就到这里了,下期再见啦~

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-03-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java从心 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 基本使用
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档