前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq的suspendCurrentQueueTimeMillis

聊聊rocketmq的suspendCurrentQueueTimeMillis

作者头像
code4it
发布2019-11-27 15:25:54
6310
发布2019-11-27 15:25:54
举报

本文主要研究一下rocketmq的suspendCurrentQueueTimeMillis

suspendCurrentQueueTimeMillis

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

//......

    /**
     * Suspending pulling time for cases requiring slow pulling like flow-control scenario.
     */
    private long suspendCurrentQueueTimeMillis = 1000;

    public long getSuspendCurrentQueueTimeMillis() {
        return suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }

    //......
}
  • DefaultMQPushConsumer定义了suspendCurrentQueueTimeMillis属性,默认值为1000

submitConsumeRequestLater

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

public class ConsumeMessageOrderlyService implements ConsumeMessageService {

//......

    private void submitConsumeRequestLater(
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final long suspendTimeMillis
    ) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1) {
            timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
        }

        if (timeMillis < 10) {
            timeMillis = 10;
        } else if (timeMillis > 30000) {
            timeMillis = 30000;
        }

        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
            }
        }, timeMillis, TimeUnit.MILLISECONDS);
    }

    //......
}
  • submitConsumeRequestLater方法在timeMillis为-1时会读取defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis()的值,如果该值小于10则重置为10,如果该值大于30000则重置为30000;然后使用scheduledExecutorService延时timeMillis执行submitConsumeRequest方法

小结

DefaultMQPushConsumer定义了suspendCurrentQueueTimeMillis属性,默认值为1000;ConsumeMessageOrderlyService的submitConsumeRequestLater方法在timeMillis为-1时会读取defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis()的值,如果该值小于10则重置为10,如果该值大于30000则重置为30000;然后使用scheduledExecutorService延时timeMillis执行submitConsumeRequest方法

doc

  • DefaultMQPushConsumer
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • suspendCurrentQueueTimeMillis
  • submitConsumeRequestLater
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档