前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【RabbitMQ】重识

【RabbitMQ】重识

原创
作者头像
后端码匠
发布2023-11-12 11:06:01
2210
发布2023-11-12 11:06:01
举报
文章被收录于专栏:后端码匠

简介

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

协议

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

相关概念

通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。

中间即是 RabbitMQ,其中包括了 交换机 和 队列。

右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。

交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

工作模式

RabbitMQ提供了6种工作模式,6中模式如下:

  1. Simple模式:这是最简单的模式,只有一个生产者和一个消费者;
  2. Work模式:在这种模式下,每个消费者都有一个独立的线程,可以处理多个消息;
  3. Publish/Subscribe模式:在这种模式下,生产者将消息发布到交换机上,消费者从交换机上订阅消息;
  4. Routing模式:在这种模式下,消息根据路由键被分发到不同的队列中;
  5. Topic模式:在这种模式下,生产者和消费者都不需要关心消息的具体类型,只需要发送或接收消息即可;
  6. RPC模式:在这种模式下,生产者和消费者之间通过远程过程调用进行通信。
代码语言:html
复制
<!-- 引入 rabbitmq 的相关依赖 -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>

Simple简单模式

生产者发送消息到队列,有一个或多个消费者,当多个消费者同时监听一个队列时,他们并不能同时消费一条消息,而是随机消费消息,即一个队列中一条消息,只能被一个消费者消费。

代码如下所示,无论是生产者还是消费者都需要先调用getChannel()方法获取连接,获取连接的步骤是固定的:

  1. 首先输入地址、端口号等信息创建连接工厂。
  2. 通过工厂建立连接,获取到连接对象建立通信通道。

首先启动消费者时刻监听hello队列,一旦有消息传来立即消费,并打印消息到控制台,这里的消费者可以创建多个。然后运行producer()生产者方法,向队列发送信息。

代码

代码语言:java
复制
package cn.com.codingce.test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

@Slf4j
public class CodingceHello {

    // 队列名称
    private final static String QUEUE_NAME = "codingceHelloWorld";

    public static void main(String[] args) throws Exception {
        // 创建生产者
        Channel channel = getChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        // 发送消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
        ///////////////////////////////////////////////////////////
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("CodingceHello Consumer customer1收到消息:{}", message);
            }
        };
        // 创建消费者
        Consumer consumer2 = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("CodingceHello Consumer customer2收到消息:{}", message);
            }
        };
        // 监听队列并消费消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
        channel.basicConsume(QUEUE_NAME, true, consumer2);
    }

    /**
     * 获取RabbitMQ连接对象
     */
    public static Channel getChannel() throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建通道
        return connection.createChannel();
    }
}

Log

代码语言:shell
复制
10:54:23.526 [pool-1-thread-4] INFO cn.com.codingce.test.CodingceHello - CodingceHello Consumer customer1收到消息:Hello World!

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  • QUEUE_NAME:队列的名称,是一个字符串类型的参数;
  • 第2个参数表示这个队列是否持久化,false表示不持久化,也就是一旦服务器重启,队列就会被删除;
  • 第3个参数表示这个队列是否独占,即是否只允许一个消费者同时消费这个队列中的消息。false表示不独占,也就是多个消费者可以同时消费这个队列中的消息;
  • 第4个参数表示这个队列是否自动删除,即当最后一个消费者取消订阅后,是否自动删除这个队列。false表示不自动删除,也就是需要手动删除这个队列;
  • null:表示队列的属性,这里传入null表示使用默认属性。

常见的交换机类型

  • direct直连交换机:最简单的交换机类型,它将消息直接路由到与消息中的路由键完全匹配的队列。当路由键与绑定时指定的路由键完全匹配时,消息将被投递到对应的队列。
  • fanout扇形交换机:扇形交换机将消息广播到绑定到该交换机的所有队列,它忽略消息的路由键,只需将消息发送到所有绑定的队列。
  • topic主题交换机:主题交换机根据消息的路由键和绑定的模式进行匹配,将消息路由到一个或多个队列。模式可以使用通配符进行匹配,例如*代表一个单词,#代表零个或多个单词。
  • headers头交换机:头交换机根据消息的头部属性进行匹配和路由。消息中的头部属性与绑定时指定的头部属性进行匹配,如果匹配成功,则消息被路由到对应的队列。在绑定消息队列与交换机之前声明一个map键值对,通过这个map对象实现消息队列和交换机的绑定。当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列,有两种匹配规则:
    • x-match = all :表示所有的键值对都匹配才能接受到消息
    • x-match = any :表示只要有键值对匹配就能接受到消息
  • 总结:fanout只和交换机有关,只要消费者绑定在此交换机就会收到消息;而Direct的消费者只有当交换机和路由键同时匹配才能收到消息;Topic提供更为强大的通配符来表示路由,类似MySQL的like模糊查询功能;Headers与路由键无关,匹配消息头中的属性信息,用的较少。
  • 发布订阅模式一般使用fanout,有多个消费者消费,也就是发布消息后,所有订阅此交换机的都会收到消息进行消费。
  • Routing模式一般使用direct,绑定交换机后再通过路由Key确定消费队列。

发布/订阅模式

生产者,一个交换机(fanoutExchange),没有路由规则,多个队列,多个消费者。生产者将消息不是直接发送到队列,而是发送到X交换机,然后由交换机发送给两个队列,两个消费者各自监听一个队列,来消费消息。

