消息中间件的设计与实践

也无风雨也无晴

消息中间件对应用的解耦

  • 如登陆系统负责向消息中间件发送消息,而其他的系统则向消息中间件来订阅这个消息,然后完成自己的工作.
  • 通过消息中间件解耦,登陆系统就不用关心到底有多少个系统需要知晓登陆成功这件事了,而不用关心如何通知它们,只需要把登陆成功这件事转化为一个消息发送到消息中间件就可以了
  • landon:和事件解耦一样,如游戏中玩家升级抛出一个事件,其他子系统只需要监听该事件即可,而不必升级直接调用各个子系统
  • 登陆成功时需要向消息中间件发送一个消息,那么[必须保证这个消息发送到了消息中间件],否则依赖这个消息的系统就无法工作了 互联网时代的消息中间件
  • JMS:Java Message Service->规范->Hornetq,ActiveMQ等产品是这个规范的实现1.如何解决消息发送一致性
  • 消息发送一致性的定义:产生消息的业务动作与消息发送的一致,即如果业务操作成功了,那么由这个操作产生的消息一定要发送出去,否则就丢失消息了;而另一方面,如果这个业务行为没有发生或者失败,那么就不应该把消息发出去.
  • JMS消息模型-Queue/Topic_支持XA协议(两阶段提交)->会引入分布式事务->存在一些限制且成本相对较高1.1一致性方案的正向流程
  • (1) 业务处理应用首先把消息发给消息中间件,标记消息的状态为待处理.
  • (2) 消息中间件收到消息后,把消息存储在消息存储中,并不投递该消息.
  • (3)消息中间件返回消息处理的结果,仅是入库的结果,结果是成功或者失败.
  • (4)业务方收到消息中间件返回的结果并进行处理: a) 如果收到的结果是失败,那么就放弃业务处理,结束 b) 如果收到的结果是成功,则进行业务自身的操作 *
  • (5)业务操作完成,把业务操作的结果发送给消息中间件
  • (6)消息中间件收到业务操作结果,根据结果进行处理 a) 如果业务失败,则删除消息存储中的消息,结束 b)如果业务成功,则更新消息存储中的消息状态为可发送,并且进行调度,进行消息的投递
  • 需要注意各种步骤中可能出现的异常情况

1.2 最终一致性方案的补偿流程:

  • (1)消息中间件询问状态为待处理的消息对应业务操作结果
  • (2)应用即消息发布者对业务操作检查操作结果
  • (3)发送业务处理结果给消息中间件
  • 4)消息中间件更新消息状态,业务成功,消息状态为待发送;业务失败则消息删除

2. 如何解决消息中间件与使用者的强依赖问题

  • 把消息中间件所需要的消息表与业务数据表放到同一个业务数据库->业务操作和写入消息作为一个本地事务完成,然后再通知消息中间件有消息可以发送->解决一致性->也可以消息中间件定时去轮询业务数据库找到需要发送的消息,取出内容后进行发送
  • 需要业务自己的数据库承载消息数据/需要让消息中间件去访问业务数据库/需要业务操作的对象是一个数据库
  • 消息中间件不再直接与业务数据库打交道->将业务操作、写入消息,轮询消息等全部放到业务应用
  • 加一个本地磁盘作为一个消息存储

3.消息模型对消息接收的影响

