public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
可以实现MessageQueueSelector接口,在select方法中自定义选择哪个MessageQueue。然后调用
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, selector, arg);
}
package william.rmq.producer.order;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/11 17:36
* @Description: 自定义MessageQueueSelector,根据发送消息时传递的参数,选择指定的MessageQueue
*/
public class OrderMessageQueueSelector implements MessageQueueSelector{
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//选择以参数arg为索引的MessageQueue
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}
在select方法中,根据传入的arg参数决定目标MessageQueue的索引。
下面实现发送消息逻辑:
package william.rmq.producer.order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import william.rmq.common.constant.RocketMQConstant;
import william.rmq.producer.common.CommonSendCallback;
import javax.annotation.PostConstruct;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/11 17:32
* @Description:顺序消息生产端
*/
@Slf4j
@Service
public class OrderMessageProducer {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private static final DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
private static final String[] ORDER_MESSAGES = {"下单","结算","支付","完成"};
@PostConstruct
public void sendMessage() {
try {
//设置namesrv
producer.setNamesrvAddr(namesrvAddr);
//启动Producer
producer.start();
System.err.println("Order Message Producer Start...");
//创建3组消息,每组消息发往同一个Queue,保证消息的局部有序性
String tags = "Tags";
OrderMessageQueueSelector orderMessageQueueSelector = new OrderMessageQueueSelector();
//注:要实现顺序消费,必须同步发送消息
for (int i = 0;i < 3;i++){
String orderId = "" + (i + 1);
for (int j = 0,size = ORDER_MESSAGES.length;j < size;j++){
String message = "Order-" + orderId + "-" + ORDER_MESSAGES[j];
String keys = message;
byte[] messageBody = message.getBytes(RemotingHelper.DEFAULT_CHARSET);
Message mqMsg = new Message(RocketMQConstant.TEST_TOPIC_NAME, tags, keys, messageBody);
producer.send(mqMsg, orderMessageQueueSelector,i);
}
}
} catch (Exception e) {
log.error("Message Producer: Send Message Error ", e);
}
}
}
package william.rmq.consumer.order;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/11 17:53
* @Description:顺序消息监听器
*/
public class OrderMessageListener implements MessageListenerOrderly{
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
if (CollectionUtils.isEmpty(msgs)){
return ConsumeOrderlyStatus.SUCCESS;
}
//设置自动提交
context.setAutoCommit(true);
msgs.stream()
.forEach(msg -> {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("Handle Order Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: "
+ msg.getTags() + ",keys: " + msg.getKeys() + ",messageBody: " + messageBody);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return ConsumeOrderlyStatus.SUCCESS;
}
}
package william.rmq.consumer.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import william.rmq.common.constant.RocketMQConstant;
import javax.annotation.PostConstruct;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/11 17:53
* @Description:顺序消息的消费者
*/
@Service
public class OrderMessageConsumer {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
@PostConstruct
public void start() {
try {
//设置namesrv地址
consumer.setNamesrvAddr(namesrvAddr);
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//订阅主题
consumer.subscribe(RocketMQConstant.TEST_TOPIC_NAME, "*");
//注册消息监听器,这里因为要实现顺序消费,所以必须注册MessageListenerOrderly
consumer.registerMessageListener(new OrderMessageListener());
//启动消费端
consumer.start();
System.err.println("Order Message Consumer Start...");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8440000,topic: DefaultCluster,tags: Tags,keys: Order-1-下单,messageBody: Order-1-下单
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8510001,topic: DefaultCluster,tags: Tags,keys: Order-1-结算,messageBody: Order-1-结算
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8540002,topic: DefaultCluster,tags: Tags,keys: Order-1-支付,messageBody: Order-1-支付
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8580003,topic: DefaultCluster,tags: Tags,keys: Order-1-完成,messageBody: Order-1-完成
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C85A0004,topic: DefaultCluster,tags: Tags,keys: Order-2-下单,messageBody: Order-2-下单
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C85F0005,topic: DefaultCluster,tags: Tags,keys: Order-2-结算,messageBody: Order-2-结算
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8620006,topic: DefaultCluster,tags: Tags,keys: Order-2-支付,messageBody: Order-2-支付
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8680007,topic: DefaultCluster,tags: Tags,keys: Order-2-完成,messageBody: Order-2-完成
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C86E0008,topic: DefaultCluster,tags: Tags,keys: Order-3-下单,messageBody: Order-3-下单
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8720009,topic: DefaultCluster,tags: Tags,keys: Order-3-结算,messageBody: Order-3-结算
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C881000A,topic: DefaultCluster,tags: Tags,keys: Order-3-支付,messageBody: Order-3-支付
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C883000B,topic: DefaultCluster,tags: Tags,keys: Order-3-完成,messageBody: Order-3-完成