前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringCloud-Stream整合RabbitMQ

SpringCloud-Stream整合RabbitMQ

作者头像
Java学习录
发布2019-04-18 14:39:32
2.9K0
发布2019-04-18 14:39:32
举报
文章被收录于专栏:Java学习录Java学习录

我们知道,当微服务越来越来多的时候,仅仅是feign的http调用方式已经满足不了我们的使用场景了。这个时候系统就需要接入消息中间件了。相比较于传统的Spring项目、SpringBoot项目使用消息中间件的很多配置不同,SpringCloud Stream抽象了中间件产品的不同,在SpringCloud中你仅仅需要修改几行配置文件就可以灵活的切换中间件产品而不需要修改任何代码。

现在我们以SpringCloud Stream整合RabbitMQ为例来学习一下

创建生产者

1. 引入依赖
代码语言:javascript
复制
<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
2. 定义配置文件
代码语言:javascript
复制
spring:
  cloud:
    stream:
      binders:
        test:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: 10.0.20.132
                port: 5672
                username: root
                password: root
                virtual-host: /unicode-pay
      bindings:
        testOutPut:
          destination: testRabbit
          content-type: application/json
          default-binder: test

现在来解释一下这些配置的含义

  1. binders: 这是一组binder的集合,这里配置了一个名为test的binder,这个binder中是包含了一个rabbit的连接信息
  2. bindings:这是一组binding的集合,这里配置了一个名为testOutPut的binding,这个binding中配置了指向名test的binder下的一个交换机testRabbit。
  3. 扩展: 如果我们项目中不仅集成了rabbit还集成了kafka那么就可以新增一个类型为kafka的binder、如果项目中会使用多个交换机那么就使用多个binding,
3.创建通道
代码语言:javascript
复制
public interface  MqMessageSource {
    String TEST_OUT_PUT = "testOutPut";
    @Output(TEST_OUT_PUT)
    MessageChannel testOutPut();
}

这个通道的名字就是上方binding的名字

4. 发送消息
代码语言:javascript
复制
@EnableBinding(MqMessageSource.class)
public class MqMessageProducer {
    @Autowired
    @Output(MqMessageSource.TEST_OUT_PUT)
    private MessageChannel channel;
    public void sendMsg(String msg) {
        channel.send(MessageBuilder.withPayload(msg).build());
        System.err.println("消息发送成功:"+msg);
    }
}

这里就是使用上方的通道来发送到指定的交换机了。需要注意的是withPayload方法你可以传入任何类型的对象,但是需要实现序列化接口

5. 创建测试接口

EnableBinding注解绑定的类默认是被Spring管理的,我们可以在controller中注入它

代码语言:javascript
复制
@Autowired
private MqMessageProducer mqMessageProducer;
@GetMapping(value = "/testMq")
public String testMq(@RequestParam("msg")String msg){
    mqMessageProducer.sendMsg(msg);
    return "发送成功";
}

生产者的代码到此已经完成了。

创建消费者

1. 引入依赖
代码语言:javascript
复制
<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
2. 定义配置文件
代码语言:javascript
复制
spring:
  cloud:
    stream:
      binders:
        test:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: 10.0.20.132
                port: 5672
                username: root
                password: root
                virtual-host: /unicode-pay
      bindings:
        testInPut:
          destination: testRabbit
          content-type: application/json
          default-binder: test

这里与生产者唯一不同的地方就是testIntPut了,相信你已经明白了,它是binding的名字,也是通道与交换机绑定的关键

3.创建通道
代码语言:javascript
复制
public interface  MqMessageSource {
    String TEST_IN_PUT = "testInPut";
    @Input(TEST_IN_PUT)
    SubscribableChannel testInPut();
}
4. 接受消息
代码语言:javascript
复制
@EnableBinding(MqMessageSource.class)
public class MqMessageConsumer {
    @StreamListener(MqMessageSource.TEST_IN_PUT)
    public void messageInPut(Message<String> message) {
        System.err.println(" 消息接收成功:" + message.getPayload());
    }
}

这个时候启动Eureka、消息生产者和消费者,然后调用生产者的接口应该就可以接受到来自mq的消息了。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-03-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java学习录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 创建生产者
    • 1. 引入依赖
      • 2. 定义配置文件
        • 3.创建通道
          • 4. 发送消息
            • 5. 创建测试接口
            • 创建消费者
              • 1. 引入依赖
                • 2. 定义配置文件
                  • 3.创建通道
                    • 4. 接受消息
                    相关产品与服务
                    消息队列 TDMQ
                    消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档