3.1 JMS Queue模型:

  • 应用1和应用2发送消息到JMS服务器,这些消息根据到达的顺序形成一个队列->应用3和应用4进行消息的消费;如果Queue里面的消息被一个应用处理了,那么连接到JMS Queue上的另一个应用是收不到这个消息的->即连接到这个JMS Queue上的应用共同消费了所有的消息->消息从发送端发送出来时不能确定最终会被哪个应用消费,但是可以明确的是只有一个应用会去消费这条消息->Peer To Peer方式(PTP)3.2 JMS Topic模型:
  • 和Queue模型的最大区别在于消息接收的部分,在该模型中,接收消息的应用3和应用4是可以独立收到所有到达Topic的消息的->Pub/Sub方式
  • JMS中客户端连接的处理和带来的限制
  • JMS中每个Connection都有一个唯一的clientId,用于标识连接的唯一性
  • 应用3和JMS服务器建立了两个连接,应用4和JMS服务器建立了一个连接->可以看到这三个连接所接收的消息是完全不同,每个连接收到的消息条数以及收到消息的顺序则不是固定的.->另外每个连接都会收到所有发送到Topic的消息.3.3 我们需要什么样的消息模型
  • 消息发送方和接收方都是集群/同一个消息的接收方可能有多个集群进行消息的处理/不同集群对于同一条消息的处理不能相互干扰
  • 如8条消息和两个集群,每个集群恰好有两台机器->那么需要这两个集群的机器分别处理掉所有8条消息->不能遗漏也不能重复
  • 引入ClusterId,用这个Id来标识不同的集群,而集群内的各个应用实例的连接使用同样的ClusterId->把Topic模型和Queue模型的特点结合起来使用4. 消息订阅者订阅消息的方式
  • 作为消息中间件,提供对于消息的可靠保证是非常重要的事情->一些场景中一些下游系统完全通过消息中间件进行自身任务的驱动
  • 持久订阅、非持久订阅
  • 非持久订阅:消息接收者应用启动时,就建立了订阅关系->可以收到消息->如果消息接收者应用结束了,那么消息订阅关系也就不存在了->这时的消息是不会为消息接收者保留的.
  • 持久订阅:消息订阅关系一旦建立除非应用显示地取消订阅关系否则这个订阅关系将一直存在即使消息接收者应用停止->这个消息也会保留,等待下次应用启动后再投递给消息接收者.

5. 保证消息可靠性

  • 消息从发送端应用到接收端应用,中间有三个阶段需要保证可靠,分别是:[消息发送者把消息发送到消息中间件];[消息中间件把消息存入消息存储];[消息中间件把消息投递给消息接收者]
  • 要保证这三个阶段都可靠,才能保证最终消息的可靠
  • 消息发送端可靠的保证->注意异对异常的处理->可能出现的问题是在不注意的情况下吃掉了异常->从而导致错误的判断结果
  • 消息存储的可靠性保证
  • 持久存储部分的代码完全自主实现
  • 利用现有的存储系统实现
  • 实现基于文件的消息存储
  • 采用数据库作为消息存储
  • 基于双机内存的消息存储
  • 消息中间件自身扩容
  • 让消息的发送者和消息的订阅者能够感知到有新的消息中间件机器加入到了机器->软负载中心
  • 消息存储的扩容处理
  • 服务端主动调度安排投递
  • 消息投递的可靠性保证
  • 消息接收者在处理消息的过程中对于异常的处理->千万不要吃掉异常后确认消息处理成功
  • 投递处理优化:
  • 投递是一定要采用多线程处理
  • 单机多订阅者共享连接->消息只发送一次
  • 订阅者视角的消息重复的产生和应对
  • 分布式事务,复杂
  • 幂等操作->对于消息接收端->采用同样的输入多次调用处理函数会得到同样的结果
  • JMS的消息确认方式与消息重复的关系
  • AUTOACKNOWLEDGE/CLIENTACKNOWLEDGE/DUPSOKACKNOWLEDGE消息投递的其他属性支持
  • 消息优先级
  • 订阅者消息处理顺序和分级订阅
  • 自定义属性
  • 局部顺序
  • 保证顺序的消息队列设计
  • 接收端的设计从原来的Push模式变为了Pull模式

5. 保证消息可靠性:

消息从发送端应用到接收端应用,中间有三个阶段需要保证可靠,分别是:[消息发送者把消息发送到消息中间件];[消息中间件把消息存入消息存储];[消息中间件把消息投递给消息接收者]

