专栏首页程序猿的大杂烩快速入门RabbitMQ核心概念

快速入门RabbitMQ核心概念

哪些互联网大厂在使用RabbitMQ,为什么?

初识RabbitMQ:

  • RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。

哪些互联网大厂在使用RabbitMQ:

  • 滴滴、美团、头条、去哪儿、艺龙

为什么使用RabbitMQ:

  • 开源、性能优秀,稳定性保障
  • 提供可靠性消息投递模式(confirm) 、返回模式(return)
  • 与SpringAMQP完美的整合、API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提做到高可靠性、可用性

RabbitMQ高性能的原因

  • 主要原因是因为RabbitMQ使用Erlang语言编写,Erlang语言最初在于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的
  • Erlang的优点:Erlang有着和原生Socket一样的延迟

AMQP高级消息队列协议与模型

什么是AMQP高级消息队列协议:

  • AMQP全称是:Advanced Message Queuing Protocol,所以AMQP翻译过来就是:高级消息队列协议。AMQP定义:是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

AMQP协议模型:


AMQP核心概念

  • Server:又称Broker, 接受客户端的连接,实现AMQP实体服务
  • Connection:连接,应用程序与Broker的网络连接
  • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
  • Message:消息,服务器和应用程序之间传送的数据,由PropertiesBody组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容
  • Virtual host:虚拟主机,用于进行逻辑隔离,就有点类似于NameSpace或Group的概念,是最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
  • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
  • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
  • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

RabbitMQ整体架构与消息流转

RabbitMQ整体架构图:

RabbitMQ消息流转图:


RabbitMQ环境安装

官方下载地址:

我们知道RabbitMQ是基于Erlang编写的,所以在安装RabbitMQ之前需要确保安装了Erlang环境。RabbitMQ与Erlang是有版本对应关系的,可以参考官方列举的版本对应关系:

例如,我这里要安装3.8.9版本的RabbitMQ,那么按官方的说明,我需要安装 22.3 ~ 23.x 版本的Erlang环境,我这里选择23.1.3版本的Erlang。使用如下命令下载RPM安装包:

[root@rabbitmq01 ~]# cd /usr/local/src
[root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.1.3/erlang-23.1.3-1.el7.x86_64.rpm
[root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@rabbitmq01 /usr/local/src]# ls
erlang-23.1.3-1.el7.x86_64.rpm  rabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@rabbitmq01 /usr/local/src]# 

使用yum命令进行安装,因为yum可自动解决依赖关系:

[root@rabbitmq01 /usr/local/src]# yum install -y erlang-23.1.3-1.el7.x86_64.rpm
[root@rabbitmq01 /usr/local/src]# yum install -y rabbitmq-server-3.8.9-1.el7.noarch.rpm

RabbitMQ新版本没有提供配置文件的示例,需要自己去Github上下载:

将配置文件放到/etc/rabbitmq目录下:

[root@rabbitmq01 /usr/local/src]# mv rabbitmq.conf.example /etc/rabbitmq/rabbitmq.conf

修改配置文件:

[root@rabbitmq01 ~]# vim /etc/rabbitmq/rabbitmq.conf
# 允许默认用户被外部网络访问
loopback_users.guest = false

完成配置后,启动RabbitMQ Server:

[root@rabbitmq01 ~]# rabbitmq-server start &

检查端口是否正常监听,5672是RabbitMQ的默认端口号:

[root@rabbitmq01 ~]# netstat -lntp |grep 5672
tcp        0      0 0.0.0.0:25672           0.0.0.0:*           LISTEN      1922/beam.smp       
tcp6       0      0 :::5672                 :::*                LISTEN      1922/beam.smp       
[root@rabbitmq01 ~]# 

启用RabbitMQ的管控台插件,我们可以在管控台中查看RabbitMQ的基础监控信息,以及对RabbitMQ进行管理:

[root@rabbitmq01 ~]# rabbitmq-plugins enable rabbitmq_management

使用浏览器访问管控台的15672端口,进入到登录界面,默认用户名密码均为guest

登录成功,进入到管控台首页:


rabbitmqctl命令行操作

rabbitmqctl基础操作命令:

# 关闭应用
rabbitmqctl stop_app

# 启动应用
rabbitmqctl start_app

# 节点状态
rabbitmqctl status

