专栏首页shysh95RabbitMQ Federation

RabbitMQ Federation

本文主要讲解RabbitMQ的联邦机制,有以下内容

  1. 联邦交换器
  2. 联邦队列
  3. 联邦使用

在文章开始之前,我们先介绍一下联邦机制的基本概念。联邦机制的实现,依赖于RabbitMQ的Federation插件,该插件的主要目标是为了RabbitMQ可以在多个 Broker节点或者集群中进行消息的无缝传递。

Federation插件可以让多个交换器和多个队列进行联邦。一个联邦交换器或者一个联邦队列接受上游(位于其他Broker上的交换器和队列)消息。联邦交换器能够将原本发送给上游交换器(upstream exchange)的消息路由到本地的某个队列中;联邦队列则允许一个本地消费者接收到来自上游队列(upstream queue)的消息。

联邦交换器

下面先假设一种场景,BrokerA服务部署在上海,BrokerB服务部署在北京。来自上海的ClientA向BrokerA的exchangeA发送消息网络延迟很小,但是北京的ClientB向BrokerA的exchangeA发送消息那么将会面临网络延迟的问题。Federation机制则可以帮助我们解决这个问题。

首先在BrokerA的exchangeA上与北京的BrokerB建立一条单向的Federation Link。此时Federation插件会在BrokerB上建立一个同名的交换器(可以配置,默认同名),并且还会建立一个内部交换器federation:exchangeA->Broker B(其中Broker为集群名称)通过相同的绑定建进行绑定,于此同时Federation插件会建立一个federation:exchangeA->Broker B(BrokerB为集群名称),并且将内部交换器federation:exchangeA->Broker B绑定到该队列。

Federation插件会在队列federation:exchangeA->Broker B与BrokerA中的交换器exchangeA之间建立一条AMQP连接来实时地消费队列federation:exchangeA->Broker B中的数据。这些操作都是内部的,对外部业务客户端来说这条Federation link建立在BrokerA的exchangeA和BrokerB的exchangeA之间。

此时ClientB可以以较小的网络延迟向BrokerB的exchangeA发送消息,并且该消息会被正确路由到BrokerA中的exchangeA中,通过Federation插件我们可以以较小的网络延迟向与客户端属于不同地域的Broker节点发送消息。

"max_hops=1"表示一条消息最多被转发的次数为1。

默认的交换器(每个vhost下都会默认创建一个名为""的交换器)和内部交换器,不能对其使用Federation的功能。

联邦队列

队列queue1和queue2原本在broker2中,由于某种需求将其配置为federated queue并将broker1作为upstream。Federation插件会在broker1上创建同名的队列queue1和queue2,与broker2中的队列queue1和queue2分别建立两条单向独立的Federation link。当有消费者ClientA连接broker2并通过Basic.Consume消费队列queue1(或queue2)中的消息时,如果队列queue1(或queue2)中本身有若干消息堆积,那么ClientA直接消费这些消息,此时broker2中的queue1(或queue2)并不会拉取broker1中的queue1(或queue2)的消息;如果队列queue1(或queue2)中没有消息堆积或者消息被消费完了,那么它会通过Federation link拉取在broker1中的上游队列queue1(或queue2)中的消息(如果有消息),然后存储到本地,之后再被消费者ClientA进行消费。

和federated exchange不同,一条消息可以在联邦队列间转发无限次。两个队列可以互为联邦队列。

如果两个队列互为联邦队列,队列中的消息除了被消费,还会转向有多余消费能力的一方,如果这种"多余的消费能力"在broker1和broker2中来回切换,那么消费也会在broker1和broker2中的队列queue中来回转发

federation queue只能使用Basic.Consume进行消费,并且不具备传递性。

Federation使用

Federation插件默认在RabbitMQ发布包中,执行rabbitmq-plugins enable rabbitmq_federation命令可以开启Federation功能。rabbitmq-plugins enable rabbitmq_federation命令会同时开启amqp_client插件。如果要开启Federation的管理插件,需要执行rabbitmq-plugins enable rabbitmqfederation_management命令。

当需要在集群中使用Federation功能的时候,集群中所有的节点都应该开启Federation插件。

在前面的章节中我们搭建了一个3个节点的rabbitmq集群(假设在上海),我们在搭建一个两个节点的集群(假设在北京),这个读者自行去搭建吧。在我们第一个集群中默认的vhost下有一个exchangeA和queueA,我们北京的一个客户端需要向exchangeA中发消息,为了减轻网络延迟,我们使用联邦机制,将北京的集群作为上行资源,下面我们讲一下配置。

