前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ集群和高可用方案

RabbitMQ集群和高可用方案

作者头像
CodingDiray
发布2020-05-21 12:08:41
9.8K0
发布2020-05-21 12:08:41
举报
文章被收录于专栏:Coding DiaryCoding DiaryCoding Diary

RabbitMQ高可用集群方案

RabbitMQ的Cluster模式分为两种:

  • 普通模式
  • 镜像模式

Cluster普通模式:

元数据包含以下内容:

  • 队列元数据:队列的名称及属性
  • 交换器:交换器的名称及属性
  • 绑定关系元数据:交换器与队列或者交换器与交换器
  • vhost元数据:为vhost内的队列,交换器和绑定提供命名空间及安全属性之间的绑定关系

Cluster多机多节点部署:多机多节点是指在每台机器中部署一个RabbitMQ服务节点,进而由多个机器组成一个RabbitMQ集群

Cluster单机多节点部署:由于某些因素的限制,有时候不得不在单台物理机器上去创建一个多RabbitMQ服务节点的集群。或者只想要实验性的验证集群的某些特性,也不需要浪费过多的物理机器去实现。需要为每个RabbitMQ服务节点设置不同的端口号和节点名称来启动相应的服务

Cluster镜像模式:

镜像模式的集群是在普通模式的基础上,通过policy来实现,使用镜像模式可以实现RabbitMQ的高可用方案

ha-sync-mode 队列中消息的同步方式,有效值为automatic和manual,默认为automatic

RabbitMQ集群搭建

  1. 环境准备

准备3台机器,并在这三台机器上安装RabbitMQ

192.168.0.22 node1
192.168.0.23 node2
192.168.0.24 node3
  1. 修改配置文件

依次修改对应主机的hostname

hostname node1
hostname node2
hostname node3

依次修改主机的/etc/hosts文件,添加以下内容

192.168.0.22 node1
192.168.0.23 node2
192.168.0.24 node3

将node1节点上的 /var/lib/rabbitmq/.erlang.cookie 文件复制到其他节点(Erlang语言要求必须有相同的cookie才能进行集群通信)

scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/
  1. 添加防火墙端口

给每台机器的防火墙添加端口

firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent

重启防火墙

firewall-cmd --reload
  1. 启动RabbitMQ集群

启动每台机器的RabbitMQ

systemctl start rabbitmq-server

操作节点node2,将node2加入到集群

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1 --ram
rabbitmqctl start_app

查看集群的状态

rabbitmqctl cluster_status

在node3节点上重复上述命令,将node3加入到集群。完成后查看集群状态

RabbitMQ Web端查看集群状态

同时在Web端还可以看到每个节点的详细信息,如内存情况,IO情况,数据的存储等等

镜像队列模式集群

镜像队列属于RabbitMQ 的高可用方案,见:https://www.rabbitmq.com/ha.html#mirroring-arguments 通过前面的步骤搭建的集群属于普通模式集群,是通过共享元数据实现集群 开启镜像队列模式需要在管理页面添加策略,添加方式: 进入管理页面 -> Admin -> Policies(在页面右侧)-> Add / update a policy 在表单中填入:

参数说明: name: 策略名称,如果使用已有的名称,保存后将会修改原来的信息 Apply to:策略应用到什么对象上 Pattern:策略应用到对象时,对象名称的匹配规则(正则表达式) Priority:优先级,数值越大,优先级越高,相同优先级取最后一个 Definition:策略定义的内容,对于镜像队列的配置来说,只需要包含3个部分: ha-modeha-paramsha-sync-mode。其中,ha-sync-mode是同步的方式,自动还是手动,默认是自动。ha-modeha-params 组合使用。组合方式如下:

镜像队列模式相比较普通模式,镜像模式会占用更多的带宽来进行同步,所以镜像队列的吞吐量会低于普通模式。但普通模式不能实现高可用,某个节点挂了后,这个节点上的消息将无法被消费,需要等待节点启动后才能被消费。

集群测试代码示例:

Producer示例:

