前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深度剖析RabbitMQ可靠性消息投递以及实践方案

深度剖析RabbitMQ可靠性消息投递以及实践方案

作者头像
程序猿DD
发布2020-05-26 16:01:25
8460
发布2020-05-26 16:01:25
举报
文章被收录于专栏:程序猿DD程序猿DD

作者 | 阿飞的博客

来源 | 阿飞的博客

一般而言,如果你选择RabbitMQ,那肯定就是把可靠性放在第一位。毕竟,RabbitMQ可是金融行业消息队列的标配。如果把性能放在第一位,那毫无疑问,必须是Kafka。但是,可靠性毕竟是相对的,就拿大火的阿里云,AWS云,或者传统的IBM小型机,Oracle数据库,没有谁敢说自己可靠性100%,都是说几个9。所以,本文的目的很明确,就是尽可能的提高我们RabbitMQ的可靠性,从发送、存储、消费、集群、监控、告警等多个维度给出可行性方案,指导开发者以及运维人员获取更加可靠的消息投递,保障我们的业务系统安全、可靠、稳定的运行。

数据可靠性是和RabbitMQ节点、生产者、消费者以及服务器等息息相关的。本文比较长,大概分为如下几个段落:

  1. 确认机制
  2. 生产者
  3. 消费者
  4. 队列镜像
  5. 告警
  6. 监控和Metrics
  7. 健康检查

如下是一张RabbitMQ架构图,本文对可靠性的分析,会涉及到架构图中的方方面面:

1. 确认机制

当连接出现问题的时候,在客户端和服务端之间的消息可能正在投递中,还没有被Broker接收,它们可能正在被编码或者解码,或者一些其他的情况。在这种场景下,消息并没有被投递,那么它们是需要被重新投递以保障业务稳定性。确认机制让服务端和客户端知道什么时候需要做这些事情,它对于生产者和消费者保障数据安全是非常重要的。

确认机制能被用在两个方向:允许消费者告诉服务器(Broker)它已经收到了消息,也允许服务器告诉生产者它接收到了消息。前者就是我们常说的消费者Ack,后者就是我们常说的生产者Confirm。

1.1 生产者/消费者确认

生产者确认以及消费者确认接下来会单独的段落进行详细的介绍,并且有示例代码,这里就不过多的讲解了。

1.2 确认机制总结

确认机制的使用,能够保证最少一次(at least once)投递。如果没有确认机制,消息就非常可能会丢失,这时候只能保证最多一次(at most once )。至于恰好一次投递,目前还没有哪个中间件可以保证,毕竟分布式系统非常复杂,尤其是网络的不可控,不确定的因素太多太多。

2. 生产者

当使用确认机制的时候,生产者从连接或者channel故障中恢复过来时,会重发没有被Broker确认签收的消息。如此一来,消息就可能被重复发送,因为可能是由于网络故障等原因,Broker发送了确认,但是生产者没有收到而已。亦或者,消息压根就没有发送到Broker那里去。正因为生产者为了可靠性可能会重发消息,所以在消费者消费消息处理业务时,还需要去重,或者对接受到的消息做幂等处理(推荐幂等处理)。

生产者增加确认机制非常简单,channel开启confirm模式,然后增加监听即可:

代码语言:javascript
复制
// 选择确认机制
channel.confirmSelect();
// 确认消息监听
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息已经ack,tag: " + deliveryTag);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
	    // 对于消费者没有ack的消息,可以做一些特殊处理
        System.out.println("消息被拒签,tag: " + deliveryTag);
    }
});

说明:RabbitMQ还有事务机制(txSelect、txCommit、txRollback),也能保障消息的发送。不过事务机制是「同步阻塞」的,所以不推荐使用。而confirm模式是「异步」机制。如下图所示(图片摘自《Rabbit实战指南》,皮皮厮的书籍,非常值得一看),是事务机制以及confirm方式TPS性能对比,我们可以很明显的看到,事务机制是性能最差的:

在一些很重要的业务场景,我们还需要确保消息被正确路由到了队列。为了确保消息被正确路由到一个已知的队列,我们需要确保消息被正确的从交换器传递到了队列中,并且还需要确保目标队列至少有一个消费者。我们试想,如果一个队列都没有任何消费者,那发送的消息相当于石沉大海了。

前者,我们可以通过给交换器绑定一个备份交换器解决这个问题。比如一个direct类型的交换器,如果发送消息时路由KEY不匹配,那么这条消息就会进入备份交换器中,而不会被丢失:

