前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【DUBBO】 负载均衡AbstractClusterInvoker权重随机权重轮询最少活跃数一致性Hash

【DUBBO】 负载均衡AbstractClusterInvoker权重随机权重轮询最少活跃数一致性Hash

作者头像
spilledyear
发布2018-12-27 15:14:18
5400
发布2018-12-27 15:14:18
举报
文章被收录于专栏:小白鼠小白鼠

在上一节的服务引用中已经知道,当消费者调用提供者的方法时,最终在代理类里面还是通过之前生成的Invoker调用提供者方法。那么dubbo中的Invoker有哪些?

  • AbstractClusterInvoker:正常情况下的非mock调用;
  • FailbackClusterInvoker:负责失败重试调用,继承自AbstractClusterInvoker,并没有自己实现invoke方法,所以只是调用父类的invoke方法;
  • MockClusterInvoker:mock调用;

这里只关注正常情况下的调用,即AbstractClusterInvoker方法。

AbstractClusterInvoker

代码语言:javascript
复制
@Override
public Result invoke(final Invocation invocation) throws RpcException {
    // 判断当前Invoker是否被销毁,如果销毁直接抛出异常
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // 查找所有的invoker,即有几个节点
    List<Invoker<T>> invokers = list(invocation);

    // 通过SPI加载负载均衡的扩展。如果invoke为空,则使用默认的RandomLoadBalance;如果invoke不为空,根据invocation和第一个invoker的url选择负载均衡器
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}


/**
 *通过SPI加载负载均衡的扩展。如果invoke为空,则使用默认的RandomLoadBalance;如果invoke不为空,根据invocation和第一个invoker的url选择负载均衡器
 */
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    if (CollectionUtils.isNotEmpty(invokers)) {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
}

大致流程如下: 1、判断当前Invoker是否被销毁,如果销毁直接抛出异常; 2、查找所有的invoker,即有几个节点; 3、通过SPI加载负载均衡的扩展。如果invoke为空,则使用默认的RandomLoadBalance;如果invoke不为空,根据invocation和第一个invoker的url选择负载均衡器; 4、执行doInvoke方法;

调用关系如下: AbstractClusterInvoker.invoke => FailoverClusterInvoker.doInvoke => AbstractClusterInvoker.select => AbstractClusterInvoker.doSelect => AbstractClusterInvoker.reselect => LoadBalance.select

LoadBalance的select方法主要就是为了根据对应负载均衡算法返回一个invoker。dubbo中提供了4种负载均衡算法实现:RandomLoadBalanceRoundRobinLoadBalanceLeastActiveLoadBalanceConsistentHashLoadBalance

权重随机

RandomLoadBalance,如果所有invoker的权重都相同,则产生一个invokers.length范围内的随机数作为下标,然后按下标返回该invoker;如果不是所有的invoker权重都相同,则将所有的invoker权重值相加得到totalWeight,然后产生一个totalWeight范围内的随机数offset,之后再遍历所有的invoker,遍历的时候用offset减去该invoker的权重值得到新的offset,如果offset小于0,就返回该invoker。

代码语言:javascript
复制
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // invoker个数
    int length = invokers.size();

    // 总的权重
    int totalWeight = 0;

    // 各个invoker的权重是否一致
    boolean sameWeight = true;

    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        totalWeight += weight;
        if (sameWeight && i > 0
                && weight != getWeight(invokers.get(i - 1), invocation)) {
            sameWeight = false;
        }
    }

    // 如果不是所有的invoker的权重都一样并且至少有一个invoker的权重大于0
    if (totalWeight > 0 && !sameWeight) {
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
        // 产生totalWeight范围内的一个随机数
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);

        // 依次用offset去减各个invoker的权重,知道offset小于0
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }

    // 如果所有的invoker的权重一样或者totalWeight=0,则随机调用一个invoker,产生一个length范围内的随机数
    return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
代码语言:javascript
复制
假如A B C D这4个invoker的权重值分别如下
A: 100
B: 200
C: 300
D: 400

所以,totalWeight等于1000,加入此时产生的随机数offset为500,(500 -100 -200 -300 =-100)  < 0,所以此时返回invoker C

权重轮询

RoundRobinLoadBalance,有一个变量currentSequence,专门用于记录该服务已经被调用的次数,每次调用+1。如果所有invoker的权重都相同假如,则用currentSequence对invokers的长度取模得到下标,然后返回该下标对应的invoker;如果不是所有的invoker权重都相同,用三个变量maxWeight、minWeight、weightSum分别记录:最大权重、最小权重、总的权重。用currentSequence对weightSum取模得到一个mod,以maxWeight作为外层循环限制,以invoker个数作为内层循环限制,每次循环的时候mod--,对应的invoke权重值--,如果mod等于0并且该invoke对应的权重值大于0,则返回该invoke。

