首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

RabbitMQ的运用

RabbitMQ的运用

RabbitMQ特性在平台的实现

RabbitMQ的相关消息通讯特性在平台中是如何具体实现的:

1

RabbitMQ特性在平台的逻辑实现

下面是平台在引入RabbitMQ后,在平台内部实现分布式队列通讯的逻辑架构图。

在图中,有带箭头流向的处理流程,就是一个交易在RabbitMQ的消息发送、消息接收、消息匹配的流转中,在平台内部完成业务流转的完整过程,具体的处理流程(按带数字的流向)如下所示:

1、根据交易配置信息,调度服务获取服务调度流程,由请求发送服务将相应的服务流信息推送到对应的交换器BS上,交换器将对应的消息路由到绑定的队列BS-2上;

2、业务子系统BS-2通过请求监听服务获取其绑定队列BS-2中的请求报文信息;

3、调用请求发送向对应队列写入消息;

4、跨子系统的请求监听服务获取队列中的请求报文信息;

5、跨子系统的请求发送向对应交换器END推送消息;

6、调度结束队列向服务调度子系统推送处理结束信息;

7、处理完毕。

2

交换器和队列绑定

下面说说RabbitMQ中交换器(Exchange)和队列(Message)之间的关系,以及实现队列通讯的机制。

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,而在整个send Message到Receive Message的流程实现过程中,最重要的两个概念就是“交换器(Exchange)”和“队列(Queue)”。

生产者产生的消息并不是直接发送给消息队列Queue的,而是要经过Exchange(交换器),由Exchange再将消息路由到一个或多个Queue。当然这里还会对不符合路由规则的消息进行丢弃掉,这里指的是后续要谈到的Exchange Type。 Queue(队列)在RabbitMQ的作用是存储消息,队列的特性是先进先出。生产者生产消息最终被送到RabbitMQ的内部对象Queue中去,而消费者则是从Queue队列中取出数据。而且,消息从生产产生后,先推送到Exchange,然后又Exchange路由到Queue中。

那么,Exchange是怎样将消息准确的路由到对应的Queue的呢?这就要我们这里要将的Bingding(绑定关系),RabbitMQ是通过Binding将Exchange和Queue链接在一起,这样Exchange就知道如何将消息准确的路由到Queue中去了。

图中左侧,是在平台中使用的队列初始化函数和队列建立时维护与交换器的绑定关系的函数。通过队列、交换器的绑定,就能把二者有机地关联起来,实现RabbitMQ消息流转的推送、路由的桥梁关系。在右边的图示中,我们能比较清晰地看到不同的交换器分别与对应的队列进行了绑定关联。完成AMQP的通讯机制的实现。

平台是怎样实现交换器(exchange)、队列(queue)定义以及二者的绑定呢?

交换器Exchange和队列Queue的声明,以及二者是如何绑定(binding)后建立关联关系的。在RabbitMQ中,声明交换器Exchange,是通过如下的具体操作来实现:

rabbitmqadmin declare exchange name=EX_PSBS type=direct durable=true auto_delete=false

在该声明中,我们要定义交换器Exchange的名称,交换器类型type(具体类型后面再讲)等信息。

在RabbitMQ中,声明队列Queue,是通过这样的操作来实现:

rabbitmqadmin declare queue name=Q_PSBS0101 durable=false node=rabbit@etlrdev auto_delete=false arguments='{"x-message-ttl":60000}'

在该声明中,定义了队列名称,节点,队列超时时间等信息。

在RabbitMQ中,绑定交换器和队列的关系,是通过这样的操作来完成:

rabbitmqadmin declare binding source=EX_PSBS destination=Q_PSBS0101 routing_key=Q_PSBS0101

把指定交换器、队列、队列key指进行关联,就完成相应交换器、队列的绑定。

3

消息的发送、接收和匹配

在平台内部,左边是我们用到的消息发送函数rabbit_put,右边是实现消息发送的处理流程。

