接上篇 https://cloud.tencent.com/developer/article/1109577
加权轮询,我第一次没理解,个人觉得不好理解。于是先仿照源码抽象出逻辑模型,代码如下:
public static void main(String[] args) {
//存储调用的方法和总的调用次数的map
final ConcurrentMap<String, AtomicInteger> sequences = new ConcurrentHashMap<String, AtomicInteger>();
//统计invoker 被调用次数map
final ConcurrentMap<String, AtomicInteger> result = new ConcurrentHashMap<String, AtomicInteger>();
//模拟方法三个invoker 分别为 a,b,c
List<String> invokes=new ArrayList<String>(3);
invokes.add("a");
invokes.add("b");
invokes.add("c");
//存储invoker和权重的对应map
final LinkedHashMap<String, AtomicInteger> invokerToWeightMap = new LinkedHashMap<String,AtomicInteger>();
for(int i=0;i<21;i++){
//每次调用都把模拟的权重重新放入
invokerToWeightMap.put("a",new AtomicInteger(3));
invokerToWeightMap.put("b",new AtomicInteger(6));
invokerToWeightMap.put("c",new AtomicInteger(9));
select(invokes,invokerToWeightMap,sequences,result);
}
//打印调用结果统计
for(Map.Entry<String, AtomicInteger> r : result.entrySet()){
System.out.println(r.getKey()+"被调用:"+r.getValue()+"次");
}
}
private static void select(List<String> invokes,
LinkedHashMap<String, AtomicInteger> invokerToWeightMap,
ConcurrentMap<String, AtomicInteger> sequences,
ConcurrentMap<String, AtomicInteger> result){
//假设调用servcie.hello方法
AtomicInteger sequence = sequences.get("service.hello");
if (sequence == null) {
//默认调用次数为0
sequences.putIfAbsent("servcie.hello", new AtomicInteger(0));
sequence = sequences.get("servcie.hello");
}
//调用次数+1
int currentSequence = sequence.getAndIncrement();
System.out.print("currentSequence:" + currentSequence);
int maxWeight=9;//最大权重
int minWeight=3;//最小权重
int weightSum=18;//总权重
if (maxWeight > 0 && minWeight < maxWeight) { // 走权重不一样逻辑。
int mod = currentSequence % weightSum;
System.out.print(" mod:" + mod);
for (int i = 0; i < maxWeight; i++) {
for (Map.Entry<String, AtomicInteger> each : invokerToWeightMap.entrySet()) {
final String k = each.getKey();
final AtomicInteger v = each.getValue();
if (mod == 0 && v.intValue() > 0) {
System.out.println(" selected:"+k);
AtomicInteger count = result.get(k);
if (count == null) {
result.putIfAbsent(k, new AtomicInteger(1));
}else{
count.incrementAndGet();
}
return;
}
if (v.intValue() > 0) {
v.decrementAndGet();
mod--;
}
}
}
}
}
输出结果,分析:
currentSequence:0 mod:0 selected:a currentSequence:1 mod:1 selected:b currentSequence:2 mod:2 selected:c currentSequence:3 mod:3 selected:a currentSequence:4 mod:4 selected:b currentSequence:5 mod:5 selected:c currentSequence:6 mod:6 selected:a currentSequence:7 mod:7 selected:b currentSequence:8 mod:8 selected:c //前9次调用一直是简单轮询 currentSequence:9 mod:9 selected:b currentSequence:10 mod:10 selected:c
currentSequence:11 mod:11 selected:b currentSequence:12 mod:12 selected:c
currentSequence:13 mod:13 selected:b currentSequence:14 mod:14 selected:c //10到16次调用只在b c间轮询 currentSequence:15 mod:15 selected:c currentSequence:16 mod:16 selected:c currentSequence:17 mod:17 selected:c //17到19次调用只调c currentSequence:18 mod:0 selected:a currentSequence:19 mod:1 selected:b currentSequence:20 mod:2 selected:c //由于mod取值归零,20到21次新一轮的轮询 最后调用总结: a被调用:4次 b被调用:7次 c被调用:10次
体现出加权轮询,这就是duboo的加权轮询算法。
理解上面的代码,再看源代码,就容易理解很多。 源码如下:
public static final String NAME = "roundrobin";
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // 总个数
int maxWeight = 0; // 最大权重
int minWeight = Integer.MAX_VALUE; // 最小权重
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
int weightSum = 0;
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight); // 累计最大权重
minWeight = Math.min(minWeight, weight); // 累计最小权重
if (weight > 0) {
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
weightSum += weight;
}
}
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
//总的调用次数
int currentSequence = sequence.getAndIncrement();
if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
//取模操作,保证mod值域在[0,weightSum)
int mod = currentSequence % weightSum;
for (int i = 0; i < maxWeight; i++) {//maxWeight选最大权重,保证加上子循环
// 即 maxWeight*(invoker个数)>mod 值。这个mod就可以减到0
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
final Invoker<T> k = each.getKey();
final IntegerWrapper v = each.getValue();
//在一次选择过程后(mod--,权重--,mod==0结束),选下一个权重大于0的
if (mod == 0 && v.getValue() > 0) {
return k;
}
if (v.getValue() > 0) {
//基于mod,来说之前选过的,权重-1.mod--
v.decrement();
mod--;
}
}
}
}
// 权重一样 取模轮循 简单轮询
return invokers.get(currentSequence % length);
}
private static final class IntegerWrapper {
private int value;
public IntegerWrapper(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
public void decrement() {
this.value--;
}
}
我理解,这个算法的思想,就是让权重小的候选者,通过-1 总是很快较早的不参与某个轮次的选择。