# 添加用户
rabbitmqctl add user username password

# 列出所有用户
rabbitmqctl list users

# 删除用户
rabbitmqctl delete_user username

# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username

# 列出用户权限
rabbitmqctl list_user_permissions username

# 修改密码
rabbitmqctl change_password username newpassword

# 设置用户权限
rabbitmqctl set permissions -p vhostpath username ".*" ".*" ".*"

# 创建虚拟主机
rabbitmqctl add vhost vhostpath

# 列出所有虚拟主机
rabbitmqctl list_vhosts

# 列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath

# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath

# 查看所有队列信息
rabbitmqctl list_queues

# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue blue

rabbitmqctl高级操作命令:

# 移除所有数据,要在rabbitmqctIl stop_app之后使用
rabbitmqctl reset

# 组成集群命令
rabbitmqctl join_cluster <clusternode> [--ram]

# 查看集群状态
rabbitmqctl cluster_status

# 修改集群节点的存储形式
rabbitmqctl change_cluster_node_type disc | ram

# 忘记节点(摘除节点)
rabbitmqctl forget cluster_node [--offline]

# 修改节点名称
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2..] 

生产者消费者代码示例

创建一个Maven工程,在pom文件中添加如下依赖:

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

生产者代码示例:

package com.zj.rabbitmq.learn.basic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

/**
 * 生产者
 *
 * @author 01
 * @date 2020-11-23
 **/
