首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

RabbitMQ一个简单可靠的方案

关键时刻,第一时间送达!

来源:编程玩家

cnblogs.com/Erik_Xu/p/9515208.html

程序员大咖整理发布,转载请联系作者获得授权

前言

最近需要使用到消息队列相关技术,于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题,归纳了一下,大概有以下几种:

1. 临时异常,如数据库网络闪断、http请求临时失效等;

2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行;

3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理;

4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等;

5. 非法异常,一些伪造、攻击类型的消息。

针对这些异常,我采用了一种基于消息审计、消息重试、消息检索、消息重发的方案。

方案

1. 消息均使用Exchange进行通讯,方式可以是direct或topic,不建议fanout。

2. 根据业务在Exchange下分配一个或多个Queue,同时设置一个审计线程(Audit)监听所有Queue,用于记录消息到MongoDB,同时又不阻塞正常业务处理。

3. 生产者(Publisher)在发布消息时,基于AMQP协议,生成消息标识MessageId和时间戳Timestamp,根据消息业务添加头信息Headers便于跟踪。

4. 消费者(Comsumer)消息处理失败时,则把消息发送到重试交换机(Retry Exchange),并设置过期(重试)时间及更新重试次数;如果超过重试次数则删除消息。

5. 重试交换机Exchange设置死信交换机(Dead Letter Exchange),消息过期后自动转发到业务交换机(Exchange)。

6. WebApi可以根据消息标识MessageId、时间戳Timestamp以及头信息Headers在MongoDB中对消息进行检索或重试。

注:选择MongoDB作为存储介质的主要原因是其对头信息(headers)的动态查询支持较好,同等的替代产品还可以是Elastic Search这些。

生产者(Publisher)

1. 设置断线自动恢复

2. 定义Exchange,模式为direct

3. 根据业务定义QueueA和QueueB

4. 启动消息发送确认机制,即需要收到RabbitMQ服务端的确认消息

5. 设置消息持久化

6. 生成消息标识MessageId、时间戳Timestamp以及头信息Headers

7. 发送消息,偶数序列发送到QueueA(RouteA),奇数序列发送到QueueB(RouteB)

8. 确定收到RabbitMQ服务端的确认消息

完整代码

效果:QueueA和QueueB各一条消息,QueueAudit两条消息

注:Exchange下必须先声明Queue才能接收到消息,上述代码并没有QueueAudit的声明;需要手动声明,或者先执行下面的消费者程序进行声明。

正常消费者(ComsumerA)

1. 设置预取消息,避免公平轮训问题,可以根据需要设置预取消息数,这里是

2. 声明Exchange和Queue

3. 编写回调函数

注:设置了RabbitMQ的断线恢复机制,当RabbitMQ连接不可用时,与MQ通讯的操作会抛出AlreadyClosedException的异常,导致主线程退出,哪怕连接恢复了,程序也无法恢复,因此,需要捕获处理该异常。

异常消费者(ComsumerB)

1.设置预取消息

2.声明Exchange和Queue

3.设置死信交换机(Dead Letter Exchange)

4. 重试设置,3次重试;第一次1秒,第二次10秒,第三次30秒

5. 获取当前重试次数

6. 发生异常,判断是否可以重试

7. 可以重试,则启动重试机制

完整代码

审计消费者(Audit Comsumer)

1. 声明Exchange和Queue

2. 排除死信Exchange转发过来的重复消息

3. 生成消息实体

4. RabbitMQ会用bytes来存储字符串,因此,要把头中bytes转回字符串

5. 把Unix格式的Timestamp转成UTC时间

6. 消息存入MongoDB

MongoDB记录:

重试记录:

消息检索及重发(WebApi)

1. 通过消息Id检索消息

2. 通过头消息检索消息

3. 消息重发,会重新生成MessageId

Ack,Nack,Reject的关系

1. 消息处理成功,执行Ack,RabbitMQ会把消息从队列中删除。

2. 消息处理失败,执行Nack或者Reject:

a) 当requeue=true时,消息会重新回到队列,然后当前消费者会马上再取回这条消息;

b) 当requeue=false时,如果Exchange有设置Dead Letter Exchange,则消息会去到Dead Letter Exchange;

c) 当requeue=false时,如果Exchange没设置Dead Letter Exchange,则消息从队列中删除,效果与Ack相同。

3. Nack与Reject的区别在于:Nack可以批量操作,Reject只能单条操作。

RabbitMQ自动恢复

连接(Connection)恢复

1. 重连(Reconnect)

2. 恢复连接监听(Listeners)

3. 重新打开通道(Channels)

4. 恢复通道监听(Listeners)

5. 恢复basic.qos,publisher confirms以及transaction设置

拓扑(Topology)恢复

1. 重新声明交换机(Exchanges)

2. 重新声明队列(Queues)

3. 恢复所有绑定(Bindings)

4. 恢复所有消费者(Consumers)

异常处理机制

1. 临时异常,如数据库网络闪断、http请求临时失效等

通过短时间重试(如1秒后)的方式处理,也可以考虑Nack/Reject来实现重试(时效性更高)。

2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行

通过长时间重试(如1分钟、30分钟、1小时、1天等),等待B任务先执行完的方式处理。

3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理

等系统修正后,通过消息重发的方式处理。

4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等

等系统恢复后,通过消息重发的方式处理。

5. 非法异常,一些伪造、攻击类型的消息

多次重试失败后,消息从队列中被删除,也可以针对此业务做进一步处理。

源码地址:

https://github.com/ErikXu/RabbitMesage

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180829B0ICWH00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券