Dubbo 的负载均衡策略:最小活跃调用策略

本文简单介绍 Dubbo 中的负载均衡策略:最小活跃调用策略。

1

最小活跃调用策略

最小活跃调用策略:指的是当请求调用来临,有多个实例提供服务的时候,选择其中被调用活跃次数最少的实例来提供服务。通俗一点讲就是,当前有 3 个实例在提供服务,A 当前被 2 个服务调用,B 当前被 3 个服务调用,C 当前被 1 个服务调用,一个新的调用请求过来,会选择调用到 C 实例。

最小活跃值怎么计算呢?

在下面 ActiveLimitFilter 代码中可以看到,是通过在调用方法前将值 + 1,调用方法完成后将值 -1。

Dubbo 中实现该策略的代码是:LeastActiveLoadBalance。它分为 3 种情况。

当只有一个最小活跃的实例时,则返回这个唯一的实例。这个就不用多说了。。。

当有多个最小活跃的实例且每个实例的权重相同时,则通过随机选择其中的一个实例

当有多个最小活跃的实例且权重不一时,则通过产生一个随机数,范围为 [0, totalWeight - 1],返回该随机数所在的实例。可以参考随机策略的权重不一情况

2

最小活跃调用策略的优缺点

优点:能动态的根据当前服务的调用情况,选择最小被调用的实例,调用越慢的机器,会接收到更少的请求。

缺点:好吧。。。说不出有什么大的缺点,有了解的同学帮忙补充

3

LeastActiveLoadBalance 源码

public classLeastActiveLoadBalanceextendsAbstractLoadBalance {

public static finalStringNAME="leastactive";

private finalRandomrandom=newRandom();

@Override

protected InvokerdoSelect(List> invokers,URL url,Invocation invocation) {

// 服务实例总数

intlength = invokers.size();// Number of invokers

// 最小活跃值

intleastActive = -1;// The least active value of all invokers

// 最小活跃值的实例数量

intleastCount =;// The number of invokers having the same least active value (leastActive)

// 存储所有最小活跃值的实例

int[] leastIndexs =new int[length];// The index of invokers having the same least active value (leastActive)

// 总权重值(只记录最小活跃值的实例的权重)

inttotalWeight =;// The sum of weights

// 记录第一个最小活跃值的实例的权重

intfirstWeight =;// Initial value, used for comparision

// 所有实例的权重是否相同

booleansameWeight =true;// Every invoker has the same weight value?

// 遍历所有实例

for(inti =;i

Invoker invoker = invokers.get(i);

// 获取该实例的活跃值

intactive = RpcStatus.getStatus(invoker.getUrl(),invocation.getMethodName()).getActive();// Active number

// 获取该实例的权重值

intweight = invoker.getUrl().getMethodParameter(invocation.getMethodName(),Constants.WEIGHT_KEY,Constants.DEFAULT_WEIGHT);// Weight

if(leastActive == -1|| active

// 记录最小的活跃值

leastActive = active;// Record the current least active value

// 重置记录

leastCount =1;// Reset leastCount, count again based on current leastCount

leastIndexs[] = i;// Reset

totalWeight = weight;// Reset

firstWeight = weight;// Record the weight the first invoker

sameWeight =true;// Reset, every invoker has the same weight value?

}else if(active == leastActive) {// If current invoker's active value equals with leaseActive, then accumulating.

// 增加多一个最小活跃值

// 记录该实例的下标

leastIndexs[leastCount++] = i;// Record index number of this invoker

// 统计总权重值

totalWeight += weight;// Add this invoker's weight to totalWeight.

// If every invoker has the same weight?

if(sameWeight && i >

&& weight != firstWeight) {

// 同是最小活跃值的实例中权重不一

sameWeight =false;

}

}

}

// 如果只有一个最小活跃值的实例,则返回该实例

// assert(leastCount > 0)

if(leastCount ==1) {

// If we got exactly one invoker having the least active value, return this invoker directly.

returninvokers.get(leastIndexs[]);

}

// 最小活跃值的实例中的权重不一

if(!sameWeight && totalWeight >) {

// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.

// 依据总权重值产生随机数,返回该随机数所在的实例

intoffsetWeight =random.nextInt(totalWeight);

// Return a invoker based on the random value.

// 返回随机的最小活跃实例

for(inti =;i

intleastIndex = leastIndexs[i];

offsetWeight -= getWeight(invokers.get(leastIndex),invocation);

if(offsetWeight

returninvokers.get(leastIndex);

}

}

// 所有最小活跃值的实例都是相同的权重,则随机选择一个实例

// If all invokers have the same weight value or totalWeight=0, return evenly.

returninvokers.get(leastIndexs[random.nextInt(leastCount)]);

}

}

启动最小活跃策略时需要额外配置 filter = "activelimit",如下所示

所以我们再看一下 ActiveLimitFilter 源码。

@Activate(group= Constants.CONSUMER,value= Constants.ACTIVES_KEY)

public classActiveLimitFilterimplementsFilter {

@Override

publicResultinvoke(Invoker invoker,Invocation invocation)throwsRpcException {

URL url = invoker.getUrl();

String methodName = invocation.getMethodName();

// 获取当前被调用服务配置的最大并发数

intmax = invoker.getUrl().getMethodParameter(methodName,Constants.ACTIVES_KEY,);

RpcStatus count = RpcStatus.getStatus(invoker.getUrl(),invocation.getMethodName());

if(max >) {

longtimeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(),Constants.TIMEOUT_KEY,);

longstart = System.currentTimeMillis();

longremain = timeout;

intactive = count.getActive();

if(active >= max) {

// 当前并发数达到配置的最大并发数,则等待

synchronized(count) {

while((active = count.getActive()) >= max) {

try{

count.wait(remain);

}catch(InterruptedException e) {

}

longelapsed = System.currentTimeMillis() - start;

remain = timeout - elapsed;

if(remain

throw newRpcException("Waiting concurrent invoke timeout in client-side for service: "

+ invoker.getInterface().getName() +", method: "

+ invocation.getMethodName() +", elapsed: "+ elapsed

+", timeout: "+ timeout +". concurrent invokes: "+ active

+". max concurrent invoke limit: "+ max);

}

}

}

}

}

try{

longbegin = System.currentTimeMillis();

// 每一次调用前活跃数 +1

RpcStatus.beginCount(url,methodName);

try{

Result result = invoker.invoke(invocation);

// 调用后活跃数 -1

RpcStatus.endCount(url,methodName,System.currentTimeMillis() - begin, true);

returnresult;

}catch(RuntimeException t) {

RpcStatus.endCount(url,methodName,System.currentTimeMillis() - begin, false);

throwt;

}

}finally{

if(max >) {

// 完成调用则通知其他阻塞着的线程

synchronized(count) {

count.notify();

}

}

}

}

}

做个有梦想的程序猿

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180816G1TJKG00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券