import com.rabbitmq.client.*;import java.io.IOException;import java.time.Instant;import java.util.concurrent.TimeUnit;public class Producer {  public static void main(String[] args) {    // 创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();    // 设置连接属性
    connectionFactory.setUsername("test-user");
    connectionFactory.setPassword("test-user");
    connectionFactory.setVirtualHost("v1");    // 设置每个节点的链接地址和端口
    Address[] addresses =        new Address[] {          new Address("192.168.0.22", 5672),          new Address("192.168.0.23", 5672),          new Address("192.168.0.24", 5672)
        };

    Connection connection = null;
    Channel channel = null;    try {      // 开启/关闭连接自动恢复,默认是开启状态
      connectionFactory.setAutomaticRecoveryEnabled(true);      // 设置每100毫秒尝试恢复一次,默认是5秒
      connectionFactory.setNetworkRecoveryInterval(100);

      connectionFactory.setTopologyRecoveryEnabled(false);      // 从使用连接集合里面的地址获取连接
 connection = connectionFactory.newConnection(addresses, "Producer");      // 添加重连监听器
      ((Recoverable) connection)
          .addRecoveryListener(              new RecoveryListener() {                // 重连成功后的回调
                public void handleRecovery(Recoverable recoverable) {
                  System.out.println(Instant.now().toString() + " 已重新建立连接");
                }                // 开始重连时的回调
                public void handleRecoveryStarted(Recoverable recoverable) {
                  System.out.println(Instant.now().toString() + " 开始尝试重连");
                }
              });      // 从链接中创建通道
      channel = connection.createChannel();      // 声明队列,如果队列不存在,会创建
      // RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
      channel.queueDeclare("queue-test", true, false, false, null);      for (int i = 0; i < 100; i++) {
        String message = "Hello Rabbit " + i;        try {          // 发送消息
          channel.basicPublish("", "queue-test", null, message.getBytes());

        } catch (Exception e) {          // 可能连接已关闭,等待重连
          System.out.println("消息 " + message + " 发送失败!");
          i--;
          TimeUnit.SECONDS.sleep(2);          continue;
        }

        System.out.println("消息" + i + "已发送!");
        TimeUnit.SECONDS.sleep(2);
      }

    } catch (Exception e) {
      e.printStackTrace();
    } finally {      // 关闭通道
      if (channel != null && channel.isOpen()) {        try {
          channel.close();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }      // 关闭连接
      if (connection != null && connection.isOpen()) {        try {
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

Consumer示例:

import com.rabbitmq.client.*;import java.io.IOException;import java.time.Instant;public class Consumer {  public static void main(String[] args) {    // 创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();    // 设置连接属性
    connectionFactory.setUsername("test-user");
    connectionFactory.setPassword("test-user");
    connectionFactory.setVirtualHost("v1");    // 设置每个节点的链接地址和端口
    Address[] addresses =        new Address[] {          new Address("192.168.0.22", 5672),          new Address("192.168.0.23", 5672),          new Address("192.168.0.24", 5672)
        };

    Connection connection = null;
    Channel channel = null;    try {      // 开启/关闭连接自动恢复,默认是开启状态
      connectionFactory.setAutomaticRecoveryEnabled(true);      // 设置每100毫秒尝试恢复一次,默认是5秒
      connectionFactory.setNetworkRecoveryInterval(100);      // 从连接工厂获取连接
 connection = connectionFactory.newConnection(addresses, "Consumer");      // 添加重连监听器
      ((Recoverable) connection)
          .addRecoveryListener(              new RecoveryListener() {                // 重连成功后的回调
                public void handleRecovery(Recoverable recoverable) {
                  System.out.println(Instant.now().toString() + " 已重新建立连接");
                }                // 开始重连时的回调
                public void handleRecoveryStarted(Recoverable recoverable) {
                  System.out.println(Instant.now().toString() + " 开始尝试重连");
                }
              });      // 从链接中创建通道
      channel = connection.createChannel();      // 声明队列,如果队列不存在,会创建
      // RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
      channel.queueDeclare("queue-test", true, false, false, null);      // 定义收到消息后的回调
      final Channel finalChannel = channel;
      DeliverCallback deliverCallback =          new DeliverCallback() {

            @Override            public void handle(String consumerTag, Delivery message) throws IOException {
              System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
              finalChannel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
          };      // 监听队列
      channel.basicConsume(          "queue-test",          false,
          deliverCallback,          new CancelCallback() {
            @Override            public void handle(String consumerTag) throws IOException {}
          });

      System.out.println("开始接收消息");
      System.in.read();

    } catch (Exception e) {
      e.printStackTrace();
    } finally {      // 关闭通道
      if (channel != null && channel.isOpen()) {        try {
          channel.close();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }      // 关闭连接
      if (connection != null && connection.isOpen()) {        try {
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

RabbitMQ常用管理命令

rabbitmqctl status 查看节点状态

rabbitmqctl stop [pid_file] 停止运行的RabbitMQ的Erlang虚拟机和RabbitMQ服务应用 如果指定了pid_file,还需要等待指定进程的结束。pid_file是通过调用rabbitmq-server命令启动RabbitMQ服务时创建的,默认情况下存放于Mnesia目录中。 如果使用rabbitmq-server -detach这个带有-detach后缀的命令来启动RabbitMQ服务则不会生成pid_file文件。

rabbitmqctl stop_app 停止RabbitMQ服务应用,但是Erlang虚拟机还是处于运行状态 此命令的执行优先于其他管理操作(这些操作需要先停止RabbitMQ应用,如rabbitmqctl reset)

rabbitmqctl start_app 启动RabbitMQ应用,此命令典型的用途就是执行了其他管理操作之后,重新启动之前停止的RabbitMQ应用。

rabbitmqctl reset 将RabbitMQ节点重置还原到最初状态 包括从原来的集群中删除此节点,从管理数据库中删除所有的配置数据,如已配置的用户,vhost等,以及删除所有的持久化数据 执行rabbitmqctl reset 命令前必须停止RabbitMQ应用

rabbitmqctl force_reset 强制将RabbitMQ节点重置还原到最初状态。此命令不论当前管理数据库的状态和集群配置是什么,都会无条件的重置节点,只能在数据库或集群配置已损坏的情况下使用

rabbitmqctl [-n nodename] join_cluster {cluster_node} [—ram] 将节点加入指定的集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。 -n nodename:指定需要操作的目标节点,如rabbit@node1 cluster_node:需要加入的集群节点名,如rabbit@node2 —ram:集群节点类型,有ram,disc两种,默认为disc

  • ram 内存节点,所有元数据都存储在内存中
  • disc 磁盘节点,所有元数据都存储在磁盘中

rabbitmqctl cluster_status 查看集群状态

rabbitmqctl change_cluster_node_type {disc|ram} 修改集群节点的类型,使用此命令前要停止RabbitMQ应用

rabbitmqctl forget_cluster_node [—offline] 将节点从集群中删除,允许离线执行

rabbitmqctl update_cluster_nodes {clusternode} 在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群

rabbitmqctl force_boot 确保节点可以启动,即使它不是最后一个关闭的节点

rabbitmqctl set_cluster_name {name} 设置集群名称。集群名称在客户端连接时会通报给客户端 集群名称默认是集群中第一个节点的名称,通过这个命令可以重新设置

Federation插件

Federation插件的设计目标是使RabbitMQ在不同Broker节点之间进行消息传递而无需建立集群,该功能在以下场景下非常有用:

  • 各个节点运行在不同版本的Erlang和RabbitMQ上
  • 网络环境不稳定,如广域网当中

Federation的作用:

Shovel插件

Shovel与Federation具备的数据转发功能类似。Shovel能够可靠,持续的从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker的交换器(作为目的端,即destination)

Shovel的主要优势: 松耦合,shovel可以移动位于不同管理域中的Broker或者集群上的消息,这些Broker或者集群可以包含不同的用户和vhost,也可以使用不同的RabbitMQ和Erlang版本 支持广域网,Shovel插件同样基于AMQP协议在Broker之间进行通信,被设计成可以容忍时断时续的连通情形,并且能够保证消息的可靠性 高度定制,当Shovel成功连接后,可以对其进行配置以执行相关的AMQP命令

Federation/Shovel与Cluster的区别与联系

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Coding Diary 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RabbitMQ高可用集群方案
  • RabbitMQ集群搭建
  • 镜像队列模式集群
  • RabbitMQ常用管理命令
  • Federation插件
  • Shovel插件
  • Federation/Shovel与Cluster的区别与联系
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档