深入讲解ActiveMQ5.X消息的持久性

我经常被问到一些基本的关于解释消息存储在ActiveMQ中是如何工作的问题。在这里我将做一个高层面的解释。注意,上下文环境是它是在JMS范围内。如果你使用的是ActiveMQ的非JMS客户端(e.g.,STOMP,AMQP,MQTT,等),那么它的行为在一些案例中会有所不同。

ActiveMQ

JMS的持久性保证对于被标记为“持久的”而不能丢失的消息而言是非常强大的. 让我们看下它在ActiveMQ中是如何被运用的.

主题

主题使用了一个广播机制. 它允许我们在JMS领域使用发布订阅语义模型. 但当我们将一条消息标记为“持久的”而它并没有订阅者时会如何? 对于任何一个正常的广播而言 (就如我去市中心大声宣扬ActiveMQ的优势), 如果它没有订阅者 (好比在凌晨3点时根本就没有任何人能听到我) 时会怎么样? 什么也不会发生. 如果消息没有任何的订阅者(无活跃的或可持久的订阅者),那么当消息被发布后(持久或非持久的),ActiveMQ 对此消息不会做任何的事情.

如果消息有可持久的订阅者(活跃或非活跃),那么ActiveMQ只是会存储这些消息. 对于一个非活跃的可持久订阅, ActiveMQ 会将标记为“持久的”消息做持久存储并等待订阅者重新加入订阅,到那时它将会尝试投递消息.

队列

ActiveMQ的队列, 使用“持久的”消息做为一个默认的协议. 基本上我们会阻塞生产者线程并等待实际获得消息的broker的确认:

生产者:

  • 生产者发送消息
  • 生产者阻塞并等待broker的ACK
    • 如果ACK成功,那么生产者会继续发送消息
    • 如果NACK或者超时或者失败,那么会重试

Broker:

  • 接收消息
  • 将消息存储到磁盘
  • 回送ACK

对于 “非持久的”的消息发送, 流程是不一样的. 我们会使用“发送并忘记” 的模式. 主生产者线程不会被阻塞,任何的ACK或其它的响应在ActiveMQ连接传输线程上都是异步的:

  • 生产者发送消息
  • 生产者在线程内继续发送消息而不被阻塞
  • 生产者最终在一个独立的线程而不是主生产者线程中获得ACK

事务性的发送?

我们可以通过一次将多条消息合并发送到broker来提高性能. 这样将对网络和broker存储的使用更加的高效. 当做事务性发送的时候,有一个你需要知道且非常重要的差别, 那就是事务会话的开启和关闭 (回滚/提交) 与broker的交互都是同步的, 但是, 在事务窗口内发送的每条消息却是异步的. 如果一切都顺利那就没有任何问题,因为broker对这些消息进行了批处理. 但是如果有事务错误时会发生什么? 或者broker在保存这些消息时发生没有可用磁盘空间时会怎样?

