【问题场景】 客户端以 consumer 身份订阅到 rabbitmq server 上的 queue 上,客户端侧在 AMQP 协议的 Connection.Tune-Ok 信令中,设置 heartbeat 为 0,即要求服务器侧不启用 heartbeat 功能。服务器由于异常断电原因停止服务,结果客户端在短时间内无法感知到服务器端已经异常。
刚刚出现这个问题时,就有测试人员和业务人员找到我这边说:经过改造的 rabbitmq-c 库可能存在重大 bug,服务器都关闭了,客户端怎么还那像什么都没发生一样继续工作着呢?听到这种疑问,我只问了两个问题就想到了答案:
业务人员告诉我上述问题的答案分别是:是的、是的、没有。呵呵~~所以答案就已经确定了,你想到了么?
【问题分析】 这个问题可以从以下两个层面进行分析:
如果一方已经关闭或异常终止连接而另一方却还不知道,我们将这样的 TCP 连接称为半打开(Half-Open)的。任何一端的主机异常都可能导致发生这种情况。只要不打算在半打开连接上传输数据,仍处于连接状态的一方就不会检测另一方已经出现异常。 半打开连接的一个常见原因是,当客户主机突然掉电,而不是正常的结束客户应用程序后再关机。当然这里所谓的客户机并不是仅仅表示客户端。 在这种情况发生时,作为 TCP 链路上只接收不发送数据的一方,只能依靠 TCP 协议本身的** keepalive 机制**来检查链路是否处于正常状态。而通常 keepalive 机制下,需要大约 2 个小时时间才能触发。
在此层面上讲,客户端由于是作为 consumer 订阅到 queue 上的,所以在该 AMQP/TCP 连接上客户端不会主动发送数据到 rabbitmq server 侧。当服务器由于异常断电停止服务后,consumer 不会接收到 AMQP 协议层面的终止信令,所以无法感知对端的情况。
一种可能的解决办法是客户端侧在接收 N 次超时后,通过发送 AMQP 协议中的 Heartbeat 信令检测服务器端是否处于正常状态。
在场景描述中说道“客户端侧在 AMQP 协议的 Connection.Tune-Ok 信令中,设置 heartbeat 为 0”,如果是将 heartbeat 设置为 30 会如何?答案是会同时触发服务器端和客户端的 heartbeat 功能,即服务器端会在一段时间内没有数据需要发送给客户端的情况下,发送一个心跳包给客户端;或者一段时间内没有收到任何数据,则判定为心跳超时,最终会关闭tcp连接(参考这里)。而客户端侧同样会触发对发送和接收 heartbeat 计时器的维护,分别用于判定发送和接收的超时情况。
所以,需要解决的问题可以描述为: 客户端作为 consumer 订阅到服务器上的 queue 后,在无业务数据需要处理时,需要通过检测 Heartbeat 帧(信令)来判定服务器是否处于异常状态(换句话说,自己是否已经是“半打开”的 TCP 连接)。
建议的解决办法如下:
客户端必须启用 heartbeat 功能(解决“半打开”问题的基础); 客户端需要支持在发送空闲时,发送 heartbeat 的功能(因为目前客户端作为 producer 是长连接到 rabbitmq server 上的); 客户端需要支持在接收空闲时,通过检测服务器端发送来的 heartbeat 帧来判定服务器端(或网络)是否处于正常状态(因为客户端作为 consumer 也是长连接到 rabbitmq server 上的,同时不会主动向 rabbitmq server 发送数据)。
只要客户端启用 heartbeat ,那么服务器就会在满足“一定条件”时,定时向客户端发送 heartbeat 信令,同时也会检测在空闲状态达到规定时间后是否收到 heartbeat 信令;而客户端侧作为 consumer 时,需要判定是否接收到数据(无论是常规数据还是 heartbeat 信令),若在一定时间内没有接收到数据,则认为当前链路可能存在问题。后续可以从业务上触发 consume 关系的重新建立。
由于长期以来,在我们的 Node.js 服务端项目中,离线任务大部分用的是 kue,这是个轻量级的任务队列,之前 也有过介绍。而周五那天我正准备将之前的 kue 队列重构成 RabbitMQ 的队列的相关代码上线。
RabbitMQ 任务队列是我基于 amqplib 实现的,在生产环境跑了半年有余,没什么大问题。
但是,按照墨菲定理,你最担心的事情总会发生,或者说:出来混迟早是要还的。
结果,明明在预发布环境测试没问题的,却在正式环境完全不起作用,一直在报 EPIPE
的错误,并且在之后 ack 时报 channel closed
的错误。
同时,RabbitMQ 管理后台看到,任务队列在一直堆积,已经累计了 5k 的任务量,可能你会觉得不多,但是如果告诉你,每个任务需要执行 1 到 20 分钟不等呢?
显然,先是把我吓了一跳,不过又马上镇定下来,毕竟处理过的线上事故大于十个手指能数的数量了。
冷静想了想,这个离线任务里的业务虽说重要,但一时的任务堆积关系不是很大,而且任务会重新创建,回滚到旧代码就行,于是我将所有的代码一键回滚。
现在,改来找问题原因了。
按照目前的所掌握的信息,似乎还不能定位问题所在,大致能确定的是:TCP 连接有问题,导致 ack 数据写到了已经关闭的 sockets 上面了,才会导致 EPIPE
的错误。
一般来说,TCP 正常的关闭,会有四次握手:
『我要关了哈』 『好的』,『我也要关了』 『恩,拜拜』
而不正常的错误,会有 ECONNRESET
或者 Connection reset by peer
之类的错误提示,EPIPE
的话,一般是对方主动关闭,而没有通知到我方。
于是,原因显然是需要在对方机器上去找,因此登录到 RabbitMQ 的机器上查看日志,果然,发现了非常多的错误日志:
=ERROR REPORT==== 9-Jun-2017::16:07:39 === closing AMQP connection <0.9305.6670> (X.X.X.1:33647 -> X.X.X.2:5672): missed heartbeats from client, timeout: 60s
这是什么意思呢?关键信息是最后一行,missed heartbeats from client, timeout: 60s 。
很明显,超过默认 heartbeats timeout 的时间了,于是 RabbitMQ 认为这个客户端已经不行了,所以主动断了连接。
好了,那么继续下一步。
在 RabbitMQ 官方文档上 [1] 找到这样的解释:在 server 3.0 以及之后的版本中,client 以及 server 会协商一个 timeout 值,默认是 60s (3.5.5 之前是 580s),回过头来看服务器版本,已经大于 3.5.5,(其实看日志也知道了),也就是 60s。
server 每隔 timeout / 2 就会发送一个心跳包,如果都错过,就会认为这客户端没救了,会主动关闭连接,然后客户端需要重新连接。
于是,兴奋地赶紧设置下 heartbeat 时间,来个 3600s。
很明显,问题没那么简单,错误还是在出现。
回过头来,再看看文档,注意 『协商』 这两个字,也就是说,结果不是我设置了就能成功的,server 该怎么做还是怎么做,于是 60s 的默认 timeout 不能通过 client 来修改。
但是这会儿我又不敢修改了,server 的 timeout 是全局的 [2],如果改了就意味着所有的连接都是这个数了,这可太危险了。
整理下思路,看看手头上已有的信息,于是把眼光放到了 client。
其实这会儿,答案已经呼之欲出了:
事件循环太长导致
Node.js 不同于其它正常语言,它是单进程模型,没有所谓的进程并发,即使底层的线程也是为了异步 io。
也就是说,一旦一个事件里面的 CPU 被占满,其它 io 操作都会在事件队列中等待,导致事件循环过长。而在这个问题中,它的表现就是:client 的心跳包所在的事件,无法通过 TCP 这样的网络 io 操作发送至 server。
这才明白,我重构的部分是 CPU 密集型的任务,这恰恰是 Node.js 最软肋的地方。
显然对于 CPU 密集型任务,我们一般有这几种方案:
那么,为了尽快解决线上的问题,第一个就是我们的选择:最快,最直接。
此示例演示了心跳的明确设置和阻止的连接超时。
从RabbitMQ 3.5.5开始,代理的默认心跳超时从580秒减少到60秒。因此,在同一个运行Pika连接的线程中执行冗长处理的应用程序可能会因心跳超时而出现意外断开的连接。在这里,我们为心跳超时指定显式下限。
当RabbitMQ代理耗尽某些资源(例如内存和磁盘空间)时,它可能会阻止执行资源消耗操作的连接,例如发布消息。一旦连接被阻止,RabbitMQ就会停止从该连接的套接字读取,因此客户端的命令不会通过该连接上的代理,直到代理解除阻塞。被阻止的连接可能持续一段无限期,停止连接并可能导致挂起(例如,在BlockingConnection中),直到连接被解除阻塞。阻塞连接超时旨在中断(即,丢弃)已被阻止超过给定超时值的连接。
配置hertbeat和阻塞连接超时的示例:
import pika
def main():
# NOTE: These parameters work with all Pika connection types
params = pika.ConnectionParameters(heartbeat_interval=600,
blocked_connection_timeout=300)
conn = pika.BlockingConnection(params)
chan = conn.channel()
chan.basic_publish('', 'my-alphabet-queue', "abc")
# If publish causes the connection to become blocked, then this conn.close()
# would hang until the connection is unblocked, if ever. However, the
# blocked_connection_timeout connection parameter would interrupt the wait,
# resulting in ConnectionClosed exception from BlockingConnection (or the
# on_connection_closed callback call in an asynchronous adapter)
conn.close()
if __name__ == '__main__':
main()
参考:https://my.oschina.net/moooofly/blog/209823 https://github.com/xizhibei/blog/issues/51 https://github.com/pika/pika/issues/965 https://pika.readthedocs.io/en/stable/examples/heartbeat_and_blocked_timeouts.html