代码语言:javascript
复制
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    // length 代表提供者的个数 Number of invokers
    int length = invokers.size();

    // 最大权重
    int maxWeight = 0;

    // 最小权重
    int minWeight = Integer.MAX_VALUE;

    // Invoker:该Invoker对应的权重
    final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<>();

    // 权重总和
    int weightSum = 0;

    // 找出最大权重、最小权重、总的权重
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        maxWeight = Math.max(maxWeight, weight);
        minWeight = Math.min(minWeight, weight);
        if (weight > 0) {
            invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
            weightSum += weight;
        }
    }

    // sequence代表请求次数,即总共请求了多少次
    AtomicPositiveInteger sequence = sequences.get(key);
    if (sequence == null) {
        sequences.putIfAbsent(key, new AtomicPositiveInteger());
        sequence = sequences.get(key);
    }

    // 当前是第多少次调用
    int currentSequence = sequence.getAndIncrement();

    // 权重不一样,则用 当前调用次数对总的权重取模,然后轮询返回invoker。外层循环用maxWeight限制,因为当前调用次数不可能超过 maxWeight * length
    // 例如:A:100,B:200,C:300,D:400,当currentSequence=400时,这4个invoker对应的调用次数是一样的,都是100,这时候的权重比并不是 1:2:3:4
    // 当currentSequence=490的时候,ABCD对应的调用次数分别为100 130 130 130,只有当currentSequence=1000时,这时候的权重比才是 1:2:3:4
    if (maxWeight > 0 && minWeight < maxWeight) {
        int mod = currentSequence % weightSum;
        for (int i = 0; i < maxWeight; i++) {
            for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                final Invoker<T> k = each.getKey();
                final IntegerWrapper v = each.getValue();
                if (mod == 0 && v.getValue() > 0) {
                    return k;
                }
                if (v.getValue() > 0) {
                    v.decrement();
                    mod--;
                }
            }
        }
    }

    // 如果所有权重一样,就按当前调用次数对invoker长度取模
    return invokers.get(currentSequence % length);
}
代码语言:javascript
复制
假如 A:100,B:200,C:300,D:400
当currentSequence=400时,这4个invoker对应的调用次数是一样的,都是100,这时候的权重比并不是 1:2:3:4;
当currentSequence=490的时候,ABCD对应的调用次数分别为100 130 130 130,这时候的权重比并不是 1:2:3:4;
只有当currentSequence是1000的整数倍时,这时候的权重比才是 1:2:3:4;

这个版本的实现有几个比较大的问题,一个是很不平滑的;第二个就是当权重设置的很大的时候,要执行大量的循环,很影响性能。不过这个问题在2.6.5版本中好像已经修复了RoundRobinLoadBalance

最少活跃数

LeastActiveLoadBalance,用一个变量leastActive记录最少活跃数,用一个数组int[] leastIndexs记录活跃数等于最少活跃数相同的invoker下标。每个invoker都有一个对应的活跃数active,每个invoker调用开始的时候active++,每个invoker调用结束的时候active--;如果活跃数等于最少活跃数的invoker只有一个,返回该invoker;如果有多个并且它们之间的权重值不全相同,则按权重随机算法选择一个invoker,否则从中随机选择一个

