A Look at the choose Method of RibbonLoadBalancerClient

This article walks through the choose method of RibbonLoadBalancerClient and the calls it delegates to.

RibbonLoadBalancerClient.choose

spring-cloud-netflix-ribbon-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancerClient.java

    public ServiceInstance choose(String serviceId) {
        Server server = getServer(serviceId);
        if (server == null) {
            return null;
        }
        return new RibbonServer(serviceId, server, isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));
    }

    protected ILoadBalancer getLoadBalancer(String serviceId) {
        return this.clientFactory.getLoadBalancer(serviceId);
    }

    protected Server getServer(String serviceId) {
        return getServer(getLoadBalancer(serviceId));
    }

    protected Server getServer(ILoadBalancer loadBalancer) {
        if (loadBalancer == null) {
            return null;
        }
        return loadBalancer.chooseServer("default"); // TODO: better handling of key
    }

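As shown above, choose takes a serviceId, obtains the ILoadBalancer for it from SpringClientFactory, and then calls ILoadBalancer.chooseServer to select a Server. If no load balancer or no server is available, choose returns null. In version 2.0.0.RC1 the key passed to chooseServer is hard-coded to "default".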
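The delegation chain above can be modelled in a few lines. This is only a minimal sketch: ChooseFlowSketch and its Function-based load balancer stand-in are hypothetical and are not the Spring Cloud or Ribbon types. It shows the two null checks and the fact that the key is always "default".

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of the call chain; not the Spring Cloud classes.
final class ChooseFlowSketch {
    // Stand-in for the per-service ILoadBalancer: maps a key to a server address.
    private final Map<String, Function<Object, String>> loadBalancers;

    ChooseFlowSketch(Map<String, Function<Object, String>> loadBalancers) {
        this.loadBalancers = loadBalancers;
    }

    // Mirrors choose(serviceId): look up the balancer, then ask it for a server.
    String choose(String serviceId) {
        Function<Object, String> loadBalancer = loadBalancers.get(serviceId);
        if (loadBalancer == null) {
            return null; // no load balancer registered for this service
        }
        return loadBalancer.apply("default"); // key is hard-coded, as in 2.0.0.RC1
    }

    public static void main(String[] args) {
        Function<Object, String> orders = key -> "10.0.0.1:8080 (key=" + key + ")";
        ChooseFlowSketch client =
                new ChooseFlowSketch(Collections.singletonMap("orders", orders));
        System.out.println(client.choose("orders"));
        System.out.println(client.choose("unknown"));
    }
}
```

Because the key never varies, any rule that wants to route on a per-request key gets no useful information from this entry point.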

ILoadBalancer.chooseServer

ribbon-loadbalancer-2.2.5-sources.jar!/com/netflix/loadbalancer/ILoadBalancer.java

    /**
     * Choose a server from load balancer.
     * 
     * @param key An object that the load balancer may use to determine which server to return. null if 
     *         the load balancer does not use this parameter.
     * @return server chosen
     */
    public Server chooseServer(Object key);

ZoneAwareLoadBalancer

ribbon-loadbalancer-2.2.5-sources.jar!/com/netflix/loadbalancer/ZoneAwareLoadBalancer.java

/**
 * Load balancer that can avoid a zone as a whole when choosing server. 
 *<p>
 * The key metric used to measure the zone condition is Average Active Requests,
which is aggregated per rest client per zone. It is the
total outstanding requests in a zone divided by number of available targeted instances (excluding circuit breaker tripped instances).
This metric is very effective when timeout occurs slowly on a bad zone.
<p>
The  LoadBalancer will calculate and examine zone stats of all available zones. If the Average Active Requests for any zone has reached a configured threshold, this zone will be dropped from the active server list. In case more than one zone has reached the threshold, the zone with the most active requests per server will be dropped.
Once the the worst zone is dropped, a zone will be chosen among the rest with the probability proportional to its number of instances.
A server will be returned from the chosen zone with a given Rule (A Rule is a load balancing strategy, for example {@link AvailabilityFilteringRule})
For each request, the steps above will be repeated. That is to say, each zone related load balancing decisions are made at real time with the up-to-date statistics aiding the choice.

 * @author awang
 *
 * @param <T>
 */
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {

    private static final DynamicBooleanProperty ENABLED = DynamicPropertyFactory.getInstance().getBooleanProperty("ZoneAwareNIWSDiscoveryLoadBalancer.enabled", true);

    //......
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }
}

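ZoneAwareNIWSDiscoveryLoadBalancer.enabled defaults to true. If it is switched off, or if only one zone is available, the method goes straight to super.chooseServer(key). The same call is used as a fallback when the zone-aware branch does not yield a server. ZoneAwareLoadBalancer extends DynamicServerListLoadBalancer, which extends BaseLoadBalancer, which in turn extends AbstractLoadBalancer; the chooseServer implementation that super resolves to lives in BaseLoadBalancer.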
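The Javadoc of ZoneAwareLoadBalancer says that, once the worst zone is dropped, a zone is chosen with probability proportional to its number of instances. The following is a rough sketch of that idea only, not the code of ZoneAvoidanceRule.randomChooseZone: the class name, the Map of instance counts, and the explicit ticket parameter are assumptions made so the example is self-contained and deterministic.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

// Simplified sketch of instance-count-weighted zone selection; names are hypothetical.
final class WeightedZoneChoiceSketch {

    // Picks the zone whose cumulative instance range contains ticket (1..total).
    static String chooseZone(Map<String, Integer> instancesPerZone, int ticket) {
        if (ticket < 1) {
            return null; // tickets start at 1
        }
        int sum = 0;
        for (Map.Entry<String, Integer> entry : instancesPerZone.entrySet()) {
            sum += entry.getValue();
            if (ticket <= sum) {
                return entry.getKey();
            }
        }
        return null; // empty map, or ticket larger than the total instance count
    }

    // Draws a random ticket so each zone is chosen with probability count / total.
    static String chooseZone(Map<String, Integer> instancesPerZone, Random random) {
        int total = 0;
        for (int count : instancesPerZone.values()) {
            total += count;
        }
        if (total <= 0) {
            return null;
        }
        return chooseZone(instancesPerZone, random.nextInt(total) + 1);
    }

    public static void main(String[] args) {
        Map<String, Integer> zones = new LinkedHashMap<>();
        zones.put("zone-a", 3);
        zones.put("zone-b", 1);
        // zone-a owns tickets 1..3 and zone-b owns ticket 4.
        System.out.println(chooseZone(zones, new Random()));
    }
}
```

With three instances in zone-a and one in zone-b, zone-a would be picked about three times out of four.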

BaseLoadBalancer.chooseServer

ribbon-loadbalancer-2.2.5-sources.jar!/com/netflix/loadbalancer/BaseLoadBalancer.java

    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

The call ends at rule.choose(key). In a Spring Cloud application this rule defaults to ZoneAvoidanceRule, as the RibbonClientConfiguration below shows.

RibbonClientConfiguration

spring-cloud-netflix-ribbon-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/ribbon/RibbonClientConfiguration.java

@SuppressWarnings("deprecation")
@Configuration
@EnableConfigurationProperties
//Order is important here, last should be the default, first should be optional
// see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
public class RibbonClientConfiguration {
    //......
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }
}

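As shown, unless an IRule has been set for this client through properties (or another IRule bean is already defined), a ZoneAvoidanceRule is created.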

PredicateBasedRule.choose

ribbon-loadbalancer-2.2.5-sources.jar!/com/netflix/loadbalancer/PredicateBasedRule.java

/**
 * A rule which delegates the server filtering logic to an instance of {@link AbstractServerPredicate}.
 * After filtering, a server is returned from filtered list in a round robin fashion.
 * 
 * 
 * @author awang
 *
 */
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

    /**
     * Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
     * 
     */
    public abstract AbstractServerPredicate getPredicate();

    /**
     * Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
     * The performance for this method is O(n) where n is number of servers to be filtered.
     */
    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }
}

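ZoneAvoidanceRule extends PredicateBasedRule, so this is the choose method that runs. It calls getPredicate().chooseRoundRobinAfterFiltering. Note that the Javadoc still mentions chooseRandomlyAfterFiltering, while the code uses the round-robin variant.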

AbstractServerPredicate.chooseRoundRobinAfterFiltering

ribbon-loadbalancer-2.2.5-sources.jar!/com/netflix/loadbalancer/AbstractServerPredicate.java

    /**
     * Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key. 
     */
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }

    /**
     * Get servers filtered by this predicate from list of servers. 
     */
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        if (loadBalancerKey == null) {
            return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
        } else {
            List<Server> results = Lists.newArrayList();
            for (Server server: servers) {
                if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                    results.add(server);
                }
            }
            return results;            
        }
    }

chooseRoundRobinAfterFiltering calls getEligibleServers(servers, loadBalancerKey). Because loadBalancerKey is not null here (it is "default"), the else branch runs and every server is tested with apply.
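The "filter first, then round robin over what is left" pattern can be sketched as follows. This is a simplified illustration, not Ribbon's implementation: it uses java.util.Optional and plain strings instead of Guava's Optional and Server, and the wrapping counter is one plausible way to write incrementAndGetModulo, whose body is not shown in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Simplified sketch of "filter, then round robin"; class and method names are hypothetical.
final class RoundRobinAfterFilteringSketch {
    private final AtomicInteger nextIndex = new AtomicInteger();

    // Returns the current index and advances the shared counter, wrapping at modulo.
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            // Retry if another thread won the race or the list shrank below current.
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    Optional<String> choose(List<String> servers, Predicate<String> filter) {
        List<String> eligible = new ArrayList<>();
        for (String server : servers) {
            if (filter.test(server)) {
                eligible.add(server);
            }
        }
        if (eligible.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }

    public static void main(String[] args) {
        RoundRobinAfterFilteringSketch rule = new RoundRobinAfterFilteringSketch();
        List<String> servers = Arrays.asList("a", "b", "c");
        Predicate<String> notB = server -> !server.equals("b");
        for (int i = 0; i < 3; i++) {
            System.out.println(rule.choose(servers, notB).orElse("none"));
        }
    }
}
```

The counter is shared across calls, so successive requests cycle through the eligible servers even though the filtering is redone every time.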

CompositePredicate.getEligibleServers

ribbon-loadbalancer-2.2.5-sources.jar!/com/netflix/loadbalancer/CompositePredicate.java

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        return delegate.apply(input);
    }

    /**
     * Get the filtered servers from primary predicate, and if the number of the filtered servers
     * are not enough, trying the fallback predicates  
     */
    @Override
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
        Iterator<AbstractServerPredicate> i = fallbacks.iterator();
        while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
                && i.hasNext()) {
            AbstractServerPredicate predicate = i.next();
            result = predicate.getEligibleServers(servers, loadBalancerKey);
        }
        return result;
    }

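CompositePredicate overrides getEligibleServers. It first calls the parent AbstractServerPredicate.getEligibleServers, which filters with the primary (delegate) predicate. While the filtered list is too small, meaning fewer than minimalFilteredServers entries or not more than minimalFilteredPercentage of the input list, it re-filters the original list with the next fallback predicate.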
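The fallback loop can be sketched in isolation like this. It is an illustrative model under stated assumptions: FallbackFilterSketch, the string-based servers, and the threshold values used in main are all made up for the example, and only the loop condition follows the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Simplified sketch of primary-plus-fallback filtering; names are hypothetical.
final class FallbackFilterSketch {
    private final Predicate<String> primary;
    private final List<Predicate<String>> fallbacks;
    private final int minimalFilteredServers;
    private final float minimalFilteredPercentage;

    FallbackFilterSketch(Predicate<String> primary, List<Predicate<String>> fallbacks,
                         int minimalFilteredServers, float minimalFilteredPercentage) {
        this.primary = primary;
        this.fallbacks = fallbacks;
        this.minimalFilteredServers = minimalFilteredServers;
        this.minimalFilteredPercentage = minimalFilteredPercentage;
    }

    private static List<String> filter(List<String> servers, Predicate<String> predicate) {
        List<String> result = new ArrayList<>();
        for (String server : servers) {
            if (predicate.test(server)) {
                result.add(server);
            }
        }
        return result;
    }

    List<String> getEligibleServers(List<String> servers) {
        List<String> result = filter(servers, primary);
        for (Predicate<String> fallback : fallbacks) {
            boolean enough = result.size() >= minimalFilteredServers
                    && result.size() > (int) (servers.size() * minimalFilteredPercentage);
            if (enough) {
                break; // the current result is large enough, stop falling back
            }
            // Each fallback filters the ORIGINAL list, not the previous result.
            result = filter(servers, fallback);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Predicate<String>> fallbacks = new ArrayList<>();
        fallbacks.add(server -> server.equals("b")); // looser predicate
        fallbacks.add(server -> true);               // "always true" last resort
        FallbackFilterSketch composite =
                new FallbackFilterSketch(server -> false, fallbacks, 1, 0f);
        System.out.println(composite.getEligibleServers(Arrays.asList("a", "b", "c")));
    }
}
```

If every fallback is exhausted, the result of the last one is returned even when it is still below the thresholds.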

ZoneAvoidanceRule

ribbon-loadbalancer-2.2.5-sources.jar!/com/netflix/loadbalancer/ZoneAvoidanceRule.java

/**
 * A rule that uses the a {@link CompositePredicate} to filter servers based on zone and availability. The primary predicate is composed of
 * a {@link ZoneAvoidancePredicate} and {@link AvailabilityPredicate}, with the fallbacks to {@link AvailabilityPredicate}
 * and an "always true" predicate returned from {@link AbstractServerPredicate#alwaysTrue()} 
 * 
 * @author awang
 *
 */
public class ZoneAvoidanceRule extends PredicateBasedRule {
    //......
    private CompositePredicate compositePredicate;

    public ZoneAvoidanceRule() {
        super();
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
        return CompositePredicate.withPredicates(p1, p2)
                             .addFallbackPredicate(p2)
                             .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                             .build();

    }

    @Override
    public AbstractServerPredicate getPredicate() {
        return compositePredicate;
    }
}

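Here getPredicate returns a CompositePredicate whose primary predicate is ZoneAvoidancePredicate AND AvailabilityPredicate. If too few servers pass that combination, it falls back to AvailabilityPredicate alone; if that still leaves too few, it falls back to AbstractServerPredicate.alwaysTrue(), which accepts every server, so no filtering is applied.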

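Summary

  • RibbonLoadBalancerClient.choose calls loadBalancer.chooseServer("default"), so the loadBalancerKey is hard-coded.
  • loadBalancer.chooseServer ultimately delegates to the choose method of IRule. The default rule is ZoneAvoidanceRule, which in turn delegates to AbstractServerPredicate.chooseRoundRobinAfterFiltering to filter the servers and pick one in round-robin order.
  • The filtering is done by CompositePredicate.getEligibleServers: servers are first filtered with ZoneAvoidancePredicate AND AvailabilityPredicate; if too few remain, AvailabilityPredicate alone is tried; if that is still not enough, the always-true predicate is used and no server is filtered out.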

doc

  • Client Side Load Balancer: Ribbon

Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2018-05-28