1.先调用rabbit_initqueue进行队列初始化,实现绑定队列的可用;

2.调用分布式消息队列连接接口rabbit_connect,获取与分布式消息队列的连接;

3.获取会计日期、交易码、内部交易码、交换器名、消息队列名、消息内容等信息;

4.调用时间戳获取接口GetTimeStamp,获取开始时间,将消息开始时间放入消息属性中 props.app_id = amqp_cstring_bytes( bgnstamp ),获取时间是为了实现对消息的操作超时进行保证;

5.若存在消息匹配号,则在消息属性中设置消息匹配号props.correlation_id = amqp_cstring_bytes( msgid );

6.设置消息传输属性(非持久化或持久化),props.delivery_mode = dureable;

7.调用消息发送接口amqp_put,将消息发送到对应的交换器再由交换器路由到绑定的消息队列;

8.根据返回判断发送是否成功,如果返回“需要重新连接”则调用分布式消息队列连接接口rabbit_connect重新与分布式消息队列建立连接;

9.如果返回成功,则表示消息推送成功。

下面是消息的接收,左边是我们封装的消息接收函数rabbit_get,右边是实现消息接收的处理流程。

1.变量初始化;

2.获取服务配置通道编号参数;

3.根据子系统编号计算交换器名称,sprintf( exchange,"EX_%s", getenv("PLAT_CD") );

4.根据子系统编号、节点编号、通道编号计算接收队列名称,sprintf( qname, "Q_%s%s%s", getenv("PLAT_CD"),getenv("HOST_ID"), chnl );

5.调用分布式消息队列连接接口rabbit_connect,获取与分布式消息队列的连接;

6.调用分布式消息队列初始化接口rabbit_initqueue,初始化队列信息;

7.调用消费者初始化接口rabbit_consumeinit,初始化消费者;

8.循环调用消息获取接口rabbit_get,获取队列中的消息;

9.将接收到的消息保存至msgbuf中;

10.返回成功,消息接收处理完毕。

来看一下消息接收相关服务在ubbconfig中需要配置的信息。

我们看到对于消息接收服务psmsgrcv_server,在该节点部署了2个server,对应的Tuxedo队列是q_psrcv1,连接的数据库是DB1,对应的实例是01。同时,另外部署2个psmsgrcv_server做并发处理,对应的uxedo队列是q_psrcv2,连接的数据库是DB1,对应的实例是02。

在RabbitMQ中,对应的消息接收队列分别是Q_PS0101、Q_PS0102。这样,就能实现两组消息接收服务分别并发处理,以提升消息处理性能。

以下是平台内部RabbitMQ进行消息匹配的实现。左边是封装的消息匹配函数,右边是实现消息匹配的处理流程。下面我们重点说一下消息匹配流程,实际的处理流程包括以下步骤:

1.变量初始化;

2.获取服务配置通道编号参数;

3.根据子系统编号计算交换器名称,strcpy( exchange,"EX_END" );

4.根据节点编号、通道编号计算接收队列名称,sprintf( qname, "Q_END%s%s", getenv("HOST_ID"), chnl );

5.调用分布式消息队列连接接口rabbit_connect,获取与分布式消息队列的连接;

6.调用分布式消息队列初始化接口rabbit_initqueue,初始化队列信息;

7.调用消费者初始化接口rabbit_consumeinit,初始化消费者;

8.设置超时时间,timeout.tv_sec = time_out;

9.循环调用消息匹配接口rabbit_matchmsg,匹配队列中的消息;

10.调用FML报文分配接口tux_Alloc,为待解包报文分配总线;

11.初始化总线;

12.调用字符流消息转换接口String2Fml,将字符流报文转换成FML报文;

13.将接收到的消息保存至msgbuf中;

14.返回成功,消息匹配处理完毕。

下面是消息接收相关服务在ubbconfig中需要配置的信息。

