前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq的adjustThreadPoolNumsThreshold

聊聊rocketmq的adjustThreadPoolNumsThreshold

作者头像
code4it
发布2019-11-24 16:33:08
5150
发布2019-11-24 16:33:08
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下rocketmq的adjustThreadPoolNumsThreshold

DefaultMQPushConsumer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

代码语言:javascript
复制
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Threshold for dynamic adjustment of the number of thread pool
     */
    private long adjustThreadPoolNumsThreshold = 100000;

    public long getAdjustThreadPoolNumsThreshold() {
        return adjustThreadPoolNumsThreshold;
    }

    public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) {
        this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
    }

    //......
}
  • DefaultMQPushConsumer定义了adjustThreadPoolNumsThreshold属性,默认为100000

DefaultMQPushConsumerImpl

ocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

代码语言:javascript
复制
public class DefaultMQPushConsumerImpl implements MQConsumerInner {

//......

    public void adjustThreadPool() {
        long computeAccTotal = this.computeAccumulationTotal();
        long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();

        long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);

        long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);

        if (computeAccTotal >= incThreshold) {
            this.consumeMessageService.incCorePoolSize();
        }

        if (computeAccTotal < decThreshold) {
            this.consumeMessageService.decCorePoolSize();
        }
    }

    private long computeAccumulationTotal() {
        long msgAccTotal = 0;
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
        Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            ProcessQueue value = next.getValue();
            msgAccTotal += value.getMsgAccCnt();
        }

        return msgAccTotal;
    }

    //......
}
  • adjustThreadPool方法会计算computeAccTotal,然后使用adjustThreadPoolNumsThreshold * 1.0作为incThreshold,使用adjustThreadPoolNumsThreshold * 0.8作为decThreshold;对于computeAccTotal大于等于incThreshold的,执行consumeMessageService.incCorePoolSize();对于computeAccTotal小于decThreshold的执行consumeMessageService.decCorePoolSize()

MQClientInstance

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

代码语言:javascript
复制
public class MQClientInstance {
//......

    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

    private void startScheduledTask() {

        //......

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    public void adjustThreadPool() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    if (impl instanceof DefaultMQPushConsumerImpl) {
                        DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl;
                        dmq.adjustThreadPool();
                    }
                } catch (Exception e) {
                }
            }
        }
    }

//......
}
  • MQClientInstance的start方法对于CREATE_JUST状态会执行startScheduledTask()方法,后者会注册一个定时任务,每隔1分钟执行一次adjustThreadPool方法;adjustThreadPool方法则遍历consumerTable的MQConsumerInner,对于DefaultMQPushConsumerImpl类型的MQConsumerInner执行adjustThreadPool方法

小结

DefaultMQPushConsumer定义了adjustThreadPoolNumsThreshold属性,默认为100000;MQClientInstance的start方法对于CREATE_JUST状态会执行startScheduledTask()方法,后者会注册一个定时任务,每隔1分钟执行一次adjustThreadPool方法;adjustThreadPool方法则遍历consumerTable的MQConsumerInner,对于DefaultMQPushConsumerImpl类型的MQConsumerInner执行adjustThreadPool方法

doc

  • DefaultMQPushConsumer
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • DefaultMQPushConsumer
  • DefaultMQPushConsumerImpl
  • MQClientInstance
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档