代码语言:javascript
复制
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("alternate-exchange", ALTER_EXCHANGE_NAME);
// map参数中alternate-exchange的值,就是申明绑定的备份交换器名称
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, argsMap);

❝RabbitMQ支持的4种交换器类型中,只有fanout是不存在路由不到队列的情况,因为它会自动路由到所有队列中,跟绑定KEY没有任何关系。所以,在满足你业务的前提下,笔者建议,尽可能使用fanout类型交换器。 ❞

后者,我们可以通过死信交换器(DLX,也被称为私信队列)来解决这个问题,假设一些消息没有被消费,那么它就会被转移到绑定的死信交换器上,对于这类消息,我们消费并处理死信队列即可:

代码语言:javascript
复制
Map<String, Object> argsMap = new HashMap<>();
// 死信交换器/死信队列
argsMap.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
// 设置队列过期时间(第一次设置一个值后,以后不能设置一个更大的值)
argsMap.put("x-message-ttl", 60000);

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 死信的关系一定要在queue申明时指定,而不能在exchange申明时指定
channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap);

3. 消费者

只有消费者确认的消息,RabbitMQ才会删除它,不确认就不会被删除。所以,在消费端,建议关闭自动确认机制。应该在收到消息处理完业务以后,手动确认消息。消费者手动确认实现代码如下:

代码语言:javascript
复制
DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties props, byte[] body){
        System.out.println("死信队列接受到的消息:" + new String(body));
        // 手动确认消息接受成功
        channel.basicAck(envelope.getDeliveryTag(), false);
        // channel.basicNack(envelope.getDeliveryTag(), false, false);
    }

};
// 推模式,并且关闭自动确认机制,即autoAck=false
channel.basicConsume(QUEUE_NAME, false, consumer);

注意上面的basicAck方法:void basicAck(long deliveryTag, boolean multiple) 第二个参数multiple。要说明这个参数的含义,首先需要讲清楚一个概念「deliveryTag」,即投递消息唯一标识符,它是一个「单调递增」的Long类型正整数。假设此次basicAck的tag为123130,如果multiple=false,那么表示只确认签收这一条消息。如果multiple=true,那么表示确认签收tag小于或等于123130的所有消息。

「最大Tag值」:Delivery tag是一个64位长整型值,也就是说它的最大值是9223372036854775807(2^64-1),不过这个tag值是按channel划分的。也就是说,理论上每一个channel都可以发送2^64-1条消息。因此,我们认为在实际情况中,tag基本上不可能达到它的上限值。 ❞

前面提到,由于网络等一些原因引起了故障,就会导致消息被重发。因此,消费端一定要做好处理重复消息的准备,强烈建议在消费端实现「幂等」的业务逻辑。比如以支付送积分为例,那么,消息体中肯定有订单号这个业务属性唯一的ID。那么在消费这个消息送积分的业务代码中,需要根据这个订单号做幂等处理,即「同一个订单号只能送一次积分」

在消费端还有一种情况,就是当前消费者认为它不能处理当前消息。因此,它就拒绝签收(basic.reject或者basic.nack)这个消息。那么生产者也需要监听这些消息并做特殊的业务处理。

RabbitMQ中消费者有3种签收消息方式:1、channel.basicAck(long deliveryTag, boolean multiple):确认签收消息;2、channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):不确认签收消息,并设置是否重入队列中,并且可以批量不确定签收tag之前的所有还未ack的消息;3、channel.basicReject(long deliveryTag, boolean requeue):直接拒绝签收这一条消息,并设置是否重入队列中。

4. 队列镜像

为了防止在Broker中丢失消息,交换器、队列和消息都应该设置为持久化。除此以外,队列和消息还应该被复制,为了应对操作系统未及时fsync刷盘、Broker重启、Broker服务器硬件故障、或者Broker crash故障等问题。

集群节点提供了冗余能力,能容忍单节点故障。在RabbitMQ集群中,所有的定义都可以被复制,例如交换器、绑定关系、用户等。但是队列有所不同,默认只存在一个节点上(这一点和kafka很不一样,kafka的Topic可以设置多个副本)。不过,可以通过配置把队列「镜像」到多个节点上,从而让队列不会有单点故障的问题:

当节点发生故障时,如果这时候有镜像队列,还会自动发生新的选举,就能选举出新的Master。从而不会因为某个节点故障而导致队列不可用,而且整个过程对业务完全无感知。

