? 本文已归档到:「blog」 消息队列(Message Queue,简称 MQ)技术是分布式应用间交换信息的一种技术。 消息队列主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。 注意:为了简便,下文中除了文章标题,一律使用 MQ 简称。
MQ 比较核心的优点有 3 个:解耦、异步、削峰。
不同系统如果要建立通信,传统的做法是:调用接口。
如果需要和新的系统建立通信或删除已建立的通信,都需要修改代码,这种方案显然耦合度很高。
img
如果使用 MQ,系统间的通信只需要通过发布/订阅(Pub/Sub)模型即可,彼此没有直接联系,也就不需要相互感知,从而达到 解耦。
img
假设这样一个场景,用户向系统 A 发起请求,系统 A 处理计算只需要 10 ms,然后通知系统 BCD 写库,系统 BCD 写库耗时分别为:100ms、200ms、300ms。最终总耗时为:10+100ms+200ms+300ms=610ms。此外,加上请求和响应的网络传输时间,从用户角度看,可能要等待将近 1s 才能得到结果。
img
如果使用 MQ,系统 A 接到请求后,耗时 10ms 处理计算,然后向系统 BCD 连续发送消息,假设耗时 5ms。那么 这一过程的总耗时为 3ms + 5ms = 8ms,这相比于 610 ms,大大缩短了响应时间。至于系统 BCD 的写库操作,只要自行消费 MQ 后处理即可,用户无需关注。
img
假设某个系统读写数据库的稳定性能为每秒处理 1000 条数据。平常情况下,远远达不到这么大的处理量。假设,因为因为做活动,系统的瞬时请求量剧增,达到每秒 10000 个并发请求,数据库根本承受不了,可能直接就把数据库给整崩溃了,这样系统服务就不可用了。
img
如果使用 MQ,每秒写入 10000 条请求,但是系统 A 每秒只从 MQ 中消费 1000 条请求,然后写入数据库。这样,就不会超过数据库的承受能力,而是把请求积压在 MQ 中。只要高峰期一过,系统 A 就会很快把积压的消息给处理掉。
img
凡事有利有弊,使用 MQ 给系统带来很多好处,也会付出一定的代价。
它引入了以下问题:
如何保证消息不被重复消费 和 如何保证消息消费的幂等性 是同一个问题。
必须先明确产生重复消费的原因,才能对症下药。
重复消费问题通常不是 MQ 来处理,而是由开发来处理的。
以 Kafka 举例:Kafka 每个 Partition 都是一个有序的、不可变的记录序列,不断追加到结构化的提交日志中。Partition 中为每条记录分配一个连续的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的记录。
img
Kafka 的客户端和 Broker 都会保存 Offset。客户端消费消息后,每隔一段时间,就把已消费的 Offset 提交给 Kafka Broker,表示已消费。
在这个过程中,如果客户端应用消费消息后,因为宕机、重启等情况而没有提交已消费的 Offset 。当系统恢复后,会继续消费消息,由于 Offset 未提交,就会出现重复消费的问题。
img
应对重复消费问题,就要通过幂等性来解决。
这个问题可以从业务层面来解决。
MQ 重复消费不可怕,可怕的是没有应对机制,可以借鉴的思路有:
如何处理消息丢失的问题 和 如何保证消息不被重复消费 是同一个问题。关注点有:
唯一可能导致消费方丢失数据的情况,就是:消费方设置了自动提交 Offset。设置了自动提交 Offset,接受到消息后就会自动提交 Offset 给 Kafka ,Kafka 就认为消息已被消费。如果此时,消费方尚未来得及处理消息就挂了,那么消息就丢了。
解决方法就是:消费方关闭自动提交 Offset,处理完消息后手动提交 Offset。但这种情况下可能会出现重复消费的情形,需要自行保证幂等性。
当 Kafka 某个 Broker 宕机,需要重新选举 Partition 的 Leader。若此时其他的 Follower 尚未同步 Leader 的数据,那么新选某个 Follower 为 Leader 后,就丢失了部分数据。
为此,一般要求至少设置 4 个参数:
replication.factor
参数 - 这个值必须大于 1,要求每个 Partition 必须有至少 2 个副本。min.insync.replicas
参数 - 这个值必须大于 1,这是要求一个 Leader 需要和至少一个 Follower 保持通信,这样才能确保 Leader 挂了还有替补。acks=all
- 这意味着:要求每条数据,必须是写入所有 replica 之后,才能认为写入成功了。retries=MAX
(很大很大很大的一个值,无限次重试的意思) - 这意味着要求一旦写入失败,就无限重试,卡在这里了。如果按照上述的思路设置了 acks=all
,生产方一定不会丢数据。
要求是,你的 Leader 接收到消息,所有的 Follower 都同步到了消息之后,才认为本生产消息成功了。如果未满足这个条件,生产者会自动不断的重试,重试无限次。
以 Kafka 为例
要保证 MQ 的顺序性,势必要付出一定的代价,所以实施方案前,要先明确业务场景是不是有必要保证消息的顺序性。只有那些明确对消息处理顺序有要求的业务场景才值得去保证消息顺序性。
方案一
一个 Topic,一个 Partition,一个 Consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
方案二
img
假设一个 MQ 消费者可以一秒处理 1000 条消息,三个 MQ 消费者可以一秒处理 3000 条消息,那么一分钟的处理量是 18 万条。如果 MQ 中积压了几百万到上千万的数据,即使消费者恢复了,也需要大概很长的时间才能恢复过来。
对于产线环境来说,漫长的等待是不可接受的,所以面临这种窘境时,只能临时紧急扩容以应对了,具体操作步骤和思路如下:
不同 MQ 实现高可用的原理各不相同。因为 Kafka 比较具有代表性,所以这里以 Kafka 为例。
了解 Kafka,必须先了解 Kafka 的核心概念:
img
Kafka 是如何实现高可用的呢?
Kafka 在 0.8 以前的版本中,如果一个 Broker 宕机了,其上面的 Partition 都不能用了,这自然不是高可用的。
为了实现高可用,Kafka 引入了复制功能。
简单来说,就是副本机制( Replicate )。
每个 Partition 都有一个 Leader,零个或多个 Follower。Leader 和 Follower 都是 Broker,每个 Broker 都会成为某些分区的 Leader 和某些分区的 Follower,因此集群的负载是平衡的。
同一个 Topic 的不同 Partition 会分布在多个 Broker 上,而且一个 Partition 还会在其他的 Broker 上面进行备份,Producer 在发布消息到某个 Partition 时,先找到该 Partition 的 Leader,然后向这个 Leader 推送消息;每个 Follower 都从 Leader 拉取消息,拉取消息成功之后,向 Leader 发送一个 ACK 确认。
img
FAQ 问:为什么让 Leader 处理一切对对 Partition (分区)的读写请求? 答:因为如果允许所有 Broker 都可以处理读写请求,就可能产生数据一致性问题。
由上文可知,Partition 在多个 Broker 上存在副本。
如果某个 Follower 宕机,啥事儿没有,正常工作。
如果 Leader 宕机了,会从 Follower 中重新选举一个新的 Leader。
MQ 可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读取。通过 MQ,应用程序可独立地执行,它们不需要知道彼此的位置,不需要等待接收程序接收此消息。在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。
一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;
谈 MQ 就不得不提一下 JMS 。
JMS(JAVA Message Service,java 消息服务)API 是一个消息服务的标准/规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
在 EJB 架构中,有消息 bean 可以无缝的与 JM 消息服务集成。在 J2EE 架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。
在 JMS 标准中,有两种消息模型:
P2P 模式包含三个角色:MQ(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
P2P 的特点
如果希望发送的每个消息都会被成功处理的话,那么需要 P2P 模式。
包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到 Topic,系统将这些消息传递给多个订阅者。
Pub/Sub 的特点
为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。
在 JMS 中,消息的产生和消费都是异步的。对于消费来说,JMS 的消息者可以通过两种方式来消费消息。
receive
方法来接收消息,receive
方法在接收到消息之前(或超时之前)将一直阻塞;onMessage
方法。JNDI
- Java 命名和目录接口,是一种标准的 Java 命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。
JNDI 在 JMS 中起到查找和访问发送目标或消息来源的作用。
创建 Connection 对象的工厂,针对两种不同的 jms 消息模型,分别有 QueueConnectionFactory 和 TopicConnectionFactory 两种。可以通过 JNDI 来查找 ConnectionFactory 对象。
Destination 的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的 Destination 是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的 Destination 也是某个队列或主题(即消息来源)。
所以,Destination 实际上就是两种类型的对象:Queue、Topic。可以通过 JNDI 来查找 Destination。
Connection 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。Connection 可以产生一个或多个 Session。跟 ConnectionFactory 一样,Connection 也有两种类型:QueueConnection 和 TopicConnection。
Session 是操作消息的接口。可以通过 session 创建生产者、消费者、消息等。Session 提供了事务的功能。当需要使用 session 发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分 QueueSession 和 TopicSession。
消息生产者由 Session 创建,并用于将消息发送到 Destination。同样,消息生产者分两种类型:QueueSender 和 TopicPublisher。可以调用消息生产者的方法(send 或 publish 方法)发送消息。
消息消费者由 Session 创建,用于接收被发送到 Destination 的消息。两种类型:QueueReceiver 和 TopicSubscriber。可分别通过 session 的 createReceiver(Queue)或 createSubscriber(Topic)来创建。当然,也可以 session 的 creatDurableSubscriber 方法来创建持久化的订阅者。
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的 onMessage 方法。EJB 中的 MDB(Message-Driven Bean)就是一种 MessageListener。
深入学习 JMS 对掌握 JAVA 架构,EJB 架构有很好的帮助,消息中间件也是大型分布式系统必须的组件。本次分享主要做全局性介绍,具体的深入需要大家学习,实践,总结,领会。