首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dubbo 的负载均衡策略:最小活跃调用策略

Dubbo 的负载均衡策略:最小活跃调用策略

作者头像
LieBrother
发布2019-03-28 18:05:07
1.4K0
发布2019-03-28 18:05:07
举报
文章被收录于专栏:LieBrotherLieBrother

本文简单介绍 Dubbo 中的负载均衡策略:最小活跃调用策略。

1

最小活跃调用策略

最小活跃调用策略:指的是当请求调用来临,有多个实例提供服务的时候,选择其中被调用活跃次数最少的实例来提供服务。通俗一点讲就是,当前有 3 个实例在提供服务,A 当前被 2 个服务调用,B 当前被 3 个服务调用,C 当前被 1 个服务调用,一个新的调用请求过来,会选择调用到 C 实例。

最小活跃值怎么计算呢?

在下面 ActiveLimitFilter 代码中可以看到,是通过在调用方法前将值 + 1,调用方法完成后将值 -1。

Dubbo 中实现该策略的代码是:LeastActiveLoadBalance。它分为 3 种情况。

  • 当只有一个最小活跃的实例时,则返回这个唯一的实例。这个就不用多说了。。。
  • 当有多个最小活跃的实例且每个实例的权重相同时,则通过随机选择其中的一个实例
  • 当有多个最小活跃的实例且权重不一时,则通过产生一个随机数,范围为 [0, totalWeight - 1],返回该随机数所在的实例。可以参考随机策略的权重不一情况

2

最小活跃调用策略的优缺点

  • 优点:能动态的根据当前服务的调用情况,选择最小被调用的实例,调用越慢的机器,会接收到更少的请求。
  • 缺点:好吧。。。说不出有什么大的缺点,有了解的同学帮忙补充

3

LeastActiveLoadBalance 源码

public class LeastActiveLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "leastactive";

    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // 服务实例总数
        int length = invokers.size(); // Number of invokers
        // 最小活跃值
        int leastActive = -1; // The least active value of all invokers
        // 最小活跃值的实例数量
        int leastCount = 0; // The number of invokers having the same least active value (leastActive)
        // 存储所有最小活跃值的实例
        int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
        // 总权重值(只记录最小活跃值的实例的权重)
        int totalWeight = 0; // The sum of weights
        // 记录第一个最小活跃值的实例的权重
        int firstWeight = 0; // Initial value, used for comparision
        // 所有实例的权重是否相同
        boolean sameWeight = true; // Every invoker has the same weight value?
        // 遍历所有实例
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // 获取该实例的活跃值
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
            // 获取该实例的权重值
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
            if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
                // 记录最小的活跃值
                leastActive = active; // Record the current least active value
                // 重置记录
                leastCount = 1; // Reset leastCount, count again based on current leastCount
                leastIndexs[0] = i; // Reset
                totalWeight = weight; // Reset
                firstWeight = weight; // Record the weight the first invoker
                sameWeight = true; // Reset, every invoker has the same weight value?
            } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
                // 增加多一个最小活跃值
                // 记录该实例的下标
                leastIndexs[leastCount++] = i; // Record index number of this invoker
                // 统计总权重值
                totalWeight += weight; // Add this invoker's weight to totalWeight.
                // If every invoker has the same weight?
                if (sameWeight && i > 0
                        && weight != firstWeight) {
                    // 同是最小活跃值的实例中权重不一
                    sameWeight = false;
                }
            }
        }

        // 如果只有一个最小活跃值的实例,则返回该实例
        // assert(leastCount > 0)
        if (leastCount == 1) {
            // If we got exactly one invoker having the least active value, return this invoker directly.
            return invokers.get(leastIndexs[0]);
        }

        // 最小活跃值的实例中的权重不一
        if (!sameWeight && totalWeight > 0) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            // 依据总权重值产生随机数,返回该随机数所在的实例
            int offsetWeight = random.nextInt(totalWeight);
            // Return a invoker based on the random value.
            // 返回随机的最小活跃实例
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // 所有最小活跃值的实例都是相同的权重,则随机选择一个实例
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }
}

启动最小活跃策略时需要额外配置 filter = "activelimit",如下所示

<dubbo:service interface="service.AbcService" ref="abcService" loadbalance="leastactive" filter="activelimit"/>

所以我们再看一下 ActiveLimitFilter 源码。

@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        // 获取当前被调用服务配置的最大并发数
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (max > 0) {
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();
            if (active >= max) {
                // 当前并发数达到配置的最大并发数,则等待
                synchronized (count) {
                    while ((active = count.getActive()) >= max) {
                        try {
                            count.wait(remain);
                        } catch (InterruptedException e) {
                        }
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                    + invoker.getInterface().getName() + ", method: "
                                    + invocation.getMethodName() + ", elapsed: " + elapsed
                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
                                    + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }
        try {
            long begin = System.currentTimeMillis();
            // 每一次调用前活跃数 +1
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                // 调用后活跃数 -1
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                return result;
            } catch (RuntimeException t) {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            if (max > 0) {
                // 完成调用则通知其他阻塞着的线程
                synchronized (count) {
                    count.notify();
                }
            }
        }
    }
}

做个有梦想的程序猿

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-08-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 LieBrother 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档