代码语言:javascript
复制
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // invoker个数
    int length = invokers.size();

    // 所有invoker中最少的活跃数
    int leastActive = -1;

    // 最少的活跃数相同的invoker个数
    int leastCount = 0;

    // 具有相同最小活跃数的invoker下标
    int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)

    // 权重总和
    int totalWeight = 0;

    // 用于判断权重是否一样
    int firstWeight = 0; // Initial value, used for comparision

    // 是否所有invoker的权重相同
    boolean sameWeight = true;

    for (int i = 0; i < length; i++) {
        Invoker<T> invoker = invokers.get(i);
        int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight

        // 如果是第一个invoke或者找到了更小活跃数的invoker,则重置,主要就是重置这些属性的值
        if (leastActive == -1 || active < leastActive) {
            leastActive = active; // Record the current least active value
            leastCount = 1; // Reset leastCount, count again based on current leastCount
            leastIndexs[0] = i;
            totalWeight = weight;
            firstWeight = weight;
            sameWeight = true;

            // 如果找到相同最少活跃数的invoker,则累加权重,并将invoker的下标放到leastIndexs中
        } else if (active == leastActive) {
            leastIndexs[leastCount++] = i; // Record index number of this invoker
            totalWeight += weight; // Add this invoker's weight to totalWeight.
            // If every invoker has the same weight?
            if (sameWeight && i > 0 && weight != firstWeight) {
                sameWeight = false;
            }
        }
    }

    // 如果只有一个最少活跃的invoker,直接返回
    if (leastCount == 1) {
        return invokers.get(leastIndexs[0]);
    }

    // 如果有多个invoker的最小活跃数相同,则按权重随机(和随机负载均衡很像)
    if (!sameWeight && totalWeight > 0) {
        int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
        // Return a invoker based on the random value.
        for (int i = 0; i < leastCount; i++) {
            int leastIndex = leastIndexs[i];
            offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
            if (offsetWeight <= 0)
                return invokers.get(leastIndex);
        }
    }

    // 如果所有的invoker的权重一样或者totalWeight=0,则随机调用一个invoker,产生一个length范围内的随机数
    return invokers.get(leastIndexs[ThreadLocalRandom.current().nextInt(leastCount)]);
}

一致性Hash

ConsistentHashLoadBalance,感觉单纯看代码还是有些复杂,主要是因为这里涉及到一个一致性Hash算法,因此有必要对一致性Hash简单了解。 1、每个方法对应一个hash选择器ConsistentHashSelector,缓存于treeMap; 2、每个invoker生成replicaNumber个hash值,即产生replicaNumber个虚拟节点。所有invoker的虚拟节点都缓存于virtualInvokers这个treeMap中; 3、调用方法的时候,根据请求参数生成16字节的md5值,然后再取前4个字节生成hash值; 4、根据上面生成的hash值,取virtualInvokers中获取大于等于该hash值的节点;如果不存在,则取第一个节点(形成环); 5、返回invoker;

代码语言:javascript
复制
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "consistenthash";

    // 一个方法对应一个一致性hash选择器
    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String methodName = RpcUtils.getMethodName(invocation);
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        // 基于invoker集合,根据对象内存地址来定义hash值。用于判断invokers是否发生变化(比如数量上的增减),ConsistentHashSelector保存了上一次调用生成的identityHashCode,如果发生变化,则重新生成ConsistentHashSelector
        int identityHashCode = System.identityHashCode(invokers);
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.identityHashCode != identityHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }

    private static final class ConsistentHashSelector<T> {

        // 虚拟节点 key是hash值,value是Invoker
        private final TreeMap<Long, Invoker<T>> virtualInvokers;

        // 副本数,默认160
        private final int replicaNumber;

        // 调用结点HashCode
        private final int identityHashCode;

        // 参数索引数组
        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            // 获取需要进行hash的参数数组索引,默认对第一个参数进行hash
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                // 10.9.25.36:20880
                String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {
                    // 根据md5算法为每4个虚拟结点生成一个消息摘要,摘要长为16字节128位
                    byte[] digest = md5(address + i);
                    for (int h = 0; h < 4; h++) {
                        // 将digest分为4部分(根据h的值取,一次 0-3 4-7 8-11 12-15) 并生成4个32位数,存于long中,long的高32位都为0 并作为虚拟结点的key
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        public Invoker<T> select(Invocation invocation) {
            // 根据调用参数来生成Key
            String key = toKey(invocation.getArguments());
            // 根据这个参数生成消息摘要,返回16个字节的byte数组
            byte[] digest = md5(key);
            // 取前四个字节计算hash值,调用hash(digest, 0),将消息摘要转换为hashCode,这里仅取0-31位来生成HashCode,调用sekectForKey方法选择结点。
            return selectForKey(hash(digest, 0));
        }

        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }

        private Invoker<T> selectForKey(long hash) {
            // ceilingEntry方法用来返回与该键至少大于或等于给定键对应的entry
            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            if (entry == null) {
                // 如果不存在,那么可能这个hash值比虚拟节点的最大值还大,那么取第一个,这样就形成了一个环
                entry = virtualInvokers.firstEntry();
            }
            return entry.getValue();
        }

        private long hash(byte[] digest, int number) {
            // 这里生成一个32位的正好数,若用int保存可能会产生负数,所以强转成long
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }

        private byte[] md5(String value) {
            MessageDigest md5;
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.reset();
            byte[] bytes;
            try {
                bytes = value.getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.update(bytes);
            return md5.digest();
        }
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.12.20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • AbstractClusterInvoker
  • 权重随机
  • 权重轮询
  • 最少活跃数
  • 一致性Hash
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档