本文主要讲解RabbitMQ的联邦机制,有以下内容
在文章开始之前,我们先介绍一下联邦机制的基本概念。联邦机制的实现,依赖于RabbitMQ的Federation插件,该插件的主要目标是为了RabbitMQ可以在多个 Broker节点或者集群中进行消息的无缝传递。
Federation插件可以让多个交换器和多个队列进行联邦。一个联邦交换器或者一个联邦队列接受上游(位于其他Broker上的交换器和队列)消息。联邦交换器能够将原本发送给上游交换器(upstream exchange)的消息路由到本地的某个队列中;联邦队列则允许一个本地消费者接收到来自上游队列(upstream queue)的消息。
下面先假设一种场景,BrokerA服务部署在上海,BrokerB服务部署在北京。来自上海的ClientA向BrokerA的exchangeA发送消息网络延迟很小,但是北京的ClientB向BrokerA的exchangeA发送消息那么将会面临网络延迟的问题。Federation机制则可以帮助我们解决这个问题。
首先在BrokerA的exchangeA上与北京的BrokerB建立一条单向的Federation Link。此时Federation插件会在BrokerB上建立一个同名的交换器(可以配置,默认同名),并且还会建立一个内部交换器federation:exchangeA->Broker B(其中Broker为集群名称)通过相同的绑定建进行绑定,于此同时Federation插件会建立一个federation:exchangeA->Broker B(BrokerB为集群名称),并且将内部交换器federation:exchangeA->Broker B绑定到该队列。
Federation插件会在队列federation:exchangeA->Broker B与BrokerA中的交换器exchangeA之间建立一条AMQP连接来实时地消费队列federation:exchangeA->Broker B中的数据。这些操作都是内部的,对外部业务客户端来说这条Federation link建立在BrokerA的exchangeA和BrokerB的exchangeA之间。
此时ClientB可以以较小的网络延迟向BrokerB的exchangeA发送消息,并且该消息会被正确路由到BrokerA中的exchangeA中,通过Federation插件我们可以以较小的网络延迟向与客户端属于不同地域的Broker节点发送消息。
"max_hops=1"表示一条消息最多被转发的次数为1。
默认的交换器(每个vhost下都会默认创建一个名为""的交换器)和内部交换器,不能对其使用Federation的功能。
队列queue1和queue2原本在broker2中,由于某种需求将其配置为federated queue并将broker1作为upstream。Federation插件会在broker1上创建同名的队列queue1和queue2,与broker2中的队列queue1和queue2分别建立两条单向独立的Federation link。当有消费者ClientA连接broker2并通过Basic.Consume消费队列queue1(或queue2)中的消息时,如果队列queue1(或queue2)中本身有若干消息堆积,那么ClientA直接消费这些消息,此时broker2中的queue1(或queue2)并不会拉取broker1中的queue1(或queue2)的消息;如果队列queue1(或queue2)中没有消息堆积或者消息被消费完了,那么它会通过Federation link拉取在broker1中的上游队列queue1(或queue2)中的消息(如果有消息),然后存储到本地,之后再被消费者ClientA进行消费。
和federated exchange不同,一条消息可以在联邦队列间转发无限次。两个队列可以互为联邦队列。
如果两个队列互为联邦队列,队列中的消息除了被消费,还会转向有多余消费能力的一方,如果这种"多余的消费能力"在broker1和broker2中来回切换,那么消费也会在broker1和broker2中的队列queue中来回转发
federation queue只能使用Basic.Consume进行消费,并且不具备传递性。
Federation插件默认在RabbitMQ发布包中,执行rabbitmq-plugins enable rabbitmq_federation命令可以开启Federation功能。rabbitmq-plugins enable rabbitmq_federation命令会同时开启amqp_client插件。如果要开启Federation的管理插件,需要执行rabbitmq-plugins enable rabbitmqfederation_management命令。
当需要在集群中使用Federation功能的时候,集群中所有的节点都应该开启Federation插件。
在前面的章节中我们搭建了一个3个节点的rabbitmq集群(假设在上海),我们在搭建一个两个节点的集群(假设在北京),这个读者自行去搭建吧。在我们第一个集群中默认的vhost下有一个exchangeA和queueA,我们北京的一个客户端需要向exchangeA中发消息,为了减轻网络延迟,我们使用联邦机制,将北京的集群作为上行资源,下面我们讲一下配置。
首先在上海集群中配置Federation Upstreams,配置如下图:
在配置完了Federation Upstreams之后,再定义一个Policy用于匹配交换器exchangeA,并且使用刚才的Federation Upstreams配置,如下图:
当策略配置完毕之后,Federation将会在北京的集群上建立一个beijingExchange,并建立Federation link,如下图: