前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ源码(二)消息消费的模式到底是Push还是Pull?

RocketMQ源码(二)消息消费的模式到底是Push还是Pull?

原创
作者头像
用户2031163
发布2023-11-29 18:15:20
2290
发布2023-11-29 18:15:20
举报

RocketMQ源码(二)消息消费的模式到底是Push还是Pull?

RocketMQ为开发者提供了两种消息的消费模式,分别是PullPush,对应的实现是DefaultMQPullConsumer和DefaultMQPushConsumer; 接下来我将带大家通过以下几个方面了解这两种模式:

  • Pull和Push的使用示例
  • 跟踪源码分析两种模式的实现原理
  • RocketMQ到底是Push还是Pull呢?

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

代码语言:javascript
复制
// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq

1 Pull模式

这种模式很容易理解,就是消费者主动请求Broker去拉取一批消息,然后消费; 这种模式的好处是可以根据客户端消费能力主动获取消息量;但是弊端也比较明显,就是获取消息的时机不太好把握 ,获取时间间隔小容易造成CPU浪费,时间间隔太大又会造成消费不及时。

1.1 使用示例

使用提供的DefaultMQPullConsumer这个实现,调用fetchMessageQueuesInBalance拿到该Topic下的Queue,然后调用pull()方法从Queue中指定offset获取消息

代码语言:javascript
复制
public class PullConsumer {
    public static void main(String[] args) throws MQClientException {
        // 创建Pull模式消费实例
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("test_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        // 获取该Topic下的所有Queue
        Set<MessageQueue> messageQueues = consumer.fetchMessageQueuesInBalance("TopicTest");
        PullResult pullResult = null;
        // 从Queue中获取消息
        for (MessageQueue messageQueue : messageQueues) {
            long offset = this.consumeFromOffset(messageQueue);
            pullResult = consumer.pull(messageQueue, "*", offset, 32);
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    List<MessageExt> msgs = pullResult.getMsgFoundList();
                    // 执行自定义的消费逻辑
                    this.doSomething(msgs);
                    //update offset to broker
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                case OFFSET_ILLEGAL:
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                case NO_NEW_MSG:
                    Thread.sleep(1);
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                case NO_MATCHED_MSG:
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                default:
            }
        }
    }
}

1.2 拉取源码分析

  • 注意一下DefaultMQPullConsumer.pull()方法;
  • 这个方法会执行MQClientAPIImpl().pullMessage把请求封装为RequestCode.PULL_MESSAGE的RemotingCommand命令;
  • 然后由NettyRemotingClient发送RemotingCommand命令到Broker;
  • 最后就是Broker收到拉取请求根据请求信息把匹配的消息响应到客户端。
1.2.1 核心代码
代码语言:javascript
复制
/*
   封装请求报文RemotingCommand.createRequestCommand(RequestCode.**PULL_MESSAGE**)
 */
public class MQClientAPIImpl implements NameServerUpdateCallback {
    public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, ..., final PullCallback pullCallback) {
        ...
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        ...
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        ...
    }
}
/*
   RemotingClient调用channel.writeAndFlush(request)发出拉取请求
 */
public abstract class NettyRemotingAbstract {
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
        ...
        try {
            ...
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
                ...
            });
            ...
        } 
        ...
    }
}

2 Push模式

这个模式解决了Pull模式请求时间间隔的痛点,从直观上看来就是Broker主动推送消息,这样消息消费也比较及时。

2.1 使用示例

用api提供的DefaultMQPushConsumer这个实现,首先订阅Topic及注册监听方法,然后调用start方法就可以接收消息了。

代码语言:javascript
复制
public class Consumer {
    
    public static void main(String[] args) throws MQClientException {
        // 创建Push模式消费实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.subscribe(TOPIC, "*");
        // 注册监听方法处理消息逻辑
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 开启
        consumer.start();
    }
}