不过需要注意的是,镜像并不能保证所有队列的高可用,「排他性队列」(Exclusive Queues)就不行。因为排他性队列生命周期与它的连接绑定在一起,当Broker节点发生故障重启后,排他性队列是会自动删除的,因此不能被镜像(Mirrored)!

5. 告警

这里我们主要说一下磁盘告警以及内存告警。因为出现下面两种情况时,RabbitMQ会停止从客户端网络socket中读数据: 1、内存使用达到配置上限;2、磁盘使用达到配置上限;

内存使用上限有3种设置方式:

代码语言:javascript
复制
## 设置节点可使用RAM百分比,超过这个百分比就会告警
vm_memory_high_watermark.relative = 0.4
## 设置节点可使用RAM的上限,单位为byte即字节
vm_memory_high_watermark.absolute = 1073741824
## RabbitMQ 3.6.0+,设置节点可使用RAM的绝对大小,它的优先级低于relative
vm_memory_high_watermark.absolute = 2GB

磁盘使用上限也有3种设置方式:

代码语言:javascript
复制
## 设置磁盘剩余空间阈值,当可用空间低于这个值就会触发告警
disk_free_limit.absolute = 51200

## with RabbitMQ 3.6.0+.
disk_free_limit.absolute = 500KB
disk_free_limit.absolute = 50MB
disk_free_limit.absolute = 5GB

## 设置磁盘剩余空间为有效RAM的多少倍,比如当前可用RAM为2G,且设定这个参数为2。那么当磁盘可用空间低于2*2=4G时就会触发告警
disk_free_limit.relative = 2.0

❝由上可知:开启磁盘告警以及内存告警非常简单,需要说明的是,设置绝对值大小时,MB和M是不一样的,M即MiB表示 mebibytes (2^20 ,1,048,576 bytes),而MB表示 megabytes (10^6 ,1,000,000 bytes)。 ❞

只要满足这两种情况其中的一种情况,服务器就会临时阻塞连接,并且连接心跳也会断开。这时候我们通过rabbitmqctl或者RabbitMQ管理后台查看,所有网络连接都是「blocking」。如果我们是RabbitMQ集群,那么内存和磁盘告警就会影响整个集群。只要其中一个节点内存或者磁盘达到上限,整个集群的所有节点都会阻塞连接。

一些客户端包是支持连接阻塞提醒的,如果你使用的客户端包恰好也支持,建议基于此增加监控告警。此外,运维需要针对RabbitMQ集群服务器增加内存和磁盘使用率告警。假设RabbitMQ磁盘告警是只剩10%,那么运维设置的告警应该要大于10%,比如15%,从而能够提前发现隐患,提前介入处理问题,避免磁盘使用达到上限而引起线上问题,背个P0故障可不好看。

6. 监控和Metrics

生产环境我们应该有完善且合理的监控机制,从而做到防患于未然。监控能在问题还未暴露出来时,就能提前发现问题。监控最重要的事情就是对Metrics的采集和分析。我们把Metrics分为两大类:「RabbitMQ的Metrics」「基础设施的Metrics」

6.1 基础设施的Metrics

这块的监控,我们需要收集运行RabbitMQ节点的所有服务器,以及应用的一些metrics,包括但不限于:

  1. CPU统计情况;
  2. 内存使用率;
  3. 虚拟内存统计;
  4. RabbitMQ节点数据目录下磁盘剩余可用空间;
  5. 磁盘IO情况;
  6. 网络吞吐量(接收量、发送量、最大网络吞吐量等);
  7. 网络延迟情况(RabbitMQ集群所有节点以及客户端之间的网络延迟);
  8. 文件描述符;

对基础设施以及系统内核Metrics的监控工具非常多,而且都很成熟。比如:Prometheus、Datadog、Zabbix等。它们都能很好的收集Metrics信息,然后存储并可视化展示,并且可以自定义告警规则。

6.2 RabbitMQ的Metrics

接下来我们说一下对RabbitMQ集群本身的监控。RabbitMQ的管理后台UI暴露了节点很多metrcis信息,并且RabbitMQ还通过HTTP API把这些信息暴露出来方便我们二次开发,自定义监控系统就非常依赖它的HTTP API。HTTP API访问参考:curl -i -u root:root123 'http://localhost:15672/api/overview'。

