前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ的负载均衡-重平衡

RocketMQ的负载均衡-重平衡

作者头像
路行的亚洲
发布2021-04-28 14:26:13
1.3K0
发布2021-04-28 14:26:13
举报
文章被收录于专栏:后端技术学习

在RocketMQ中要实现重平衡Rebalance,此时会ConsumerManager中会调用consumerIdsChangeListener的handle方法,来执行通知调用操作handle,改变、注册、不注册,而改变的时候,会通知消费者ids改变,此时的reqeustCode会放入:NOTIFY_CONSUMER_IDS_CHANGED

远程客户端处理器会根据case判断匹配到NOTIFY_CONSUMER_IDS_CHANGED,执行notifyConsumerIdsChanged方法,根据请求头的信息,会执行立即重平衡操作rebalanceImmediately。由于rebalanceImmediately实现了服务香肠,因此wakeUp的使用,会执行run方法,而在run方法的前面是waitForRunning,这个方法会等待线程运行,其是一个倒计时线程计数器同步并发,从而执行重平衡操作。而此时我们会看到两个方法:

代码语言:javascript
复制
DefaultMQPullConsumerImpl#doRebalance
DefaultMQPushConsumerImpl#doRebalance

那么两者有什么区别吗?

我们可以看到两者本质都是基于pull模式,虽然两者一个是拉模式,一个是推模式。可以看到DefaultMQPullConsumerImpl的doRebalance操作是一个空实现。

重平衡服务的启动每隔20s执行一次

代码语言:javascript
复制
//执行重平衡操作
public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                //进行重平衡操作 通过主题执行重平衡操作
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    // 截断消息队列不是自己的队列
    this.truncateMessageQueueNotMyTopic();
}

可以看到执行重平衡会通过主题执行,同时还会截断不属于自己的队列。而在重平衡中更新处理队列表updateProcessQueueTableInRebalance,如果队列的主题与需要重平衡的主题一样,同时不包含,则设置丢弃,否者移除不必要的队列。否则 查看当前的处理队列是否拉取过期,如果true,则查看是否是消费激活,如果是,则直接跳过,否者不是,则设置丢弃,同时查看移除不必要的队列。同时还会执行一个重要的操作转发到拉取操作:

代码语言:javascript
复制
this.dispatchPullRequest(pullRequestList) //执行转发拉取请求操作

而push的拉取中是将其放入到pullRequestQueue拉取请求队列中:

代码语言:javascript
复制
//执行拉取消息
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

此时可以看到:

代码语言:javascript
复制
PullRequest pullRequest = this.pullRequestQueue.take();

可以可到器会获取请求队列的拉取请求,同时请求到pullMessageProcessor,然后拿到消息。这个过程是主动的,而不是被动的。

代码语言:javascript
复制
public PullResult pullMessage(
    final String addr,
    final PullMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
    //创建请求数据包
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);

    //交流的模式:oneway、异步、同步
    switch (communicationMode) {
        case ONEWAY:
            assert false;
            return null;
        case ASYNC:
            this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
            return null;
        case SYNC:
            return this.pullMessageSync(addr, request, timeoutMillis);
        default:
            assert false;
            break;
    }

    return null;
}

可以看到拉取请求时,放入请求的code,从而进行拉取操作执行回调。

代码语言:javascript
复制
//处理拉取响应
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
//拉取回调 onSucess
pullCallback.onSuccess(pullResult);

而执行重平衡的操作的过程中使用了锁,而锁的操作是值得我们去学习的。使用锁的过程中,参考了AQS的方式,也即使用队列对队列进行存储,然后执行操作,而这个体现则是RebalanceLockManager的tryLockBatch和unlockBatch:

代码语言:javascript
复制
创建锁定mqs和未锁定mqs, 对消息队列进行遍历,首先判断是否锁定,如果需要锁定,则将其添加到锁定队列,否者放入不锁定队列,可以看到lockEntry中会放入上次更新的时间戳,同时放入锁定的mq到lockedMqs,这个lockMqs是一个HashSet的队列。
如果lockEntry中如果锁定clientId,则设置上次更新时间戳,同时将其添加到锁定队列,如果lockEntry如果过期,则设置客户端id,同时设置上次更新时间戳,添加锁定队列。

不锁定的过程是一个遍历移除队列的过程。

那在生产端是怎样实现高可用的呢?

在生产端会延迟上,也即latencyFaultTolerance机制,其实现的延迟时间是有13个等级的:

代码语言:javascript
复制
protected static final int[] DELAY_LEVEL = {
    1, 5, 10, 30, 1 * 60, 5 * 60, 10 * 60,
    30 * 60, 1 * 3600, 2 * 3600, 6 * 3600, 12 * 3600, 1 * 24 * 3600};
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-04-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档