本文简单介绍 Dubbo 中的负载均衡策略:最小活跃调用策略。
1
最小活跃调用策略
最小活跃调用策略:指的是当请求调用来临,有多个实例提供服务的时候,选择其中被调用活跃次数最少的实例来提供服务。通俗一点讲就是,当前有 3 个实例在提供服务,A 当前被 2 个服务调用,B 当前被 3 个服务调用,C 当前被 1 个服务调用,一个新的调用请求过来,会选择调用到 C 实例。
最小活跃值怎么计算呢?
在下面 ActiveLimitFilter 代码中可以看到,是通过在调用方法前将值 + 1,调用方法完成后将值 -1。
Dubbo 中实现该策略的代码是:LeastActiveLoadBalance。它分为 3 种情况。
当只有一个最小活跃的实例时,则返回这个唯一的实例。这个就不用多说了。。。
当有多个最小活跃的实例且每个实例的权重相同时,则通过随机选择其中的一个实例
当有多个最小活跃的实例且权重不一时,则通过产生一个随机数,范围为 [0, totalWeight - 1],返回该随机数所在的实例。可以参考随机策略的权重不一情况
2
最小活跃调用策略的优缺点
优点:能动态的根据当前服务的调用情况,选择最小被调用的实例,调用越慢的机器,会接收到更少的请求。
缺点:好吧。。。说不出有什么大的缺点,有了解的同学帮忙补充
3
LeastActiveLoadBalance 源码
public classLeastActiveLoadBalanceextendsAbstractLoadBalance {
public static finalStringNAME="leastactive";
private finalRandomrandom=newRandom();
@Override
protected InvokerdoSelect(List> invokers,URL url,Invocation invocation) {
// 服务实例总数
intlength = invokers.size();// Number of invokers
// 最小活跃值
intleastActive = -1;// The least active value of all invokers
// 最小活跃值的实例数量
intleastCount =;// The number of invokers having the same least active value (leastActive)
// 存储所有最小活跃值的实例
int[] leastIndexs =new int[length];// The index of invokers having the same least active value (leastActive)
// 总权重值(只记录最小活跃值的实例的权重)
inttotalWeight =;// The sum of weights
// 记录第一个最小活跃值的实例的权重
intfirstWeight =;// Initial value, used for comparision
// 所有实例的权重是否相同
booleansameWeight =true;// Every invoker has the same weight value?
// 遍历所有实例
for(inti =;i
Invoker invoker = invokers.get(i);
// 获取该实例的活跃值
intactive = RpcStatus.getStatus(invoker.getUrl(),invocation.getMethodName()).getActive();// Active number
// 获取该实例的权重值
intweight = invoker.getUrl().getMethodParameter(invocation.getMethodName(),Constants.WEIGHT_KEY,Constants.DEFAULT_WEIGHT);// Weight
if(leastActive == -1|| active
// 记录最小的活跃值
leastActive = active;// Record the current least active value
// 重置记录
leastCount =1;// Reset leastCount, count again based on current leastCount
leastIndexs[] = i;// Reset
totalWeight = weight;// Reset
firstWeight = weight;// Record the weight the first invoker
sameWeight =true;// Reset, every invoker has the same weight value?
}else if(active == leastActive) {// If current invoker's active value equals with leaseActive, then accumulating.
// 增加多一个最小活跃值
// 记录该实例的下标
leastIndexs[leastCount++] = i;// Record index number of this invoker
// 统计总权重值
totalWeight += weight;// Add this invoker's weight to totalWeight.
// If every invoker has the same weight?
if(sameWeight && i >
&& weight != firstWeight) {
// 同是最小活跃值的实例中权重不一
sameWeight =false;
}
}
}
// 如果只有一个最小活跃值的实例,则返回该实例
// assert(leastCount > 0)
if(leastCount ==1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
returninvokers.get(leastIndexs[]);
}
// 最小活跃值的实例中的权重不一
if(!sameWeight && totalWeight >) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
// 依据总权重值产生随机数,返回该随机数所在的实例
intoffsetWeight =random.nextInt(totalWeight);
// Return a invoker based on the random value.
// 返回随机的最小活跃实例
for(inti =;i
intleastIndex = leastIndexs[i];
offsetWeight -= getWeight(invokers.get(leastIndex),invocation);
if(offsetWeight
returninvokers.get(leastIndex);
}
}
// 所有最小活跃值的实例都是相同的权重,则随机选择一个实例
// If all invokers have the same weight value or totalWeight=0, return evenly.
returninvokers.get(leastIndexs[random.nextInt(leastCount)]);
}
}
启动最小活跃策略时需要额外配置 filter = "activelimit",如下所示
所以我们再看一下 ActiveLimitFilter 源码。
@Activate(group= Constants.CONSUMER,value= Constants.ACTIVES_KEY)
public classActiveLimitFilterimplementsFilter {
@Override
publicResultinvoke(Invoker invoker,Invocation invocation)throwsRpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 获取当前被调用服务配置的最大并发数
intmax = invoker.getUrl().getMethodParameter(methodName,Constants.ACTIVES_KEY,);
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(),invocation.getMethodName());
if(max >) {
longtimeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(),Constants.TIMEOUT_KEY,);
longstart = System.currentTimeMillis();
longremain = timeout;
intactive = count.getActive();
if(active >= max) {
// 当前并发数达到配置的最大并发数,则等待
synchronized(count) {
while((active = count.getActive()) >= max) {
try{
count.wait(remain);
}catch(InterruptedException e) {
}
longelapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if(remain
throw newRpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() +", method: "
+ invocation.getMethodName() +", elapsed: "+ elapsed
+", timeout: "+ timeout +". concurrent invokes: "+ active
+". max concurrent invoke limit: "+ max);
}
}
}
}
}
try{
longbegin = System.currentTimeMillis();
// 每一次调用前活跃数 +1
RpcStatus.beginCount(url,methodName);
try{
Result result = invoker.invoke(invocation);
// 调用后活跃数 -1
RpcStatus.endCount(url,methodName,System.currentTimeMillis() - begin, true);
returnresult;
}catch(RuntimeException t) {
RpcStatus.endCount(url,methodName,System.currentTimeMillis() - begin, false);
throwt;
}
}finally{
if(max >) {
// 完成调用则通知其他阻塞着的线程
synchronized(count) {
count.notify();
}
}
}
}
}
做个有梦想的程序猿
领取专属 10元无门槛券
私享最新 技术干货