在发布订阅模式下可以实现一个生产者发送的消息,可以被多个消费者多次消费,之前的消息只能消费一次。来看下面代码,生产者加了交换机名称和路由Key,在本案例中,路由Key等于没用,因为交换机类型设置为fanout,后文有说明。

而消费者创建了两个q1和q2队列,绑定到my_exchange队列上进行消费,当发送消息时,两个队列的消费者会同时接收到消息。如果q1有多个消费者,那么只会有一个q1的消费者接收到消息。

代码

代码语言:java
复制
package cn.com.codingce.test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

@Slf4j
public class CodingcePubSub {
    
    // 交换机名称
    private static final String EXCHANGE_NAME = "codingce_exchange";

    // 路由Key
    private static final String ROUTING_KEY = "codingce_key";

    public static void main(String[] args) throws Exception {
        // 创建生产者
        Channel channel = CodingceHello.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String message = "Hello World EXCHANGE_NAME ROUTING_KEY !";
        // 比之前多了一个交换机名称,发送6条消息方便测试
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
        log.info("CodingcePubSub basicPublish done");
        ////////////////////////////////////////////////
        // 声明一个交换机,并设置其类型为"fanout"
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 声明一个队列并绑定交换机和路由Key
        String queueName = channel.queueDeclare("q1", false, false, false, null).getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
        String queueName2 = channel.queueDeclare("q2", false, false, false, null).getQueue();
        channel.queueBind(queueName2, EXCHANGE_NAME, ROUTING_KEY);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("CodingcePubSub Consumer handleDelivery q1 customer收到消息:{}", message);
            }
        };
        Consumer consumer1 = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("CodingcePubSub Consumer handleDelivery q11 1customer收到消息:{}", message);
            }
        };
        // q1队列有两个消费者,但每次只会有一个q1的消费者收到消息
        channel.basicConsume("q1", true, consumer);
        channel.basicConsume("q1", true, consumer1);

        Consumer consumer2 = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("CodingcePubSub Consumer handleDelivery q2 customer收到消息:{}", message);
            }
        };
        // q2只绑定一个消费者,所以这个消费者100%会收到消息。
        channel.basicConsume("q2", true, consumer2);
    }

}

Log

代码语言:shell
复制
10:51:56.868 [main] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub basicPublish done
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-6] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-6] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-6] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-6] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-7] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-7] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !

Routing模式

生产者分别向ROUTING_KEY1和ROUTING_KEY2发送一条消息,两个Customer都只监听ROUTING_KEY1,控制台只打印ROUTING_KEY1的两条消息。

代码语言:java
复制
package cn.com.codingce.test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

@Slf4j
public class CodingceRouting {
    // 交换机名称
    private static final String EXCHANGE_NAME = "codingce_exchange_routing";
    // 路由Key
    private static final String ROUTING_KEY1 = "routing1";
    private static final String ROUTING_KEY2 = "routing2";

    public static void main(String[] args) throws Exception {
        // 创建生产者
        Channel channel = CodingceHello.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String message = "ROUTING_KEY1 我是1的消息";
        String message2 = "ROUTING_KEY2 我是2的消息";
        // 比之前多了一个交换机名称
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY1, null, message.getBytes());
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY2, null, message2.getBytes());
        log.info("CodingceRouting basicPublish done");
        /////////////////////////////////////////////////////////////////
        // 声明一个交换机,并设置其类型为"fanout"
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 声明一个队列并绑定交换机和路由Key
        String queueName = channel.queueDeclare("r1", false, false, false, null).getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY1);
        String queueName2 = channel.queueDeclare("r2", false, false, false, null).getQueue();
        channel.queueBind(queueName2, EXCHANGE_NAME, ROUTING_KEY1);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("CodingceRouting r1 customer收到消息:{}", message);
            }
        };
        channel.basicConsume("r1", true, consumer);

        Consumer consumer2 = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("CodingceRouting r2 customer收到消息:{}", message);
            }
        };
        channel.basicConsume("r2", true, consumer2);
    }

}

Log

代码语言:shell
复制
11:01:05.262 [main] INFO cn.com.codingce.test.CodingceRouting - CodingceRouting basicPublish done
11:01:05.270 [pool-1-thread-4] INFO cn.com.codingce.test.CodingceRouting - CodingceRouting r1 customer收到消息:ROUTING_KEY1 我是1的消息
11:01:05.271 [pool-1-thread-4] INFO cn.com.codingce.test.CodingceRouting - CodingceRouting r2 customer收到消息:ROUTING_KEY1 我是1的消息

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 协议
  • 相关概念
  • 工作模式
  • Simple简单模式
  • 常见的交换机类型
  • 发布/订阅模式
  • Routing模式
相关产品与服务
轻量应用服务器
轻量应用服务器(TencentCloud Lighthouse)是新一代开箱即用、面向轻量应用场景的云服务器产品,助力中小企业和开发者便捷高效的在云端构建网站、Web应用、小程序/小游戏、游戏服、电商应用、云盘/图床和开发测试环境,相比普通云服务器更加简单易用且更贴近应用,以套餐形式整体售卖云资源并提供高带宽流量包,将热门软件打包实现一键构建应用,提供极简上云体验。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档