我们先说RabbitMQ管理后台,这个后台做的还是很不错的。但是如果用于监控的话,就显得有些不足了:

  1. 强依赖被监控的系统(应该要解耦被监控的系统和监控系统);
  2. 只存储最近一天的数据(超过一天的数据就没有了);
  3. 用户接口不够强大;
  4. 它的权限系统是依赖RabbitMQ权限系统的;

可喜的是,RabbitMQ从3.8版本开始,已经支持Prometheus和Grafana了,也推荐应用在生产环境上。接下来说一下RabbitMQ监控系统需要关注RabbitMQ的哪些metrics。

6.2.1 集群Metrics

我们可以通过 'http://localhost:15672/api/overview' 得到RabbitMQ集群相关信息,。结果(部分字段)如下:

代码语言:javascript
复制
{
	... ...
	"message_stats": {
		"ack": 212,
		"ack_details": {
			"rate": 0.0
		},
		"confirm": 143,
		"confirm_details": {
			"rate": 0.0
		},
		"publish": 323,
		... ...
	},
	"queue_totals": {
		"messages": 197,
		"messages_details": {
			"rate": 0.0
		},
		...
	},
	"object_totals": {
		"channels": 2,
		"connections": 2,
		"consumers": 1,
		"exchanges": 24,
		"queues": 10
	},
	... ...
}

这里我们需要关注的一些主要指标有:

  • message_stats.ack:消费者确认接受消息的数量;
  • message_stats.confirm:生产者得到Broker已经确认的消息(ConfirmListener中的handleAck);
  • message_stats.publish:最近发布的消息总量(它只有最近的数据,没有RabbitMQ集群整个生命周期的数据。前面两个指标ack和confirm一样,也都是最近的数据。如果RabbitMQ闲置一段时间,这几个值都会归零);
  • object_totals.channels:channel数量,等价于RabbitMQ管理后台「Channels」页面中交换器数量;
  • object_totals.connections:连接数量,等价于RabbitMQ管理后台「Connections」页面中交换器数量;
  • object_totals.consumers:消费者数量;
  • object_totals.exchanges:交换器数量,等价于RabbitMQ管理后台「Exchanges」页面中交换器数量;
  • object_totals.queues:队列数量,等价于RabbitMQ管理后台「Queues」页面中队列数量;

说明:object_totals这个节点下几个字段的值,我们也可以在RabbitMQ管理后台「Overview」页面的「Global counts」中得到,如下图所示:

6.2.2. 节点Metrics

首先,我们通过 'http://localhost:15672/api/nodes/' 可以获取到RabbitMQ集群中所有节点的详细信息,它返回的是一个JSON数组。也可以通过 'http://localhost:15672/api/nodes/{nodeName}' 获取某个节点信息(这里的nodeName就是第一个URL结果中字段name的值,例如:"name": "rabbit@afeideMacBook-Pro"),所以这个URL返回的是一个JSON对象。结果(部分字段)如下:

代码语言:javascript
复制
{
	"partitions": [],
	"os_pid": "36637",
	"fd_total": 4864,
	"sockets_total": 4285,
	"mem_limit": 3435973836,
	"mem_alarm": false,
	"disk_free_limit": 50000000,
	"disk_free_alarm": false,
	"proc_total": 1048576,
	"rates_mode": "basic",
	"uptime": 115368753,
	"run_queue": 1,
	"processors": 4,
	... ...
}

这里我们需要关注的一些主要指标有:

  • mem_used:已经使用的内存;
  • mem_limit:限制最大允许使用的内存;
  • mem_alarm:bool类型值,是否开启了内存告警;
  • disk_free_limit:磁盘还剩下多少空间告警阈值,也就是说当磁盘可用空间小于这个值时就会告警;
  • disk_free_alarm:bool类型值,是否开启了磁盘告警;
  • fd_total:总计可用文件描述符数量;
  • fd_used:已经使用的文件描述符数量;
  • sockets_total:总计可用sockets数量;
  • sockets_used:已经使用sockets数量;

6.2.3 队列Metrics

队列的metrics都可以通过'http://localhost:15672/api/queues/afei/queue-normal-afei'获取。得到的json结果如下:

代码语言:javascript
复制
{
	"consumer_details": [],
	"arguments": {
		"x-dead-letter-exchange": "exchange-dlx-afei",
		"x-message-ttl": 3000
	},
	"auto_delete": false,
	... ...
	"idle_since": "2020-05-05 7:45:58",
	"incoming": [],
	"memory": 19900,
	"message_bytes": 0,
	... ...
	"state": "running",
	"vhost": "/afei"
}

