前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq的consumeConcurrentlyMaxSpan

聊聊rocketmq的consumeConcurrentlyMaxSpan

作者头像
code4it
发布2019-11-23 16:19:07
7580
发布2019-11-23 16:19:07
举报

本文主要研究一下rocketmq的consumeConcurrentlyMaxSpan

consumeConcurrentlyMaxSpan

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Concurrently max span offset.it has no effect on sequential consumption
     */
    private int consumeConcurrentlyMaxSpan = 2000;

    public int getConsumeConcurrentlyMaxSpan() {
        return consumeConcurrentlyMaxSpan;
    }

    public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) {
        this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
    }

    //......
}
  • DefaultMQPushConsumer定义了consumeConcurrentlyMaxSpan属性,默认值为2000

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
//......

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());

        //......

        // consumeConcurrentlyMaxSpan
        if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
            || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
            throw new MQClientException(
                "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        //......
    }

//......
}
  • checkConfig方法要求defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()大于等于1且小于等于65535

pullMessage

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    /**
     * Delay some time when exception occur
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
    /**
     * Flow control interval
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;

//......

    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }

        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        //......

        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }

                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }

        //......        

    }

    private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
    }

//......
}
  • pullMessage方法在不是consumeOrderly的时候,会判断processQueue.getMaxSpan()是否大于this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan(),大于则执行executePullRequestLater方法进行流控,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL默认值为50

ProcessQueue

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java

public class ProcessQueue {
    public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
        Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
    public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
    private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
    private final InternalLogger log = ClientLogger.getLog();
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

//......

    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getMaxSpan exception", e);
        }

        return 0;
    }

    //......
}
  • ProcessQueue的getMaxSpan取的是msgTreeMap.lastKey() - this.msgTreeMap.firstKey()

PullMessageService

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/PullMessageService.java

public class PullMessageService extends ServiceThread {
    private final InternalLogger log = ClientLogger.getLog();
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
    private final MQClientInstance mQClientFactory;
    private final ScheduledExecutorService scheduledExecutorService = Executors
        .newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "PullMessageServiceScheduledThread");
            }
        });

    public PullMessageService(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

    //......
}
  • executePullRequestLater方法往scheduledExecutorService调度一个延时任务,该任务执行的是executePullRequestImmediately(pullRequest)方法,该方法往pullRequestQueue队列放入pullRequest;run方法会从pullRequestQueue取pullRequest,然后执行pullMessage方法;pullMessage方法首先通过mQClientFactory.selectConsumer取出consumer,然后执行该consumer的pullMessage方法

小结

DefaultMQPushConsumer定义了consumeConcurrentlyMaxSpan属性,默认值为2000;DefaultMQPushConsumerImpl的pullMessage方法在不是consumeOrderly的时候,会判断processQueue.getMaxSpan()是否大于this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan(),大于则执行executePullRequestLater方法进行流控,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL默认值为50

doc

  • DefaultMQPushConsumer
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • consumeConcurrentlyMaxSpan
  • checkConfig
  • pullMessage
  • ProcessQueue
  • PullMessageService
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档