首先在上海集群中配置Federation Upstreams,配置如下图:

  • name:定义这个upstream的名称,必填项
  • uri:定义upstream的AMQP连接,我们这里设置的是amqp://upstreamer:123456@rabbit111:5672(这里是一个北京集群一个节点地址),格式amqp://username:password@host:port
  • Prefetch count:定义Federation内部缓存的消息条数,即在收到上游消息之后且在发送到下游之前缓存的消息条数。
  • Reconnect delay:Federation link由于某种原因断开之后,需要等待多少秒开始重新建立连接。
  • Acknowledgement Mode:定义Federation link 的消息确认方式。其有3种:on-confirm、on-publish、no-ack。默认为on-confirm,表示在接收到下游的确认消息(等待下游的Basic.Ack)之后再向上游发送消息确认,这个选项可以确保网络失败或者Broker宕机时不会丢失消息,但也是处理速度最慢的选项。如果设置为on-publish,则表示消息发送到下游后(并需要等待下游的Basic.Ack)再向上游发送消息确认,这个选项可以确保在网络失败的情况下不会丢失消息,但不能确保Broker宕机时不会丢失消息。no-ack表示无须进行消息确认,这个选项处理速度最快,但也最容易丢失消息。
  • Trust User-ID:设定Federation是否使用"Validated User-ID" 这个功能。如果设置为false或者没有设置,那么Federation会忽略消息的user_id 这个属性;如果设置为true,则Federation只会转发user_id为上游任意有效的用户的消息。
  • Exchange:指定upstream exchange的名称,默认情况下和federated exchange同名,这里我们指定的是beijingExchange
  • Max hops:指定消息被丢弃前在Federation link中最大的跳转次数。默认为1。注意即使设置max-hops参数为大于1的值,同一条消息也不会在同一个Broker中出现2次,但是有可能会在多个节点中被复制。
  • Expires:指定Federation link断开之后,federated queue所对应的upstream queue的超时时间,默认为"none",表示为不删除,单位为ms。这个参数相当于设置普通队列的x-expires参数。设置这个值可以避免Federation link断开之后,生产者一直在向北京集群中的upstream exchange发送消息,这些消息又不能被转发到上海的集群中而被消费掉,进而造成upstream exchange中有大量的消息堆积。
  • Message TTL:为federated queue所对应的upstream queue设置,相当于普通队列的x-message-ttl参数。默认为"none",表示消息没有超时时间。
  • HA policy:为federated queue所对应的upstream queue设置,相当于普通队列的x-ha-policy参数,默认为 "none",表示队列没有任何HA。
  • Queue:执行upstream queue的名称,默认情况下和federated queue同名

在配置完了Federation Upstreams之后,再定义一个Policy用于匹配交换器exchangeA,并且使用刚才的Federation Upstreams配置,如下图:

当策略配置完毕之后,Federation将会在北京的集群上建立一个beijingExchange,并建立Federation link,如下图:

本文分享自微信公众号 - shysh95(shysh95),作者:shysh95

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-04-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RabbitMQ基础使用

    生产消息的应用,生产者需要指定将消息发送到哪个exchange,并且指定routingkey(这是为了exchange可以将消息路由到相关的队列)。

    shysh95
  • RabbitMQ进阶使用

    在入门使用曾提到过,生产者发送消息可以使用mandatory参数,该参数的作用主要是在交换器根据路由键无法匹配队列的时候讲消息返回给生产者,但是需要生产者通过R...

    shysh95
  • RabbitMQ存储和队列结构

    首先确认一个点,持久化和非持久化的消息都会落地磁盘,区别在于持久化的消息一定会写入磁盘(并且如果可以在内存中也会有一份),而非持久化的消息只有在内存吃紧的时候落...

    shysh95
  • IBM Websphere Message Broker(MB) 教程系列-(1) 在Fedora

    Java学习123
  • 通过多种方式将数据导入hive表

    hive官方手册 http://slaytanic.blog.51cto.com/2057708/939950 通过多种方式将数据导入hive表 1.通过外部...

    闵开慧
  • 如何实现VR视频版权保护?VR视频加密方案

    VR视频制作越来越精细,内容越来越丰富,观影体验越来越好,VR视频版权保护问题也越来越受重视,如何能更好的保护VR视频版权,并且不影响视频的观看体验呢?

    点量小崔
  • iOS中CoreData数据管理系列二——CoreData框架中三个重要的类

        在上一篇博客中,介绍了iOS中使用CoreData框架设计数据模型的相关步骤。CoreData框架中通过相关的类将数据——数据模型——开发者无缝的衔接起...

    珲少
  • Docker官方centos镜像下安装elasticsearch【详细步骤】

    这里启动容器选择了一段ip和主机ip映射「-p 9000-9900:9000-9900」可以使用docker port 命令查看具体映射 1docker...

    XING辋
  • Php开发过程中不常碰到的error (2.25更新)

    这个不仅存在于页面解析当中,当使用 curl 请求时拼接的参数有这种格式的也会发生转义

    猿哥
  • SQL*Plus copy 命令处理大批量数据复制

        对于数据库表级上的数据复制,我们最常用的是CREATE TABLE AS(CTAS)..方式。其实在SQL*Plus下面copy命令可以完成同样的工作,...

    Leshami

扫码关注云+社区

领取腾讯云代金券