5.1 消息发送的可靠性保证

  • 持久订阅 不会因消费者或MQ的宕机,导致消息订阅无效
  • 消息发送端可靠性保证 当且仅当MQ及时、明确返回成功,消息发送端才认为消息发送成功; 其他情况,如返回错误、异常、超时等,均视为发送失败,需要重发。5.2 MQ消息存储可靠性保证 必须存储在磁盘上
    • 基于文件 自建引擎 or 开源引擎
    • 基于关系数据库
    • 库表设计 [消息表 + 投递表]缺点:投递消息表的数据量与[消息数 * 订阅者]成比例,数据量过大。
    • 库表设计 [消息表 + 投递字段]缺点: 无法方便地从订阅者维度对投递状态进行更新。投递字段长度限制。
    • 存储容灾 多机Replication,延迟问题。
    • 存储容灾 双写,复杂性。
    • 基于双机内存 并非完全安全,但性能高 扩容 消息存储独立 存储扩容、调度器扩容(无状态,更易扩容) 消息调度存储一体 趋势 RocketMQ、Kafka5.3 消息投递的可靠性保证
    • 当且仅当消费端明确返回成功,MQ才认为消息接收成功。
    • 消费者不应该在消息的业务处理完成前返回接收成功响应。 消息投递通常是多线程的,具体到单个投递线程,其实现方式有:
    1. 阻塞式 投递后阻塞等待消费端返回
    2. 非阻塞式 投递后不等待消费端返回,直接投递其他消息;启用单独的[投递状态更新线程]异步[及时/定时批量]更新。 其实和IO模型类似。 单应用存在多个订阅者订阅相同topic的优化:
    3. 共享socket连接。
    4. 消息只发送一次,消费端增加一个dispatcher,负责将消息分发给不同订阅者。

6. 消息重复的产生和应对:

6.1 生成端重试

  • 生成端发送到MQ后,MQ正常存储,随后MQ出现问题,没有响应给生成端。
  • MQ负载过高,导致没能及时给生成端发送响应,导致超时。
  • MQ存储消息后,网络问题导致没能发送响应,生成端重试时,网络又恢复。

解决方案:

  • 消费端重发时使用相同ID,即消息ID不在MQ生成,由客户端生成。
  • 分布式事务,在高可用、高并发的互联网应用没法实行。可以直接PASS。
  • 消息消费者对消息的处理操作保持幂等性。6.2 MQ重复投递
  • 消费端接收消息,成功处理后,应用出现问题,没有给MQ发送响应
  • 消费端接收消息,成功处理后,网络出现问题,没有给MQ发送响应
  • 消费端接收消息,处理时间较长后,导致MQ等待响应超时
  • 消费端接收消息,成功处理后,发送响应给MQ,但此时MQ出现问题,没能处理响应
  • 消费端接收消息,成功处理后,发送响应给MQ,但此时消息存储错误,没有更新消息处理状态

解决方案:

  • 分布式事务,在高可用、高并发的互联网应用没法实行。可以直接PASS。
  • 消息消费者对消息的处理操作保持幂等性。
  • 消费端保存消息消费状态,并保证状态更新操作与消息处理操作是一个本地事务。

应对思路: 消除重发行为(如生成端重试和MQ重复投递) 总的思路: [消息唯一ID] + [消息(投递或消费)状态表] + [多个本地事务] 消除重复行为的副作用,即保持消息处理方的操作幂等性。

  • 操作幂等性

7.1 MQ单机多队列(topic)的优化:

MQ单机中的物理队列过多会导致“随机写”,性能急剧下降。 解决方式: 将队列分为“物理队列”和“逻辑队列”,其中物理队列“顺序写”实际的消息,而逻辑队列是“被订阅的队列”。逻辑队列相当于是一个“数据(存储在物理队列)索引队列”。但这种方法,会导致另外的问题:

  1. 读消息时,会先读逻辑队列,再读物理队列,多了一次开销。
  2. 编程复杂性
  3. 读变成了完全随机读