对于消息匹配的处理做3点强调:

第一,就是在消息处理时,我们在发送消息时,要在消息属性中加入correlation_id属性,这样就能对消息进行甄别,有助于我们对消息的分类处理,提升消息的消费效率。

第二,消费者初始化时设置消息需要ack,也就是进行消息的应答确认,通过该机制可以确保消息的准确消费,在生产者没有收到回值时可以抛出异常提示,提醒消费者进行相应的排斥处理,提高安全性。

第三,在接收消息时,使用msg_id与correlation_id进行匹配。通过匹配成功或失败进行不同的消息处理操作,并针对超时与否进行不同的处理,达到有效确保消息匹配成功并进行后续处理的目的。

4

队列和交换器的模式介绍及在平台的选型

简单介绍一下在无交换器模式下,消息队列的两个情况:

模式1 :生产者VS消费者 = 1 : 1

也就是,一个生产者P发送消息到队列Q,一个消费者C接收。

模式2: 工作队列模式——生产者VS消费者 = 1 : N

也就是,一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列。

这里介绍消息队列模式,为后面和交换器绑定关系的建立作基础知识铺垫。

交换器常用的三种模式:Direct Fanout Topic。

Direct类型,指定路由模式,则会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。

任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。

1.一般情况可以使用rabbitMQ自带的Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)。

2.这种模式下不需要将Exchange进行任何绑定(binding)操作。

3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。

4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。

Fanout 类型,也就是不规则路由模式,会将消息发送给所有与该 Exchange 定义过 Binding 的所有 Queues 中去,其实是一种广播行为。

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

1.可以理解为路由表的模式;

2.这种模式不需要RouteKey;

3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定;

4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

Topic类型,也就是通配符路由模式,则会按照正则表达式,对RoutingKey与BindingKey进行匹配,如果匹配成功,则发送到对应的Queue中。

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上。

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列;

2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue;

3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列);

5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。

经过分析,我们结合平台实际场景需求,针对3种模式进行选择:对普通的服务流处理的消息发送、接收和匹配,我们采用了Direct 模式,用指定路由模式实现到绑定队列的消息传递;对平台内部子系统间日切通知、候补队列,我们采用了Fanout模式,即用广播形式,实现对消息的传递。

在这里,我们比较了一下三种不同的交换器适用的场景,以及在平台实现的场景情况。其中,对于TopicExchange模式,虽然暂无应用,但我们相信在未来的项目建设中根据实际需求,一定会有与之对应的业务场景实现和功能落地。

RabbitMQ实践经验分享

01

死信队列

死信队列DLX,全称是Dead-Letter-Exchange,可以称之为死信交换器,在业内也被称之为死信信箱。当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。

消息变成死信一般有一下几种情况:

消息被拒绝(basic.reject/ basic.nack)并且requeue=false;

消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间));

队列达到最大长度。

下面图中,可以看到针对管理平台配置的死信队列。可以做参考。

02

后补交换器

后补交换器实现原理也很简单:当队列阻塞等异常情况发生时,导致交换器无法向所绑定的队列继续路由消息,为保证消息不积压不丢失,将消息路由到候补交换器上,由候补交换器路由到对应的绑定队列上,实现消息的继续消费。

03

管理平台RabbitMQ集群分布

再简单介绍下管理平台的RabbitMQ集群实现方案。如图所示,一共有3台物理主机,分别为rbt1、rbt2、rbt3,其中rat1、rat3是disk-node,rat2是RAM-node,通过设置镜像队列,可以简单实现3台机器互为主备的模式。在集群内部,RAM-node(内存节点)将所有的队列、交换机、绑定、用户、权限和 vhost 的元数据定义存储在内存中,好处是可以使得像交换机和队列声明等操作更加的快速。

最后是管理平台的集群部署实际情况的监控。

平台云课堂

为科技人带来有价值有温度的阅读

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190110G0QNZV00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券