前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq的pullThresholdForTopic

聊聊rocketmq的pullThresholdForTopic

作者头像
code4it
发布2019-11-27 16:03:01
6320
发布2019-11-27 16:03:01
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下rocketmq的pullThresholdForTopic

pullThresholdForTopic

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

代码语言:javascript
复制
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

//......

    /**
     * Flow control threshold on topic level, default value is -1(Unlimited)
     * <p>
     * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
     * {@code pullThresholdForTopic} if it is't unlimited
     * <p>
     * For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
     * then pullThresholdForQueue will be set to 100
     */
    private int pullThresholdForTopic = -1;

    /**
     * Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
     * <p>
     * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
     * {@code pullThresholdSizeForTopic} if it is't unlimited
     * <p>
     * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
     * assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
     */
    private int pullThresholdSizeForTopic = -1;

    //......

    public int getPullThresholdForTopic() {
        return pullThresholdForTopic;
    }

    public void setPullThresholdForTopic(final int pullThresholdForTopic) {
        this.pullThresholdForTopic = pullThresholdForTopic;
    }

    public int getPullThresholdSizeForTopic() {
        return pullThresholdSizeForTopic;
    }

    public void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic) {
        this.pullThresholdSizeForTopic = pullThresholdSizeForTopic;
    }

    //......
}
  • DefaultMQPushConsumer定义了pullThresholdForTopic(默认值-1)、pullThresholdSizeForTopic(默认值-1)属性

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

代码语言:javascript
复制
public class DefaultMQPushConsumerImpl implements MQConsumerInner {

//......

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());

        //......

        // pullThresholdForTopic
        if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {
            if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {
                throw new MQClientException(
                    "pullThresholdForTopic Out of range [1, 6553500]"
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                    null);
            }
        }

        if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {
            // pullThresholdSizeForTopic
            if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {
                throw new MQClientException(
                    "pullThresholdSizeForTopic Out of range [1, 102400]"
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                    null);
            }
        }

        //......    
    }
    
    //......  
}    
  • checkConfig方法要求defaultMQPushConsumer.getPullThresholdForTopic()在值非-1时必须大于等于1且小于等于6553500;defaultMQPushConsumer.getPullThresholdSizeForTopic()在值非-1时必须大于等于1且小于等于102400

messageQueueChanged

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java

代码语言:javascript
复制
public class RebalancePushImpl extends RebalanceImpl {
    private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

    public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        this(null, null, null, null, defaultMQPushConsumerImpl);
    }

    public RebalancePushImpl(String consumerGroup, MessageModel messageModel,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy,
        MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    }

    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        /**
         * When rebalance result changed, should update subscription's version to notify broker.
         * Fix: inconsistency subscription may lead to consumer miss messages.
         */
        SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
        long newVersion = System.currentTimeMillis();
        log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
        subscriptionData.setSubVersion(newVersion);

        int currentQueueCount = this.processQueueTable.size();
        if (currentQueueCount != 0) {
            int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
            if (pullThresholdForTopic != -1) {
                int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount);
                log.info("The pullThresholdForQueue is changed from {} to {}",
                    this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);
                this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
            }

            int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
            if (pullThresholdSizeForTopic != -1) {
                int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
                log.info("The pullThresholdSizeForQueue is changed from {} to {}",
                    this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);
                this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
            }
        }

        // notify broker
        this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
    }

    //......
}
  • messageQueueChanged方法在currentQueueCount不为0的时候,且pullThresholdForTopic、pullThresholdSizeForTopic不为-1的时候会重新设置pullThresholdForQueue、pullThresholdSizeForQueue;具体的规则是pullThresholdForQueue取Math.max(1, pullThresholdForTopic / currentQueueCount),pullThresholdSizeForQueue取Math.max(1, pullThresholdSizeForTopic / currentQueueCount)

小结

  • DefaultMQPushConsumer定义了pullThresholdForTopic(默认值-1)、pullThresholdSizeForTopic(默认值-1)属性
  • checkConfig方法要求defaultMQPushConsumer.getPullThresholdForTopic()在值非-1时必须大于等于1且小于等于6553500;defaultMQPushConsumer.getPullThresholdSizeForTopic()在值非-1时必须大于等于1且小于等于102400
  • messageQueueChanged方法在currentQueueCount不为0的时候,且pullThresholdForTopic、pullThresholdSizeForTopic不为-1的时候会重新设置pullThresholdForQueue、pullThresholdSizeForQueue;具体的规则是pullThresholdForQueue取Math.max(1, pullThresholdForTopic / currentQueueCount),pullThresholdSizeForQueue取Math.max(1, pullThresholdSizeForTopic / currentQueueCount)

doc

  • DefaultMQPushConsumer
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • pullThresholdForTopic
  • checkConfig
  • messageQueueChanged
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档