根据使用者对读取操作的控制情况,消费在可以分为两种类型:
package william.rmq.consumer.quickstart;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 11:06
* @Description:RocketMQ消息消费者
*/
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
@PostConstruct
public void start() {
try {
consumer.setNamesrvAddr(namesrvAddr);
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置广播消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//订阅主题
consumer.subscribe("DefaultCluster", "*");
//注册消息监听器
consumer.registerMessageListener(this);
//启动消费端
consumer.start();
log.info("Message Consumer Start...");
System.err.println("Message Consumer Start...");
} catch (MQClientException e) {
log.error("Message Consumer Start Error!!",e);
}
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
msgs.stream()
.forEach(msg -> {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
log.info("Message Consumer: Handle New Message: messageId:{}, topic:{}, tags:{}, keys:{}, messageBody:{}"
, msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getKeys(), messageBody);
System.err.println("Message Consumer: Handle New Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: " + msg.getTags());
} catch (Exception e) {
log.error("Consume Message Error!!", e);
}
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
下面对这几个参数进行详细介绍:
本节结合源码分析DefaultMQPushConsumer的处理流程。
DefaultMQPushConsumer主要功能实现在DefaultMQPushConsumerImpl中,消息处理逻辑是在pullMessage()方法的PullCallback回调中。在PullCallback回调中有个switch语句,根据Broker返回的消息类型做响应的处理,具体逻辑看源码:
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
PullCallback是一个消息拉取回调,Consumer从Broker拉取消息会,会根据拉取状态回调对应的onSuccess或onException方法。在onSuccess()的处理中,会根据不同的PullStatus进行不同的处理,PullStatus的状态有:
DefaultMQPushConsumer中有很多PullRequest的方法,如executePullRequestImmediately(),之所以在PushConsumer中也使用PullRequest的方式拉取消息,是因为RocketMQ通过长轮询的方式来实现Push和Pull两种模式,长轮询可以即有Pull的优点,又兼具Push的实时性。
Push方式是Broker端接收到消息后,主动把消息推给Consumer端,实时性高。对于一个提供消息队列服务的Server来说,用Push方式会有很多弊端:首先是消息的流量难以控制,当Push的消息过多时会加大Server的工作量,进而影响Server的性能;其次,Client的处理能力各不相同,且Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,在造成消息堆积等各种潜在的问题。
Pull方式是Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量的消息后,妥当处理完毕再继续拉取。Pull方式的问题是循环拉取的时间间隔不好设定,间隔太短就会处于”忙等”的状态,浪费资源;间隔太长又可能导致Server端有消息到来时没有及时被处理。
“长轮询”方式通过Client端和Server端的配合,达到既拥有Pull的优点,又保证实时性的目的。
“长轮询”的核心是,Broker端HOLD住客户端的请求一小段时间,如果在这段时间内有消息到达,就利用现有的链接立刻返回消息给Consumer。”长轮询”的主动权还是掌握在Consumer手上,即使Broker有大量的消息积压,也不会主动推送给Consumer。
长轮询方式的局限性在于,HOLD住Consumer端请求时,需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。
package william.rmq.consumer.pull;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import william.rmq.common.constant.RocketMQConstant;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/15 09:25
* @Description:使用DefaultMQPullConsumer拉取消息
*/
@Service
@Slf4j
public class PullMessageConsumer {
/**记录每个MessageQueue的消费位点offset,可以持久化到DB或缓存Redis,这里作为演示就保存在程序中*/
private static final Map<MessageQueue,Long> OFFSET_TABLE = new ConcurrentHashMap<>();
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
/**使用DefaultMQPullConsumer实现拉取消息模式*/
private DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("DefaultPullConsumer");
/**每次拉取消息的最大数量*/
private static final int MAX_PULL_SIZE_EACH_TIME = 32;
@PostConstruct
public void start() {
try {
//设置namesrv地址
consumer.setNamesrvAddr(namesrvAddr);
//启动消费端
consumer.start();
System.err.println("Order Message Consumer Start...");
//从指定的Topic pull消息
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(RocketMQConstant.TEST_TOPIC_NAME);
//遍历MessageQueue,获取Message
for (MessageQueue messageQueue : messageQueues){
//获取该MessageQueue的消费位点
long offset = consumer.fetchConsumeOffset(messageQueue, true);
System.err.println("Consumer From Queue: " + messageQueue + ",from offset: " + offset);
while (true){
try {
//拉取消息
PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, getMessageQueueOffset(messageQueue), MAX_PULL_SIZE_EACH_TIME);
System.err.println("Pull Message Result: " + pullResult);
//记录offset
saveMessageQueueOffset(messageQueue,pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()){
//拉取到消息
case FOUND: break;
//没有匹配的消息
case NO_MATCHED_MSG: break;
//暂时没有新消息
case NO_NEW_MSG: continue;
//offset非法
case OFFSET_ILLEGAL: break;
default: break;
}
}
catch (Exception e){
log.error("Pull Message Error!!",e);
}
}
}
//关闭Consumer
consumer.shutdown();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private long getMessageQueueOffset(MessageQueue messageQueue){
Long offset = OFFSET_TABLE.get(messageQueue);
return (offset != null ? offset : 0);
}
private void saveMessageQueueOffset(MessageQueue messageQueue,long offset){
OFFSET_TABLE.put(messageQueue,offset);
}
}
分别启动生产端和消费端程序,可看到消费端控制台打印如下:
Order Message Consumer Start...
Consumer From Queue: MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=0],from offset: 0
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=32, minOffset=0, maxOffset=57, msgFoundList=32]
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=25]
Pull Message Result: PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=0]
Pull Message Result: PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=0]
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=58, minOffset=0, maxOffset=58, msgFoundList=1]
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=59, minOffset=0, maxOffset=59, msgFoundList=1]
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=60, minOffset=0, maxOffset=60, msgFoundList=1]
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=61, minOffset=0, maxOffset=61, msgFoundList=1]
五. Consumer的启动、关闭流程