2.2 推送源码分析

  • 在执行DefaultMQPushConsumer.start()方法后
  • 实际上会开启一个PullMessageService任务
  • 在PullMessageService这个任务中会轮询执行DefaultMQPushConsumerImpl.pullMessage
  • 跟踪pullMessage源码发现其实是在Pull模式拉取逻辑上增加一系列延迟请求,一定程度上避免短时间内无效请求
代码语言:javascript
复制
/*
   开启拉取消息任务
 */
public class MQClientInstance {

    public void start() throws MQClientException {
        ...
        // Start pull service
        this.pullMessageService.start();
        ...
    }
}
/*
   轮询执行拉取消息请求
 */
public class PullMessageService extends ServiceThread {
    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
                MessageRequest messageRequest = this.messageRequestQueue.take();
                // 执行DefaultMQPushConsumerImpl.pullMessage拉取
                this.pullMessage((PullRequest) messageRequest);
            } catch (Exception e) {
                logger.error("Pull Message Service Run Method exception", e);
            }
        }
    }
}
/*
   同Pull模式QClientAPIImpl().pullMessage基础上进一步封装了Pull逻辑;
   在命中某些条件下执行executePullRequestLater方法延迟请求拉取,避免短时间内大量无效请求
 */
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    
    public void pullMessage(final PullRequest pullRequest) {

        // 通过判断各种条件下是否执行延迟处理,避免短时间内大量无效请求
        if (...) {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }

        ...

        // 拉取回调逻辑,执行之前注册的registerMessageListener监听
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                ...
            }
            @Override
            public void onException(Throwable e) {
                ...
            }
        };
        
        ...
        
        try {
            this.pullAPIWrapper.pullKernelImpl(
                    pullRequest.getMessageQueue(),
                    subExpression,
                    ...
                    ...
                    pullCallback
            );
        } catch (Exception e) {
            // 延迟请求
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }
}
  • 在Broker端,收到Consumer拉取请求后如果没有新消息时会将请求挂起一定时间,也可以避免Consumer重复无效请求
  • 此外如果CommitLog有消息产生,Broker也会主动将消息返回给之前挂起的Consumer,已达到消息消费的及时性
代码语言:javascript
复制
/*
   没有消息则挂起此次请求
 */
public class DefaultPullMessageResultHandler implements PullMessageResultHandler {
    @Override
    public RemotingCommand handle(final GetMessageResult getMessageResult,
                                  final RemotingCommand request,
                                  ...
                                  final Channel channel,
                                  ...
                                  RemotingCommand response) {

        switch (response.getCode()) {
            switch (response.getCode()) {
                case ResponseCode.SUCCESS:
                    ...
                    // 有消息则写回Channel
                case ResponseCode.PULL_NOT_FOUND: // 没有消息
                    ...
                    // 挂起请求
                    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                    return null;
                ...
            }
        }
    }
}
/*
   CommitLog有新消息主动通知Consumer来拉取消息
 */
public class NotifyMessageArrivingListener implements MessageArrivingListener {
    @Override
    public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
                         long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        // 通知挂起的客户端来拉取消息
        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
                msgStoreTime, filterBitMap, properties);
        ...
    }
}

最后

目前为止通过源码跟踪我们可以发现,RocketMq的Push模式的实现和我们通常了解的实现上有一定的差异,它是由由Consumer主要来发起拉取请求去Broker拉取, 但是Rocketmq通过对拉取逻辑的一系列封装,以及采用长轮询机制让Consumer请求挂起避免短轮询无效请求,同时Broker在消息产生时也会及时通知挂起的Consumer来拉取消息,最终达到了Push的效果。

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

代码语言:javascript
复制
// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
    • 1 Pull模式
      • 1.1 使用示例
      • 1.2 拉取源码分析
    • 2 Push模式
      • 2.1 使用示例
      • 2.2 推送源码分析
    • 最后
    相关产品与服务
    消息队列 TDMQ
    消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档