首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

一个看似简单的复杂问题:分布式消息队列RocketMQ-消息的“顺序消费”

顺序消息看似很简单:

理想情况如下:

①、producer 顺序发送1,2,3

②、broker 顺序存储 1,2,3

③、consumer 顺序消费1,2,3

以上,皆大欢喜。

这个特性看起来很简单,为什么 RocketMQ 默认不保证呢?

下面就从三个封面来讨论要保证顺序消息,是多么的困难,或者说不可能。

在 RocketMQ 中,一个topic默认有 4个 MessageQueue 。

数据结构:

this.messageQueueList = size = 4

0 = "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0]"

1 = "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1]"

2 = "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2]"

3 = "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3]"

①、producer 顺序发送1,2,3

如果2,3,发送成功了,过了一会1发送失败了,重新发送1,那么就不是顺序消息了。

换个角度,为了保证1发送成功才能发送2,3,得损耗多少性能和资源。

性能可想而知。

②、broker顺序存储1,2,3

在存储端,为了保证消息顺序存储,需要保证一下2个要求:

1、消息不能分区,在Kafka中,它叫做partition;在RocketMQ中,叫 queue ;在RocketMQ ,一个topic默认有4个queue。

如果你有多个队列,那么一个topic的消息,会被分散到多个队列中,自然不能保证顺序存储。

2、即使满足1的要求,一个topic的消息只有一个队列,那么还有两个问题。

A、高可用问题: 比如当前机器挂掉了,上面的消息还没消费完,此时切换到其他机器,高可用保证了,但是消息顺序乱掉了,所以得保证消息是同步复制。

B、那么还得保证切机器之前,挂掉的机器上面,剩余的消息都得全部消费完,这个就非常困难了。

③、consumer 顺序消费1,2,3

为了保证顺序消息,consumer端不能并行消费,也就是不能开多个线程或多个客户端去消费。

通过上面的分析,我们了解到,要保证一个topic内部的消息要做到严格的顺序,是多么的困难。

那么,我们有必要花大力气去保证消息严格顺序吗?

实际情况中,从业务方的角度:

(1)不关注顺序的业务大量存在;

(2) 队列无序不代表消息无序。

第(2)条的意思是说:我们不保证队列的全局有序,但可以保证消息的局部有序。

所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是不是我们应该寻求的一种更合理的方式?

举个例子:保证来自同1个order id的消息,是有序的!

下面就看一下在Kafka和RocketMQ中,分别是如何对待这个问题的:

1、Kafka中:发送1条消息的时候,可以指定(topic, partition, key) 3个参数。partiton和key是可选的。

如果你指定了partition,那就是所有消息发往同1个partition,就是有序的。并且在消费端,Kafka保证,1个partition只能被1个consumer消费。

或者你指定key(比如order id),具有同1个key的所有消息,会发往同1个partition。也是有序的。

2、RocketMQ: RocketMQ在Kafka的基础上,把这个限制更放宽了一步。只指定(topic, key),不指定具体发往哪个队列。也就是说,它更加不希望业务方,非要去要一个全局的严格有序。

但是在rocketmq 4.3.x版本中,可以通过制定 MessageQueue的方法指定消息发往哪个队列:

A、普通同步可靠发送方法:

SendResult sendResult = producer.send(msg);

B、普通同步可靠发送,并指定按orderId来选择MessageQueue方法:

就是说,同一个 orderId 发往同一个 MessageQueue。

下面是开发者自己实现的队列选择策略:

//FIXME RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。

//FIXME 比如下面的示例中,订单号相同的消息会被先后发送到同一个队列中:

// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上

// RocketMQ默认提供了3种MessageQueueSelector实现:随机/Hash/根据机房来选择

// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中

intorderId =1;

producer.send(msg, newMessageQueueSelector() {

@Override

publicMessageQueueselect(List mqs,Message msg,Object arg) {

Integer id = (Integer) arg;

intindex = id % mqs.size();

returnmqs.get(index);

}

},orderId);

注意:选择哪个队列是根据

MessageQueue 选择策略

接口决定的。

默认有三种选择策略,除此之外,开发者还可以实现 选择算法:

/**

* 随机选择队列

①、 SelectMessageQueueByRandom 随机选择

*/

public classSelectMessageQueueByRandomimplementsMessageQueueSelector {

privateRandomrandom=newRandom(System.currentTimeMillis());

@Override

publicMessageQueueselect(List mqs,Message msg,Object arg) {

intvalue =random.nextInt(mqs.size());

returnmqs.get(value);

}

}

/**

* 根据 arg hashCode 选择队列

②、 SelectMessageQueueByHash 根据 args Hash值来选择

*/

public classSelectMessageQueueByHashimplementsMessageQueueSelector {

@Override

publicMessageQueueselect(List mqs,Message msg,Object arg) {

intvalue = arg.hashCode();

if(value

value = Math.abs(value);

}

value = value % mqs.size();

returnmqs.get(value);

}

}

/* ③、 SelectMessageQueueByMachineRoom 根据机房来选择

* 未实现

*

* ④、可以自己实现 策略,如同一个订单号的发往同一个队列

*

* RocketMQ默认提供了两种MessageQueueSelector实现

*/

public classSelectMessageQueueByMachineRoomimplementsMessageQueueSelector {

privateSetconsumeridcs;

@Override

publicMessageQueueselect(List mqs,Message msg,Object arg) {

return null;

}

publicSetgetConsumeridcs() {

returnconsumeridcs;

}

public voidsetConsumeridcs(Set consumeridcs) {

this.consumeridcs= consumeridcs;

}

}

/*

④、可以自己实现 策略,如同一个订单号的发往同一个队列

*

* RocketMQ默认提供了两种MessageQueueSelector实现

*/

intorderId =1;

producer.send(msg, newMessageQueueSelector() {

@Override

publicMessageQueueselect(List mqs,Message msg,Object arg) {

Integer id = (Integer) arg;

intindex = id % mqs.size();

returnmqs.get(index);

}

},orderId);

所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是不是我们应该寻求的一种更合理的方式?

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190120G0ZWZ400?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券