我们需要关注的一些主要指标有:

  • memory:使用的内存;
  • messages:未被确认的消息总数 + 准备投递的消息总数;
  • messages_ready:准备投递的消息总数;
  • messages_unacknowledged:未被确认的消息总数;
  • state:当前队列的状态,running状态表示正常;
  • idle_since:表示当前队列有多久没有被消费者消费了,如果这个时间相比当前时间是很久以前,很有可能说明当前队列没有生产者已经废弃。

6.2.4 应用Metrics

现在大部分系统都是分布式的,在这样的系统中,当某个组件出问题时,通常很难立即定位问题。所以,系统的每个部分,包括应用本身,都应该被监控起来。

一些基础设施级别和RabbitMQ的Metrics能看出系统当前有异常行为和问题,但是,还不能定位根据原因。例如,通过监控我们很容易知道某个节点磁盘空间不足,但是很难麻烦就知道为什么不足。所以,我们需要应用Metris信息来协助我们排查这样的问题:区分哪个消息生产者速度异常、哪个消息者出现重复性的失败、消费者速度赶不上消息产生的速度等。

应用程序跟踪的Metrics可能是特定系统的,但是也有一些Metrics与大多数系统都是有关联的,例如:Connection opening rate、Channel opening rate、Connection failure (recovery) rate、Publishing rate、Delivery rate、Positive delivery acknowledgement rate、Negative delivery acknowledgement rate、Mean/95th percentile delivery processing latency... ...

7. 健康检查

下面推荐一些RabbitMQ监控检查的命令:

  • rabbitmq-diagnostics -q ping:如果节点没有任何问题,那么返回Ping succeeded。
  • rabbitmq-diagnostics -q status:会显示当前RabbitMQ节点即broker的很多信息,例如内存信息、磁盘信息、虚拟内存信息、告警信息、文件描述符等。
  • rabbitmq-diagnostics -q alarms:该命令可以检查RabbitMQ本地以及集群节点是否有告警信息,如果没有,那么返回这样的信息:Node rabbit@afeideMacBook-Pro reported no alarms, local or clusterwide;

说明:「rabbitmq-diagnostics」这个命令还有很多其他的用法,我们可以通过rabbitmq-diagnostics --help查看它还能监控和检查哪些Metrics。

最后推荐一些第三方监控工具,如下表格所示,按照字母排序,排名不分先后。这些工具能力上各有千秋,不过一般都是既能收集基础设施级别的Metrics,也能收集RabbitMQ的Metrics。当然,笔者能力有限,可能还有其他一些好用的工具并没有列举出来:

名称

简介

地址

AppDynamics

RabbitMQ监控扩展,从RaabitMQ管理API上收集RabbitMQ的metrics并上传到AppDynamics Controller上

https://github.com/Appdynamics/rabbitmq-monitoring-extension

AWS CloudWatch

把RabbitMQ集群的Metrics信息推送到AWS CloudWatch上的RabbitMQ插件

https://github.com/noxdafox/rabbitmq-cloudwatch-exporter

DataDog

通过Datadog代理监控RabbitMQ

https://github.com/DataDog/integrations-core/tree/master/rabbitmq

Prometheus

通过Prometheus暴露RabbitMQ一些核心Metrics

https://github.com/rabbitmq/rabbitmq-prometheus

Zabbix

通过Zabbix监控RabbitMQ

https://git.zabbix.com/projects/ZBX/repos/zabbix/browse/templates/app/rabbitmq_http

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序猿DD 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 确认机制
    • 1.1 生产者/消费者确认
      • 1.2 确认机制总结
      • 2. 生产者
      • 3. 消费者
      • 4. 队列镜像
      • 5. 告警
      • 6. 监控和Metrics
        • 6.1 基础设施的Metrics
          • 6.2 RabbitMQ的Metrics
            • 6.2.1 集群Metrics
            • 6.2.2. 节点Metrics
            • 6.2.3 队列Metrics
            • 6.2.4 应用Metrics
        • 7. 健康检查
        相关产品与服务
        Prometheus 监控服务
        Prometheus 监控服务(TencentCloud Managed Service for Prometheus,TMP)是基于开源 Prometheus 构建的高可用、全托管的服务,与腾讯云容器服务(TKE)高度集成,兼容开源生态丰富多样的应用组件,结合腾讯云可观测平台-告警管理和 Prometheus Alertmanager 能力,为您提供免搭建的高效运维能力,减少开发及运维成本。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档