前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >快速入门RabbitMQ核心概念

快速入门RabbitMQ核心概念

作者头像
端碗吹水
发布2020-11-24 10:57:55
4470
发布2020-11-24 10:57:55
举报

哪些互联网大厂在使用RabbitMQ,为什么?

初识RabbitMQ:

  • RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。

哪些互联网大厂在使用RabbitMQ:

  • 滴滴、美团、头条、去哪儿、艺龙

为什么使用RabbitMQ:

  • 开源、性能优秀,稳定性保障
  • 提供可靠性消息投递模式(confirm) 、返回模式(return)
  • 与SpringAMQP完美的整合、API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提做到高可靠性、可用性

RabbitMQ高性能的原因

  • 主要原因是因为RabbitMQ使用Erlang语言编写,Erlang语言最初在于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的
  • Erlang的优点:Erlang有着和原生Socket一样的延迟

AMQP高级消息队列协议与模型

什么是AMQP高级消息队列协议:

  • AMQP全称是:Advanced Message Queuing Protocol,所以AMQP翻译过来就是:高级消息队列协议。AMQP定义:是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

AMQP协议模型:

快速入门RabbitMQ核心概念
快速入门RabbitMQ核心概念

AMQP核心概念

  • Server:又称Broker, 接受客户端的连接,实现AMQP实体服务
  • Connection:连接,应用程序与Broker的网络连接
  • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
  • Message:消息,服务器和应用程序之间传送的数据,由PropertiesBody组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容
  • Virtual host:虚拟主机,用于进行逻辑隔离,就有点类似于NameSpace或Group的概念,是最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
  • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
  • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
  • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

RabbitMQ整体架构与消息流转

RabbitMQ整体架构图:

快速入门RabbitMQ核心概念
快速入门RabbitMQ核心概念

RabbitMQ消息流转图:

快速入门RabbitMQ核心概念
快速入门RabbitMQ核心概念

RabbitMQ环境安装

官方下载地址:

我们知道RabbitMQ是基于Erlang编写的,所以在安装RabbitMQ之前需要确保安装了Erlang环境。RabbitMQ与Erlang是有版本对应关系的,可以参考官方列举的版本对应关系:

例如,我这里要安装3.8.9版本的RabbitMQ,那么按官方的说明,我需要安装 22.3 ~ 23.x 版本的Erlang环境,我这里选择23.1.3版本的Erlang。使用如下命令下载RPM安装包:

[root@rabbitmq01 ~]# cd /usr/local/src
[root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.1.3/erlang-23.1.3-1.el7.x86_64.rpm
[root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@rabbitmq01 /usr/local/src]# ls
erlang-23.1.3-1.el7.x86_64.rpm  rabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@rabbitmq01 /usr/local/src]# 

使用yum命令进行安装,因为yum可自动解决依赖关系:

[root@rabbitmq01 /usr/local/src]# yum install -y erlang-23.1.3-1.el7.x86_64.rpm
[root@rabbitmq01 /usr/local/src]# yum install -y rabbitmq-server-3.8.9-1.el7.noarch.rpm

RabbitMQ新版本没有提供配置文件的示例,需要自己去Github上下载:

将配置文件放到/etc/rabbitmq目录下:

[root@rabbitmq01 /usr/local/src]# mv rabbitmq.conf.example /etc/rabbitmq/rabbitmq.conf

修改配置文件:

[root@rabbitmq01 ~]# vim /etc/rabbitmq/rabbitmq.conf
# 允许默认用户被外部网络访问
loopback_users.guest = false

完成配置后,启动RabbitMQ Server:

[root@rabbitmq01 ~]# rabbitmq-server start &

检查端口是否正常监听,5672是RabbitMQ的默认端口号:

[root@rabbitmq01 ~]# netstat -lntp |grep 5672
tcp        0      0 0.0.0.0:25672           0.0.0.0:*           LISTEN      1922/beam.smp       
tcp6       0      0 :::5672                 :::*                LISTEN      1922/beam.smp       
[root@rabbitmq01 ~]# 

