顺序消息是消息队列 RocketMQ 提供的一种高级消息类型,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。
顺序消息适用于对消息发送和消费顺序有严格要求的情况。
使用场景
顺序消息和普通消息的对比如下:
消息类型 | 消费顺序 | 性能 | 适用场景 |
普通消息 | 无顺序 | 高 | 适用于对吞吐量要求高,且对生产和消费顺序无要求 |
顺序消息 | 指定的 Topic 内的消息遵循先入先出(FIFO)规则 | 一般 | 吞吐量要求一般,但是要求特定的 Topic 严格地按照 FIFO 原则进行消息发布和消费的场景 |
对应到具体的业务场景,顺序消息可以被用在以下场景中:
订单创建场景:在一些电商系统中,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息必须严格按照先后顺序来进行生产或者消费,否则消费中传递订单状态会发生紊乱,影响业务的正常进行。因此,该订单的消息必须按照一定的顺序在客户端和消息队列中进行生产和消费,同时消息之间有先后的依赖关系,后一条消息需要依赖于前一条消息的处理结果。
日志同步场景:在有序事件处理或者数据实时增量同步的场景中,顺序消息也能发挥较大的作用,如同步 mysql 的 binlog 日志时,需要保证数据库的操作是有顺序的。
金融场景:在一些撮合交易的场景下,例如某些证券交易,在价格相同的情况下,先出价者优先处理,则需要按照FIFO的方式生产和消费顺序消息。
说明:
在某些情况下可能会出现短暂的不保序现象,例如在客户端频繁上下线、4.x 集群升降配前后和集群升级等。
实现原理
在 RocketMQ 中支持顺序消息的原理如下图所示。我们可以按照某一个标准对消息进行分区(例如图中的ShardingKey),同一个ShardingKey 的消息会被分配到同一个队列中,并按照顺序被消费。


生产顺序消息
顺序消息的代码如下所示:
public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer(groupName,// ACL权限new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)), true, null);// 设置NameServer的地址producer.setNamesrvAddr("rmq-xxx.rocketmq.xxxtencenttdmq.com:8080");// 启动Producer实例producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}}
这里的区别主要是调用了
SendResult send(Message msg, MessageQueueSelector selector, Object arg)
方法,MessageQueueSelector
是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准。MessageQueueSelector
的接口如下:public interface MessageQueueSelector {MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}
其中 mqs 是可以发送的队列,msg 是消息,arg 是上述 send 接口中传入的 Object 对象,返回的是该消息需要发送到的队列。上述例子里,是以 orderId 作为分区分类标准,对所有队列个数取余,来对将相同 orderId 的消息发送到同一个队列中。
生产环境中建议选择最细粒度的分区键进行拆分,例如,将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。
注意:
为了保证消息的高可用,目前TDMQ RocketMQ版不支持单队列的 “全局顺序消息”(已经创建了全局顺序消息的用户可以正常使用);如果您想保证全局的顺序性,您可以通过使用一致的 ShardingKey 来实现。
消费顺序消息
顺序消费代码如下:
/*** Description: 顺序消费者*/public class OrderConsumer {/*** topic名称*/private static final String TOPIC_NAME = "order_topic";/*** 消费者组名称*/private static final String GROUP_NAME = "group2";public static void main(String[] args) throws Exception {// 创建消息消费者// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_NAME,new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey")),new AllocateMessageQueueAveragely(), true, null);// 设置NameServer的地址consumer.setNamesrvAddr("rmq-xxx.rocketmq.xxxtencenttdmq.com:8080");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);/** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅topic中的所有的信息consumer.subscribe(TOPIC_NAME, "*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for (MessageExt msg : msgs) {// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", msgId=" + msg.getMsgId() + ", content:" + new String(msg.getBody()));}try {// 模拟业务逻辑处理中...TimeUnit.SECONDS.sleep(1);} catch (Exception e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}