这时我们需要在发送时设置一个ExceptionListener来监控异常. 当broker无资源可用时,我们也应该设置一个在客户端发送的 “生产者窗口”来允许我们加强对生产者流程的控制. 关于这块更多请查阅ActiveMQ生产者流程控制(http://activemq.apache.org/producer-flow-control.html).

改变默认值

我们可以改变生产者的设置行为:

  • useAsyncSend - 经常异步等待ACK, 甚至在持久性的发送和提交中
  • alwaysSyncSend – 强制所有的发送 (非持久的或事务性的发送也包括在内) 必须等待broker的ACK

使用默认通常是我们所需要的.

存储

对于生产环境使用ActiveMQ, 我建议使用共享存储方式. 这里我们需要了解在理解ActiveMQ保证的时候存储层会发生什么.

ActiveMQ默认会实现 JMS可持久性的需求,最基本的要求是当应用crash了也要有能力将消息从存储中恢复出来. 对于这一点, 我们默认会在文件系统上做一次 “fsync”操作. 这个操作在每个系统上会发生什么取决于每个系统所使用的OS、网络、存储控制器、存储设备等。 这跟你犹如期望使用任何的数据库来存储消息是类似的.

当我们需要将消息写入到事务日志时,我们会要求操作系统通过fsync调用将日志刷到磁盘上. 基本上我们会强制OS使用缓存文件通道将页文件写回到存储介质上并允许存储介质在“存储” 数据到磁盘上时做它所需要做的事情(取决于实现):

有一些存储控制器有一些自身需要刷新的缓存, 磁盘驱动也有自身的缓存. 这里有些缓存是靠电池来支持的,需要定时写回. 要理解ActiveMQ对消息的持久存储, 你就需要理解存储层.

消费者

最后一个谜团是我们如何将消息分发或投递到消费者,且消费者是如何确认消息的。 ActiveMQ 的JMS 库为你做好了一切, 所以你不需要担心你是否会丢失消息.

消息被分发到消费者取决于消费者的“预取”缓冲设置。可通过使用消费者可用的缓存来加速对消息的处理并在处理完后将缓存还回. 在ActiveMQ中,这些预取的消息在控制台里用的是“在飞行中”来代表. 它取决于消费者对消息的处理和确认(这取决于消息的确认模式… 默认模式是自动确认,即当消费者收到消息就会发送ACK.. 对更重要的消息处理你可能希望使用“客户端”确认,也即客户端明确的指示什么时候确认消息, 例如, 在完成一些处理后).

如果消费者因某些原因对消息处理失败,那么任何非确认的消息将会被投递到另一个消费者(如果有),然后执行上面同样的处理方式。broker在未得到ACK前不会将消息从索引中移除。所以这里包含了消费者层和网络层上的失败. 如果在消费者“成功处理”(注意,这里的“成功处理”的因用例的不同含义有所不同)后这两层上有任何一层发生失败 , 且broker没有得到确认, 那么broker有可能需要重新发送消息. 在这种情况下,你可以实现一个幂等的消费者以在消费者端收到重复的消息来结束对消息的成功处理. 在扩展消息的生产者/消费者时,你将会希望有幂等的消费者.

最后需要注意的是: 在没有使用XA事务时,JMS不会保证一次且仅且一次的消息处理. JMS会保证一次且仅且一次的消息投递,在这个范围内它会将消息标记为“可被重复投递”并让消费者来检查,消费者会负责它允许被处理多少次(或使用幂等的消费者来做过滤).

原文发布于微信公众号 - IT技术精选文摘(ITHK01)

原文发表时间:2017-08-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏nummy

分布式消息系统:Kafka

Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,冗余备份的持久性的...

16330
来自专栏JAVA高级架构

高并发面试必问:分布式消息系统Kafka简介

30730
来自专栏北京马哥教育

误删重要文件怎么办?学会Linux 救援模式再也不担心

背景 在运用Linux时会出现一些误操作,导致系统无法正常使用,比如删除了某个重要依赖库,或者删除了rpm等等。在这里记录下具体的操作步骤,供以后参考。 意义 ...

41590
来自专栏编程坑太多

『中级篇』docker网络(23)

PS:本机基本上docker的多虚拟机网络已经完成了,比较简单,毕竟是演示环境,可能有老铁说,为啥设置成了自动获得IP,而不是静态IP,毕竟是学习的环境,我也尝...

13840
来自专栏云计算

使用Ansible自动化管理云上或者本地的基础设施

Ansible任务是幂等的。通常在没有额外编码的情况下,单单调用脚本重复执行通常是不安全的。而Ansible在执行任务之前都会收集当前的系统和环境信息作为上下文...

25450
来自专栏小尘哥的专栏

也许能帮到你一点!eboot框架基础版发布

目前发布一个基础版,适用于中小型项目开发,后续会加入更多功能,以满足大型项目的需求。

7810
来自专栏杨建荣的学习笔记

半自动化搭建Data Guard的想法和实践(二)(r9笔记第79天)

关于半自动化搭建Data Guard,自己花了一些时间,总算是把这件事情继续推进了一下,还是再啰嗦一句,为什么不自动化,因为安全。主库就是主库,任何变更都要手...

34750
来自专栏北京马哥教育

用开源自动化运维工具 SaltStack 在云平台中实现各主机统一配置管理

本文将主要介绍开源软件 SaltStack 自动化运维工具在大型云计算环境中,如何帮助管理员快速完成运维任务,降低运维工作量,提高效率。根据不同业务特性,进行...

54250
来自专栏IMWeb前端团队

Node.js必须收藏,五大应用性能技巧

Nodejs/web前端 权威大牛交流群:550392000 ,更多免费视频资料+源码~ 大家赶紧收藏起来吧~ 一、实现一个反向代理服务器 相比大多数应用服务器...

240100
来自专栏腾讯云中间件团队的专栏

高性能消息队列 CKafka 核心原理介绍(上)

Ckafka 基于现有的 Kafka 进行了扩展开发和优化,为了方便用户理解 Ckafka 本文也将对 Kafka 的实现原理进行较为详细的介绍。本文是《高性能...

1.8K30

扫码关注云+社区

领取腾讯云代金券