前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >面试系列之-rocketmq长轮询模式

面试系列之-rocketmq长轮询模式

作者头像
用户4283147
发布2022-12-29 20:05:56
5210
发布2022-12-29 20:05:56
举报
文章被收录于专栏:对线JAVA面试对线JAVA面试

Consumer消费两种模式

pull模式
代码语言:javascript
复制
public static void main(String[] args) throws MQClientException {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("rocketmq-consumer");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.start();
    try {
         MessageQueue mq = new MessageQueue();
         mq.setQueueId(0);
         mq.setTopic("mq-test");
         mq.setBrokerName("broker-a");
        long offset = 26;
        PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32);
        System.out.printf("%s%n", pullResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
    consumer.shutdown();
}

Consumer主动从Broker获取消息,可以设置多久拉取一次、可以设置一次拉取多少条消息等参数;

  • 好处:是如果Broker消息特别多的话,消费端按照自身的消费能力匀速消费消息,不至于被大量消息打死;
  • 缺陷:消息超时时间可以配置,设置短则会轮训频率过快服务端会承担压力,甚至导致空转,设置长则导致消息接收不及时;
push模式
代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException,    MQClientException {// 构造方法
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq-consumer");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("mq-test", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,        ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n",        Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

Push模式服务端主动向客户端发送消息,Push方式下,消息队列RocketMQ版还支持批量消费功能,可以将批量消息统一推送至Consumer进行消费;

  • 好处:可以及时收到新的消息,消费端不会产生额外的延迟;
  • 缺陷:当有大量的推送消息会加重消费端的负载甚至将消费端打死,同时Broker会维护所有建连的客户端连接;

RocketMQ实现长轮询

长轮询本质上也是客户端发起定时轮训请求,会保持请求到服务端,直到设置的时长(该hold时长要小于HTTP超时时间)到期或者服务端收到消息,进行返回数据,consumer收到响应后根据状态判断是否有消息;

Consumer端处理
Consumer启动
代码语言:javascript
复制
MQClientInstance#start
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 前后省略
                this.pullMessageService.start();
                     break;
            case START_FAILED:
throw new MQClientException("The Factory object[" +            this.getClientId() + "] has been created before,            and failed.", null);default:
                break;
        }
    }
}

Consumer启动,启动过程会执行各种定时任务和守护线程。其中一个pullMessageService 定时发起请求拉取消息服务,一个MQClientInstance 只会启动一个消息拉取线程,就是push模式使用pull封装一下;

Consumer请求
代码语言:javascript
复制
PullMessageService# 客户端发起拉取消息请求
public void run() {
    while (!this.isStopped()) {
try {      // 将返回结果添加到QueuePullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);
        }
}

private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.    selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } 
}
DefaultMQPushConsumerImpl#pullMessage
try {
    // 真正拉取消息的地方,首先获取Broker信息
this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),     subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(), pullRequest.getNextOffset(),     this.defaultMQPushConsumer.getPullBatchSize(),sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS    , CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC, pullCallback);
}

启动后Consumer则不断轮询 Broker 获取消息。Rocketmq将每次请求参数放入pullRequestQueue进行缓冲。这样做的好处:consumer可能对应很多topic。当拉取到消息或者长轮询请求到期后进行回调PullCallback进行下一轮拉取消息;

Consumer处理的逻辑包括:

  • 判断 Consumer 处于运行中状态、Consumer 处于暂停中;
  • 消息处理队列持有消息最大量和消息体最大量;
  • 根据 consumeOrderly 判断是否为顺序消息;
  • 根据topic获取订阅组信息;
  • 真正拉取消息,发起netty请求。请求参数包含 messageQueue(可以认为标识当前客户端该topic具有唯一性)、当前 Consumer 最大偏移量、每次拉取数量、拉取方式(同步||异步)、回调函数PullCallback。还有netty连接的超时时长 timeoutMillis = 30s 和 broker端hold时长 brokerSuspendMaxTimeMillis =15s;
Consumer响应处理
代码语言:javascript
复制
DefaultMQPushConsumerImpl#pullMessage
 // 当拉取的请求有响应时
PullCallback pullCallback = new PullCallback() {
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper
			.processPullResult(pullRequest.getMessageQueue(),            pullResult			,subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    // 统计消费组下消息主题拉取耗时
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager()
					.incPullRT(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null 
					|| pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this
				.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0)
						.getQueueOffset();
                        // 提交拉取到的消息到消息处理队列
boolean dispatchToConsume = processQueue.                putMessage(pullResult.getMsgFoundList());// 提交消费请求  ConsumeRequest#run 拉取消息响应listener
		//.consumeMessage最终返回给客户端,同时也包括执行前和执行后逻辑
                        DefaultMQPushConsumerImpl.this.consumeMessageService
						.submitConsumeRequest(pullResult.                        getMsgFoundList(),						processQueue, pullRequest                      .getMessageQueue(), dispatchToConsume);
                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer
						.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(
							pullRequest, DefaultMQPushConsumerImpl.this
							.defaultMQPushConsumer.getPullInterval());
                        } else {
                            // 消费者拉取完消息后,立马就有开始下一个拉取任务
                            DefaultMQPushConsumerImpl
							.this.executePullRequestImmediately(pullRequest);
                        }
                    }
                    if (pullResult.getNextBeginOffset() < prevRequestOffset 
					|| firstMsgOffset < prevRequestOffset) {
                    }
                    break;
                case NO_NEW_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    //消费者没有消息,立马就有开始下一个拉取任务
                    DefaultMQPushConsumerImpl.
					this.executePullRequestImmediately(pullRequest);
                    break;
                default:
                    break;
            }
        }
    }