对上述三个问题,可以进行如下优化:

  1. 增大内存,尽量让读命中Page Cache
  2. 系统IO调度方式设置为NOOP,会在一定程度上将随机读转换为顺序跳读。
  3. 物理队列中保存元信息,即使逻辑队列丢失,仍然可以通过物理队列恢复。

8. MQ的PUSH和PULL模式

问:MQ怎么改能缓冲流量? 答:由MQ-server推模式,升级为MQ-client拉模式。

MQ-client根据自己的处理能力,每隔一定时间,或者每次拉取若干条消息,实施流控,达到保护自身的效果。并且这是MQ提供的通用功能,无需上下游修改代码。 问:如果上游发送流量过大,MQ提供拉模式确实可以起到下游自我保护的作用,会不会导致消息在MQ中堆积? 答:下游MQ-client拉取消息,消息接收方能够批量获取消息,需要下游消息接收方进行优化,方能够提升整体吞吐量,例如:批量写。 结论

1)MQ-client提供拉模式,定时或者批量拉取,可以起到削平流量,下游自我保护的作用(MQ需要做的)

2)要想提升整体吞吐量,需要下游优化,例如批量处理等方式(消息接收方需要做的)

58到家架构优化具备整体性,需要通用服务和业务方一起优化升级。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏yukong的小专栏

easyUI自定义iconeasyUI自定义icon

首先我们需要下载好自己需要的标签并且放在一个文件中,然后把这个文件夹复制到easyui所在目录下的themes目录下 如图

10430
来自专栏java思维导图

SpringBoot webSocket实现发送广播、点对点消息和Android接收

这篇文章主要介绍了SpringBoot webSocket实现发送广播、点对点消息和Android接收,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。

60520
来自专栏杨建荣的学习笔记

服务器搬迁之后的准备工作和应对

服务器搬迁后不是简单能连接上服务器就可以了,还有许多的事情需要考虑,否则服务器不可用还是白搭。 我大体碰到了如下的一些问题,也能够反应出来对于系统的各种...

39460
来自专栏架构师之路

微信多点登录与QQ消息漫游架构随想

【需求缘起】 之前的一些文章简单介绍了 《“单人消息”》《“离线消息”》《“群消息”》《“用户状态”》的一些相关技术(点击上面的link直接阅读),今天来聊一聊...

40450
来自专栏外文翻译

如何使用您系统里的主机(hosts)文件

Linux 系统的hosts文件存储在/etc/hosts下,它在IP地址、主机名、域名和机器别名之间创建静态关联。然后,您的Linode会为这些关联提供比必须...

54230
来自专栏MessageQueue

什么是WAL?

在写完上一篇《Pull or Push》之后,原本计划这一片写《存储层设计》,但是临时改变主意了,想先写一篇介绍一下消息中间件最最基础也是最核心的部分:writ...

44710
来自专栏云计算教程系列

如何在Ubuntu 14.04上设置Masterless Puppet环境

在现代云计算领域,配置管理是至关重要的一步。配置管理工具允许您可靠地将配置部署到服务器。Puppet是这个领域中比较成熟的配置管理工具之一。

7800
来自专栏IT笔记

SpringBoot开发案例之整合Kafka实现消息队列

45830
来自专栏张善友的专栏

SQL Server 2008 Service Broker

SQL Server Service Broker 为消息和队列应用程序提供 SQL Server 数据库引擎本机支持。这使开发人员可以轻松地创建使用数据库引擎...

20870
来自专栏XAI

ActiveMQ介绍

1、ActiveMQ服务器工作模型       通过ActiveMQ消息服务交换消息。消息生产者将消息发送至消息服务,消息消费者则从消息服务接收这些消息。这些消...

24990

扫码关注云+社区

领取腾讯云代金券