public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("rocketmq-consumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
try {
MessageQueue mq = new MessageQueue();
mq.setQueueId(0);
mq.setTopic("mq-test");
mq.setBrokerName("broker-a");
long offset = 26;
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32);
System.out.printf("%s%n", pullResult);
} catch (Exception e) {
e.printStackTrace();
}
consumer.shutdown();
}
Consumer主动从Broker获取消息,可以设置多久拉取一次、可以设置一次拉取多少条消息等参数;
public static void main(String[] args) throws InterruptedException, MQClientException {// 构造方法
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq-consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("mq-test", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
Push模式服务端主动向客户端发送消息,Push方式下,消息队列RocketMQ版还支持批量消费功能,可以将批量消息统一推送至Consumer进行消费;
长轮询本质上也是客户端发起定时轮训请求,会保持请求到服务端,直到设置的时长(该hold时长要小于HTTP超时时间)到期或者服务端收到消息,进行返回数据,consumer收到响应后根据状态判断是否有消息;
MQClientInstance#start
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 前后省略
this.pullMessageService.start();
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:
break;
}
}
}
Consumer启动,启动过程会执行各种定时任务和守护线程。其中一个pullMessageService 定时发起请求拉取消息服务,一个MQClientInstance 只会启动一个消息拉取线程,就是push模式使用pull封装一下;
PullMessageService# 客户端发起拉取消息请求
public void run() {
while (!this.isStopped()) {
try { // 将返回结果添加到QueuePullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);
}
}
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory. selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
}
}
DefaultMQPushConsumerImpl#pullMessage
try {
// 真正拉取消息的地方,首先获取Broker信息
this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(),sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS , CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC, pullCallback);
}
启动后Consumer则不断轮询 Broker 获取消息。Rocketmq将每次请求参数放入pullRequestQueue进行缓冲。这样做的好处:consumer可能对应很多topic。当拉取到消息或者长轮询请求到期后进行回调PullCallback进行下一轮拉取消息;
Consumer处理的逻辑包括:
DefaultMQPushConsumerImpl#pullMessage
// 当拉取的请求有响应时
PullCallback pullCallback = new PullCallback() {
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper
.processPullResult(pullRequest.getMessageQueue(), pullResult ,subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
// 统计消费组下消息主题拉取耗时
DefaultMQPushConsumerImpl.this.getConsumerStatsManager()
.incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null
|| pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this
.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0)
.getQueueOffset();
// 提交拉取到的消息到消息处理队列
boolean dispatchToConsume = processQueue. putMessage(pullResult.getMsgFoundList());// 提交消费请求 ConsumeRequest#run 拉取消息响应listener
//.consumeMessage最终返回给客户端,同时也包括执行前和执行后逻辑
DefaultMQPushConsumerImpl.this.consumeMessageService
.submitConsumeRequest(pullResult. getMsgFoundList(), processQueue, pullRequest .getMessageQueue(), dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer
.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(
pullRequest, DefaultMQPushConsumerImpl.this
.defaultMQPushConsumer.getPullInterval());
} else {
// 消费者拉取完消息后,立马就有开始下一个拉取任务
DefaultMQPushConsumerImpl
.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
//消费者没有消息,立马就有开始下一个拉取任务
DefaultMQPushConsumerImpl.
this.executePullRequestImmediately(pullRequest);
break;
default:
break;
}
}
}
PullCallback则根据pullStatus状态判断是否有消息。不管何种状态最终会调用 executePullRequestImmediately 将拉取请求放入队列中进行下一轮消息请求:
Consumer发起拉取消息请求,Broker端无消息
Broker端
PullMessageProcessor#processRequest
// broker端没有拉取到消息
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig()
.getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel,
pollingTimeMills,this.brokerController.getMessageStore().now(),
offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService()
.suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
// 先将拉取请求放在this.pullRequestTable中,进行挂载起来
public void suspendPullRequest(final String topic, final int queueId,
final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
mpr.addPullRequest(pullRequest);
}
如果broker没有获取到新消息,并不会马上返回pullRequest,会在suspendPullRequest方法中,把当前的请求信息(主要是offset,group,topic,requestId这几个值)放到PullRequestHoldService.pullRequestTable中,而在ReputMessageService的doReput--->messageArrivingListener.arriving-->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把刚才存进去的所有pullRequest取出来,取到消息再返回。这样就避免了不停的轮询。 hold的请求存放在 ConcurrentHashMap<String, ManyPullRequest> 中,key 为 topic@queueId ,value 是 ManyPullRequest 实际是List<PullRequest> 可以理解对应的多个相同的topic客户端;
PullRequestHoldService 轮训遍历是否阻塞请求快到超时时间,进行唤醒
public void run() {
while (!this.isStopped()) {
try {
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) { }
}
}
}
//
private void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]);
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
this.notifyMessageArriving(topic, queueId, offset);
}
}
}
}
Broker端启动线程 PullRequestHoldService 不断轮训检测hold请求是否超时,然后唤醒请求并返回给consumer端。其中轮训时间设置可以是5s一次或者设定时长,进行定期检测;
DefaultMessageStore#doReput
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
NotifyMessageArrivingListener#arriving
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties);
}
PullRequestHoldService
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
List<PullRequest> requestList = mpr.cloneListAndClear(); // 克隆挂起的请求列表
if (requestList != null) {
List<PullRequest> replayList = new ArrayList<PullRequest>();
for (PullRequest request : requestList) {
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
if (newestOffset > request.getPullFromThisOffset()) {
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
// 从挂起的请求列表中找到当前新的消息的匹配的,匹配到了则唤起请求立即给客户端返回。
if (match) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());
}
continue;
}
}
// 如果列表中挂起的请求快超时了则立即唤醒返回给客户端
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());
}
continue;
}
replayList.add(request);
}
if (!replayList.isEmpty()) {
mpr.addPullRequest(replayList);
}
}
}
}
Producer写入消息,Broker端有消息通知Consumer端; 当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。当拉取消息请求获取不到消息则进行阻塞。当有消息或者或者阻塞超时,重新执行获取消息逻辑,主要是NotifyMessageArrivingListener 会 调用 PullRequestHoldService#notifyMessageArriving(…) 方法通知消费端有消息到达。这时候克隆hold的请求列表,从挂起的请求列表中找到当前新的消息的匹配的,匹配到然后在reput这个操作中顺带激活了长轮询休眠的PullRequest;
当生产者发送最新消息过来后,首先持久化到commitLog文件,通过异步方式同时持久化consumerQueue和index。然后激活consumer发送来hold的请求,立即将消息通过channel写入consumer客户; 如果没有消息到达且客户端拉取的偏移量是最新的,会hold住请求。其中hold请求超时时间 < 请求设定的超时时间。同时Broker端也定时检测是否请求超时,超时则立即将请求返回,状态code为NO_NEW_MESSAGE;