public class MyProducer {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            for (int i = 0; i < 5; i++) {
                // 通过Channel发送数据
                String msg = "Hello RabbitMQ!";
                // 不设置Exchange默认走default direct exchange,此时routingKey就是队列名称
                channel.basicPublish("", "test001", null, msg.getBytes());
            }
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.basic;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

/**
 * 消费者
 *
 * @author 01
 * @date 2020-11-23
 **/
public class MyConsumer {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare("test001", true, false, false, null);
            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true){
                channel.basicConsume("test001", true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

先运行消费者,再运行生产者,此时消费者控制台输出如下:


关于交换机

Exchange(交换机)用于接收消息,并根据路由键转发消息所绑定的队列:

交换机属性:

  • Name:交换机名称
  • Type:交换机类型direct、topic、 fanout、 headers
  • Durability:是否需要持久化,true为持久化
  • Auto Delete:当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False
  • Arguments:扩展参数,用于扩展AMQP协议自制定化使用

Direct Exchange

  • 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
  • 注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作。消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃

生产者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

public class ProducerOfDirectExchange {
    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            // 通过Channel发送数据
            String msg = "Hello RabbitMQ of Direct Exchange!";
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

public class ConsumerOfDirectExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {

            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test.direct";

            // 声明一个direct类型的Exchange
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare(queueName, true, false, false, null);
            // 将队列绑定到指定的Exchange上
            channel.queueBind(queueName, exchangeName, routingKey);

            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true) {
                channel.basicConsume(queueName, true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

Topic Exchange

  • 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic
  • 可以使用通配符进行模糊匹配:
    • 符号 "#" 匹配一个或多个词
    • 符号 "*" 匹配不多不少一个词
    • 例如:
      • "log.#" 能够匹配到 "log.info.oa"
      • "log.*" 只会匹配到 "log.error"

生产者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

public class ProducerOfTopicExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            // 通过Channel发送数据
            String msg = "Hello RabbitMQ of Topic Exchange!";
            channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
            channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
            channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

public class ConsumerOfTopicExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {

            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
            //String routingKey = "user.*";
            String routingKey = "user.#";

            // 声明一个topic类型的Exchange
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare(queueName, true, false, false, null);
            // 将队列绑定到指定的Exchange上
            channel.queueBind(queueName, exchangeName, routingKey);

            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true) {
                channel.basicConsume(queueName, true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

Fanout Exchange

  • 不处理路由键,只需要简单的将队列绑定到交换机上
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
  • Fanout交换机转发消息是最快的

生产者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

public class ProducerOfFanoutExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        String exchangeName = "test_fanout_exchange";

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            for (int i = 0; i < 10; i++) {
                // 通过Channel发送数据
                String msg = "Hello RabbitMQ of Fanout Exchange!";
                channel.basicPublish(exchangeName, "", null, msg.getBytes());
            }
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

public class ConsumerOfFanoutExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {

            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            // 不设置routingKey
            String routingKey = "";

            // 声明一个fanout类型的Exchange
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare(queueName, true, false, false, null);
            // 将队列绑定到指定的Exchange上
            channel.queueBind(queueName, exchangeName, routingKey);

            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true) {
                channel.basicConsume(queueName, true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

绑定、队列、消息、虚拟主机

Binding - 绑定:

  • Exchange和Exchange、Queue之 间的连接关系
  • Binding中可以包含RoutingKey或者参数

Queue - 消息队列:

  • 消息队列,实际存储消息数据
  • Durability:是否持久化。Durable:是,Transient:否
  • Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除

Message - 消息:

  • 服务器和应用程序之间传送的数据
  • 本质上就是一段数据,由Properties和Payload(Body)组成
  • 常用属性:delivery mode、headers(自定义属性)
  • Message其他属性:
    • content_type、content_encoding、priority
    • correlation id、reply_to、expiration、message_id
    • timestamp、type、 user_id、app_id、 cluster_id

设置Message属性代码示例:

package com.zj.rabbitmq.learn.message;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

import java.util.HashMap;
import java.util.Map;

class MyProducer {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Map<String, Object> headers = new HashMap<>();
        headers.put("a", "1");
        headers.put("b", "2");
        // 自定义Message的一些属性
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                // 持久化模式
                .deliveryMode(2)
                // 消息的编码格式
                .contentEncoding("UTF-8")
                // 消息过期时间
                .expiration("15000")
                // 设置消息的头
                .headers(headers)
                .build();

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            for (int i = 0; i < 5; i++) {
                // 通过Channel发送数据
                String msg = "Hello RabbitMQ!";
                // 不设置Exchange默认走direct exchange,此时routingKey就是队列名称
                channel.basicPublish("", "test001", properties, msg.getBytes());
            }
        }
    }
}

Virtual host - 虚拟主机:

  • 虚拟地址,用于进行逻辑隔离,最上层的消息路由
  • 一个Virtual Host里面可以有若干个Exchange和Queue
  • 同一个Virtual Host里面不能有相同名称的Exchange或Queue

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 关于乱码问题的解决与HttpServletResponse中的方法

    会有乱码现象,其实就是因为字符集编码不一致的问题,就好像中国人和外国人谈话一样,互相不懂对方在说啥。字符集编码也是如此,本来就是一段GBK编码的文字,却要用ut...

    端碗吹水
  • 安装jumpserver

    Jumpserver是一款使用Python, Django开发的开源跳板机系统,,基于ssh协议来管理,客户端无需安装agent,助力互联网企业高效 用户、资产...

    端碗吹水
  • Kafka核心API——Stream API

    Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之...

    端碗吹水
  • RabbitMQ使用代码示例

    RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。

    用户1220053
  • RabbitMQ In JAVA 介绍及使用

      RabbitMQ是开源的消息中间件,它是轻量级的,支持多种消息传递协议,可以部署在分布式和联合配置中,以满足高级别、高可用性需求。并且可在许多操作系统和云环...

  • 权限认证框架之Apache Shiro1介绍

    JavaEdge
  • Kafka Producer 异步发送消息居然也会阻塞?

    Kafka 一直以来都以高吞吐量的特性而家喻户晓,就在上周,在一个性能监控项目中,需要使用到 Kafka 传输海量消息,在这过程中遇到了一个 Kafka Pro...

    张乘辉
  • 新手可以学Python吗?应该怎么入门?【七天包学包会】

    很多人都推荐小白第一门语言选Python,因为语法简单。这句话只说了一半,Python确实容易上手,对初学者的门槛很低。但我发现,对于小白真正的门槛在于系统知识...

    原来是泽镜啊
  • linux环境下R语言的安装运行以及程序包的下载

    R是一种多功能型统计绘图软件,可以方便的编写函数、建立模型,具有良好的扩展性。目前在R网站上约有2400个程序包,涵盖了基础统计学、社会学、经济学、生态学、空间...

    HUBU生信
  • Golang语言--将byte的int转换

    在使用golang做数据传输的时候,会经常遇到byte与int的互转,但golang并没有现成的方法,因此只能通过binary包来解决 所以,需要 :impor...

    李海彬

扫码关注云+社区

领取腾讯云代金券