前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【RabbitMq 篇二】-RabbitMq 发送与消费

【RabbitMq 篇二】-RabbitMq 发送与消费

作者头像
胖虎
发布2019-06-26 17:16:30
1.2K0
发布2019-06-26 17:16:30
举报
文章被收录于专栏:晏霖晏霖

前言

本文介绍RabbitMq各个消息类型,以及用使用Fanout 类型进行消息的发送和消费,让大家对RabbitMq有一个简单的认识。

正文

使用所有框架和中间件的版本

框架

版本

Spring Boot

2.1.5.RELEASE

RabbitMq

3.7.15

JDK

1.8.0_144

Erlang

22.0.2

Exchange类型

RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。

Direct Exchange

处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。

如图A、B、C都绑定在同一个交换器上

Topic exchange

此类型exchange和direct类型差不多,但direct类型要求routingkey完全相等,有局限性,但是在Topic这里的routingkey可以有通配符:'*','#'. ,相比Direct类型来说就放宽了命中的队列,对Direct 类型进行了扩展。 其中'*'表示仅匹配一个单词, '#'则表示匹配零个或一个和或者多个单词。例如 “abc.#” 能匹配“abc.def.ghi” 和 “abc.def” ,但是 “abc.*” 只能匹配到 “abc.def”

如图 A、B、C都绑定在同一个交换器上

Fanout exchange

广播类型没有路由key的概念,因为它的原理是把生产者要发送到队列里的数据给存在当前信道里的每一个队列都发一份,一模一样的复制到每个队列,也就是说,只要是当前交换器绑定的队列就都可以收到消息。

如图A、B、C都绑定在同一个交换器上

Headers exchange

Headers方式是在绑定对列的时候将匹配的条件以字典型数据当参数传入,然后在生产的时候再将要匹配的条件以字典型数据当参数传入来进行匹配。此类型基本不会有用到,不做讲解。

以上就是exchange 类型的总结,一般来说direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。

Fanout Demo

读前须知: 一般的,我们使用分布式或微服务的形式开发,所有的服务都会独立出来,为了让各个服务降低耦合性,我们一般会把Model单独拉出来,让各个服务依赖他,如果加了中间件服务的话,也会让我们中间件服务依赖 model,达到与业务服务解耦。

所以本文案例创建了三个工程,分别是一个商品服务、消费者、模型

既然Spring已经为我们封装好了API 那我们就用他们封装的,省的自己还要创建连接和信道,代码好多,好烦,如果有个性化设置需要自己设置参数的,以后的章节我们会介绍。

Maven依赖

代码语言:javascript
复制
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

RabbitMq配置

代码语言:javascript
复制
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=yanlin
spring.rabbitmq.password=yanlin

生产者

代码语言:javascript
复制
@RestController
public class GoodsController {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @GetMapping("/test/{id}")
    public String goodsTest(@PathVariable Integer id) {
        Order order = new Order(id, "xx");
        amqpTemplate.convertAndSend("GOODS", "", order);
        System.out.println(order);
        return "成功";
    }

}

消费者监听

代码语言:javascript
复制
@Configuration
public class GoodsChangeListener {

    private static final String queue = "goods-change";


    @Bean
    public Queue goodsChangeQueue() {
        return new Queue(queue);
    }


    @Bean
    FanoutExchange goodsChangeExchange() {
        return new FanoutExchange("GOODS");// 配置广播路由器
    }

    //交换器绑定队列
    @Bean
    Binding bindingExchangeGoodsChange(Queue goodsChangeQueue, FanoutExchange goodsChangeExchange) {
        return BindingBuilder.bind(goodsChangeQueue).to(goodsChangeExchange);
    }


}
代码语言:javascript
复制

消费者

代码语言:javascript
复制
@Service
public class GoodsConsumer  {
    @RabbitListener(queues = "goods-change")
    @RabbitHandler
    public void goodsChange( Order order) {
        System.out.println("消费消息,商品名:" + order.getName());
    }

}

需要注意的是,大家在发送消息体的时候如果传递的是对象,那么需要注意你必须要实现序列化接口生成序列化ID,而且对象需要时同一个,类名一样不叫同一个对象,因为内存地址是不同的,所以前面说到要把model单独拉出来。否则会报错,笔者亲身测试。

正确输出内容:

这样,一个简单的生产和消费就完成了。

项目如图

github地址 https://github.com/362460453/rabbitMQ-demo

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 晏霖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 正文
  • Exchange类型
    • Direct Exchange
      • Topic exchange
        • Fanout exchange
          • Headers exchange
          • Fanout Demo
            • Maven依赖
              • RabbitMq配置
                • 生产者
                  • 消费者监听
                    • 消费者
                    相关产品与服务
                    文件存储
                    文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档