启用RabbitMQ的管控台插件,我们可以在管控台中查看RabbitMQ的基础监控信息,以及对RabbitMQ进行管理:

[root@rabbitmq01 ~]# rabbitmq-plugins enable rabbitmq_management

使用浏览器访问管控台的15672端口,进入到登录界面,默认用户名密码均为guest

快速入门RabbitMQ核心概念
快速入门RabbitMQ核心概念

登录成功,进入到管控台首页:

快速入门RabbitMQ核心概念
快速入门RabbitMQ核心概念

rabbitmqctl命令行操作

rabbitmqctl基础操作命令:

# 关闭应用
rabbitmqctl stop_app

# 启动应用
rabbitmqctl start_app

# 节点状态
rabbitmqctl status

# 添加用户
rabbitmqctl add user username password

# 列出所有用户
rabbitmqctl list users

# 删除用户
rabbitmqctl delete_user username

# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username

# 列出用户权限
rabbitmqctl list_user_permissions username

# 修改密码
rabbitmqctl change_password username newpassword

# 设置用户权限
rabbitmqctl set permissions -p vhostpath username ".*" ".*" ".*"

# 创建虚拟主机
rabbitmqctl add vhost vhostpath

# 列出所有虚拟主机
rabbitmqctl list_vhosts

# 列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath

# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath

# 查看所有队列信息
rabbitmqctl list_queues

# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue blue

rabbitmqctl高级操作命令:

# 移除所有数据,要在rabbitmqctIl stop_app之后使用
rabbitmqctl reset

# 组成集群命令
rabbitmqctl join_cluster <clusternode> [--ram]

# 查看集群状态
rabbitmqctl cluster_status

# 修改集群节点的存储形式
rabbitmqctl change_cluster_node_type disc | ram

# 忘记节点(摘除节点)
rabbitmqctl forget cluster_node [--offline]

# 修改节点名称
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2..] 

生产者消费者代码示例

创建一个Maven工程,在pom文件中添加如下依赖:

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

生产者代码示例:

package com.zj.rabbitmq.learn.basic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

/**
 * 生产者
 *
 * @author 01
 * @date 2020-11-23
 **/