PullCallback则根据pullStatus状态判断是否有消息。不管何种状态最终会调用 executePullRequestImmediately 将拉取请求放入队列中进行下一轮消息请求:

  • FOUND:有消息则进行处理结果和统计、更新最新的偏移量(本地或者远程),完成后将请求添加到pullRequestQueue队列里继续轮训;
  • NO_NEW_MSG:拉取请求没有新消息但超过hold时长返回, 会进行下一轮消息拉取请求;
  • NO_MATCHED_MSG:有新消息但是没有匹配;
  • OFFSET_ILLEGAL:拉取的消息队列位置不合法,需要更新消费进度再进行下一轮消息拉取;
Broker收到Consumer请求
Broker没有收到消息如何hold请求
代码语言:javascript
复制
Consumer发起拉取消息请求,Broker端无消息
Broker端
PullMessageProcessor#processRequest
// broker端没有拉取到消息
case ResponseCode.PULL_NOT_FOUND:
    if (brokerAllowSuspend && hasSuspendFlag) {
        long pollingTimeMills = suspendTimeoutMillisLong;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig()
				.getShortPollingTimeMills();
        }

        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, 
			pollingTimeMills,this.brokerController.getMessageStore().now(), 
			 offset, subscriptionData, messageFilter);
        this.brokerController.getPullRequestHoldService()
			.suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }
    
// 先将拉取请求放在this.pullRequestTable中,进行挂载起来
public void suspendPullRequest(final String topic, final int queueId, 
							   final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}

如果broker没有获取到新消息,并不会马上返回pullRequest,会在suspendPullRequest方法中,把当前的请求信息(主要是offset,group,topic,requestId这几个值)放到PullRequestHoldService.pullRequestTable中,而在ReputMessageService的doReput--->messageArrivingListener.arriving-->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把刚才存进去的所有pullRequest取出来,取到消息再返回。这样就避免了不停的轮询。 hold的请求存放在 ConcurrentHashMap<String, ManyPullRequest> 中,key 为 topic@queueId ,value 是 ManyPullRequest 实际是List<PullRequest> 可以理解对应的多个相同的topic客户端;

hold请求超时处理
代码语言:javascript
复制
PullRequestHoldService  轮训遍历是否阻塞请求快到超时时间,进行唤醒    
public void run() {
    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) { }
        }  
    }
}
// 
private void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]);
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                try {
                    this.notifyMessageArriving(topic, queueId, offset);
                }  
            }
        }
}

Broker端启动线程 PullRequestHoldService 不断轮训检测hold请求是否超时,然后唤醒请求并返回给consumer端。其中轮训时间设置可以是5s一次或者设定时长,进行定期检测;

服务端收到Producer消息
代码语言:javascript
复制
DefaultMessageStore#doReput
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
NotifyMessageArrivingListener#arriving
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties);
}

PullRequestHoldService
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();  // 克隆挂起的请求列表
            if (requestList != null) {
                List<PullRequest> replayList = new ArrayList<PullRequest>();
                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }
                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));                       
                       // 从挂起的请求列表中找到当前新的消息的匹配的,匹配到了则唤起请求立即给客户端返回。
                       if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());
                            }  
                            continue;
                        }
                     }
                    // 如果列表中挂起的请求快超时了则立即唤醒返回给客户端
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());
                        }  
                        continue;
                    }
                    replayList.add(request);
                }
                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
}

Producer写入消息,Broker端有消息通知Consumer端; 当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。当拉取消息请求获取不到消息则进行阻塞。当有消息或者或者阻塞超时,重新执行获取消息逻辑,主要是NotifyMessageArrivingListener 会 调用 PullRequestHoldService#notifyMessageArriving(…) 方法通知消费端有消息到达。这时候克隆hold的请求列表,从挂起的请求列表中找到当前新的消息的匹配的,匹配到然后在reput这个操作中顺带激活了长轮询休眠的PullRequest;

总结

当生产者发送最新消息过来后,首先持久化到commitLog文件,通过异步方式同时持久化consumerQueue和index。然后激活consumer发送来hold的请求,立即将消息通过channel写入consumer客户; 如果没有消息到达且客户端拉取的偏移量是最新的,会hold住请求。其中hold请求超时时间 < 请求设定的超时时间。同时Broker端也定时检测是否请求超时,超时则立即将请求返回,状态code为NO_NEW_MESSAGE;

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-11-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 对线JAVA面试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Consumer消费两种模式
    • pull模式
      • push模式
      • RocketMQ实现长轮询
        • Consumer端处理
          • Consumer启动
          • Consumer请求
          • Consumer响应处理
        • Broker收到Consumer请求
          • Broker没有收到消息如何hold请求
          • hold请求超时处理
          • 服务端收到Producer消息
        • 总结
        相关产品与服务
        消息队列 CMQ 版
        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档