public class MyProducer {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            for (int i = 0; i < 5; i++) {
                // 通过Channel发送数据
                String msg = "Hello RabbitMQ!";
                // 不设置Exchange默认走default direct exchange,此时routingKey就是队列名称
                channel.basicPublish("", "test001", null, msg.getBytes());
            }
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.basic;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

/**
 * 消费者
 *
 * @author 01
 * @date 2020-11-23
 **/
public class MyConsumer {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare("test001", true, false, false, null);
            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true){
                channel.basicConsume("test001", true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

先运行消费者,再运行生产者,此时消费者控制台输出如下:

快速入门RabbitMQ核心概念
快速入门RabbitMQ核心概念

关于交换机

Exchange(交换机)用于接收消息,并根据路由键转发消息所绑定的队列:

快速入门RabbitMQ核心概念
快速入门RabbitMQ核心概念

交换机属性:

  • Name:交换机名称
  • Type:交换机类型direct、topic、 fanout、 headers
  • Durability:是否需要持久化,true为持久化
  • Auto Delete:当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False
  • Arguments:扩展参数,用于扩展AMQP协议自制定化使用

Direct Exchange

  • 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
  • 注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作。消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃
快速入门RabbitMQ核心概念
快速入门RabbitMQ核心概念

生产者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

public class ProducerOfDirectExchange {
    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            // 通过Channel发送数据
            String msg = "Hello RabbitMQ of Direct Exchange!";
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

public class ConsumerOfDirectExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {

            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test.direct";

            // 声明一个direct类型的Exchange
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare(queueName, true, false, false, null);
            // 将队列绑定到指定的Exchange上
            channel.queueBind(queueName, exchangeName, routingKey);

            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true) {
                channel.basicConsume(queueName, true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

Topic Exchange

  • 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic
  • 可以使用通配符进行模糊匹配:
    • 符号 "#" 匹配一个或多个词
    • 符号 "*" 匹配不多不少一个词
    • 例如:
      • "log.#" 能够匹配到 "log.info.oa"
      • "log.*" 只会匹配到 "log.error"
快速入门RabbitMQ核心概念
快速入门RabbitMQ核心概念

生产者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

public class ProducerOfTopicExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            // 通过Channel发送数据
            String msg = "Hello RabbitMQ of Topic Exchange!";
            channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
            channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
            channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

public class ConsumerOfTopicExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {

            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
            //String routingKey = "user.*";
            String routingKey = "user.#";

            // 声明一个topic类型的Exchange
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare(queueName, true, false, false, null);
            // 将队列绑定到指定的Exchange上
            channel.queueBind(queueName, exchangeName, routingKey);

            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true) {
                channel.basicConsume(queueName, true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

Fanout Exchange

  • 不处理路由键,只需要简单的将队列绑定到交换机上
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
  • Fanout交换机转发消息是最快的
快速入门RabbitMQ核心概念
快速入门RabbitMQ核心概念

生产者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

public class ProducerOfFanoutExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        String exchangeName = "test_fanout_exchange";

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            for (int i = 0; i < 10; i++) {
                // 通过Channel发送数据
                String msg = "Hello RabbitMQ of Fanout Exchange!";
                channel.basicPublish(exchangeName, "", null, msg.getBytes());
            }
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

public class ConsumerOfFanoutExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {

            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            // 不设置routingKey
            String routingKey = "";

            // 声明一个fanout类型的Exchange
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare(queueName, true, false, false, null);
            // 将队列绑定到指定的Exchange上
            channel.queueBind(queueName, exchangeName, routingKey);

            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true) {
                channel.basicConsume(queueName, true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

绑定、队列、消息、虚拟主机

Binding - 绑定:

  • Exchange和Exchange、Queue之 间的连接关系
  • Binding中可以包含RoutingKey或者参数

Queue - 消息队列:

  • 消息队列,实际存储消息数据
  • Durability:是否持久化。Durable:是,Transient:否
  • Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除

Message - 消息:

  • 服务器和应用程序之间传送的数据
  • 本质上就是一段数据,由Properties和Payload(Body)组成
  • 常用属性:delivery mode、headers(自定义属性)
  • Message其他属性:
    • content_type、content_encoding、priority
    • correlation id、reply_to、expiration、message_id
    • timestamp、type、 user_id、app_id、 cluster_id

设置Message属性代码示例:

package com.zj.rabbitmq.learn.message;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

import java.util.HashMap;
import java.util.Map;

class MyProducer {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Map<String, Object> headers = new HashMap<>();
        headers.put("a", "1");
        headers.put("b", "2");
        // 自定义Message的一些属性
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                // 持久化模式
                .deliveryMode(2)
                // 消息的编码格式
                .contentEncoding("UTF-8")
                // 消息过期时间
                .expiration("15000")
                // 设置消息的头
                .headers(headers)
                .build();

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            for (int i = 0; i < 5; i++) {
                // 通过Channel发送数据
                String msg = "Hello RabbitMQ!";
                // 不设置Exchange默认走direct exchange,此时routingKey就是队列名称
                channel.basicPublish("", "test001", properties, msg.getBytes());
            }
        }
    }
}

Virtual host - 虚拟主机:

  • 虚拟地址,用于进行逻辑隔离,最上层的消息路由
  • 一个Virtual Host里面可以有若干个Exchange和Queue
  • 同一个Virtual Host里面不能有相同名称的Exchange或Queue
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-11-23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 哪些互联网大厂在使用RabbitMQ,为什么?
  • RabbitMQ高性能的原因
  • AMQP高级消息队列协议与模型
  • AMQP核心概念
  • RabbitMQ整体架构与消息流转
  • RabbitMQ环境安装
  • rabbitmqctl命令行操作
  • 生产者消费者代码示例
  • 关于交换机
    • Direct Exchange
      • Topic Exchange
        • Fanout Exchange
        • 绑定、队列、消息、虚拟主机
        相关产品与服务
        消息队列 CMQ 版
        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档