前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ribbon源码

ribbon源码

作者头像
用户7798898
发布2020-09-27 17:01:02
5660
发布2020-09-27 17:01:02
举报

一. ribbon的使用

通过前面对微服务ribbon的学习, 知道了ribbon的基本使用方式。

比如:我的order服务,想要调用stock减库存的操作。 应该怎么实现呢?

第一步:引入ribbon

代码语言:javascript
复制
@LoadBalanced
@Bean
public RestTemplate getRestTemplate() {
    return new RestTemplate();
}

这里通过@LoadBalance注解, 引入了ribbon, 自动实现ribbon的负载均衡策略

第二步:写接口

如上图, 通过restTemplate手动指定stock服务,并调用其接口. 我们看到在域名部分,我们写的是服务名. 其实调用了ribbon的负载均衡策略以后, 我们大概可以知道, 它是将

http://stock 变成了 http://ip:port/的形式. 这其实就是ribbon的原理. 那么他是如何来选择ip和port的呢? 这就是具体的实现. 是使用轮训的方式找到ip, 还是使用随机的方式找到ip

代码语言:javascript
复制
学习源码的方法一
1. 以ribbon为例, 先学会使用ribbon, 知道ribbon具体有哪些功能, 知道其效果
2. 猜测ribbon是如何实现的, 也就是说, 如果是我们自己来实现ribbon的负载均衡功能, 我们要怎么做?
3. 看源码, 对比自己的思想和源码的异同. 

这是一种有自己思考的学习方式. 对于学习源码来说也会觉得更有趣

其实,如果想在项目中使用ribbon, 这两步基础就ok了, 那么, 他到底是在底层如何运转的呢? 来看看ribbon的实现.

二. ribbon源码入口

1. 就从@LoadBalanced这个注解入手

点击进入到@LoadBalanced的源码

这个源码就是定义了一个注解,没有特殊的含义. 这是一个接口, 那么注解是在哪里被实现的呢?那么,就需要查源码调用了

2. 查找loadBalanced的实现类

怎么找实现类呢?入口在哪里?

方法一: 入口通常在META-INF/spring.factories文件里.里面找到引入了LoadBalanced 类的初始化类.

通过观察, 发现和LoadBalanced有关的自动配置类有两个, 二第一个关联性更大, 因为名字基本一样. 所以, 先定位到第一个

方法二: 纯经验猜测.

首先找到LoadBalanced注解所在的包, 然后看看里面有没有和LoadBalanced有关系的AutoConfiguration配置了, 这就是靠猜了

然后, 我们可以很容易的就看到LoadBalancerAutoConfiguration类. 这个是凭经验找到了, 接下来, 验证一下是不是这个类.

三. LoadBalancerAutoConfiguration自动配置类

看一看,它里面都注入了哪些东西呢?

首先, 初始化了一个LoadBalancerInterceptor拦截器,

代码语言:javascript
复制
@Bean
public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
     return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}

这是一个拦截器, 也就是负载均衡具体实现的拦截器, 他是通过哪个拦截器实现的呢? 这里面是使用LoadBalancerInterceptor实现的. 其实, 这里面是真正实现负载均衡功能的地方

先简单看一下, 下面来看看另一个初始化方法

第二. 初始化RestTemplateCustomeizer方法

代码语言:javascript
复制
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
     return (restTemplate) -> {
         List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
         list.add(loadBalancerInterceptor);
         restTemplate.setInterceptors(list);
     };
}

首先看这个方法的入参, 方法的入参是谁呢? 就是第一步初始化的LoadBalancerInterceptor. 在@Bean注解中, 入参前面省略了@AutoWired, 也就是说, 相当于自动引入了LoadBalancerInterceptor类.

然后再来看返回值, 返回值是一个RestTemplateCustomizer, 这是一个接口, 里面就定义了一个方法

代码语言:javascript
复制
public interface RestTemplateCustomizer {
    void customize(RestTemplate restTemplate);
}

而下面这段代码返回的一定是一个RestTemplateCustomeizer, 那么return的内容就是customize(RestTemplate restTemplate)的具体实现了

代码语言:javascript
复制
return (restTemplate) -> {
    List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
    list.add(loadBalancerInterceptor);
    restTemplate.setInterceptors(list);
};

public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {....}具体实现是什么呢?

就是将我们上面定义的拦截器添加到restTemplate中. restTemplate哪里来的呢? 我们可以看到最上面定义了这个

代码语言:javascript
复制
@LoadBalanced
@Autowired(
     required = false
    )
private List<RestTemplate> restTemplates = Collections.emptyList();

他的含义是: 为每一个加了@LoadBalanced注解的RestTemplate, 都将其添加到restTemplates集合中. 然后对这个集合进行处理

第三步: 初始化SmartInitializingSingleton类

代码语言:javascript
复制
  @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> {
            restTemplateCustomizers.ifAvailable((customizers) -> {
                Iterator var2 = this.restTemplates.iterator();

                while(var2.hasNext()) {
                    RestTemplate restTemplate = (RestTemplate)var2.next();
                    Iterator var4 = customizers.iterator();

                    while(var4.hasNext()) {
                        RestTemplateCustomizer customizer = (RestTemplateCustomizer)var4.next();
                        customizer.customize(restTemplate);
                    }
                }

            });
        };
    }

这个类的入参是final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers, 也就是RestTemplate的定制器. 是一个list列表集合

具体实现是: 循环遍历this.restTemplates.iterator(), 也就是初始化时带有@LoadBalanced的RestTemplate, 然后执行里面的定制内容customize.

大概知道了自动配置里面引入了哪些类, 其中拦截器的实现是具体ribbon逻辑实现部分, 所以, 下面我们来看LoadBalancerInterceptor

四. LoadBalancerInterceptor拦截器实现类

代码语言:javascript
复制
小贴士: 看源码技巧

这里有一个规则, 如果看过滤器, 那么其他方法都不重要, 主要看filter的具体实现. 看拦截器, 其他都不重要, 主要看intercept方法的实现

直接看intercept方法的实现

代码语言:javascript
复制
package org.springframework.cloud.client.loadbalancer;

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    ......
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
        URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
    }
}

具体实现: 第一步: 获取请求的uri, 我们这里的是http://stock/stock/reduct/count/1/2

第二步: 获取服务的host域名, 这里对应的就是stock.

     第三步: this.loadBalancer.execute(serviceName, ......). 这里传入了服务名, 我们猜测一下实现. 首先会根据服务名获取集群, 然后在根据负载均衡策略找到ip, 然后调用http请求, 发送到指定ip

前两步不说了, 不那么重要, 来看第三步. 具体实现

代码语言:javascript
复制
  public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
        ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
        Server server = this.getServer(loadBalancer, hint);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        } else {
            RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
            return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
        }
    }

这里代码很短, 简单看一下

  • 第一步: 获取负载均衡器,
    • ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
    • 负载均衡器猜一下干嘛的, 一会我们选出了3个或者5个节点, 要使用负载均衡器来看看, 到底选择哪一个节点发送请求
  • Server server = this.getServer(loadBalancer, hint);
    • 将LoadBalancer作为参数传入, 最终过滤选择出一个server, 将请求发送到这个server上.

我们来看第一步: 获取负载均衡器 ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);

在这一步走到getInstance(name, ILoadBalancer.class);根据名字获取一个负载均衡器. 返回的是一个ILoadBalancer, 我们知道这是一个接口, 那么他到底是什么类型的负载均衡器呢?

我们不知道, 那么能知道的是他一定是在某个地方初始化的时候, 指定了使用哪一个实现类. 而通常这用定义实现类的地方在Configuration的@Bean中.

根据这个思路, 我们在RibbonLoadBalancerClient包下找一找有没有类似的Configuration. 根据名字猜测, 最终找到了

代码语言:javascript
复制
RibbonClientConfiguration

而这里面刚好有ILoadBalancer的实现

代码语言:javascript
复制
    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

棕色部分的代码是去application.yml属性值取, 如果有特别指定使用哪个ILoadBalancer类, 那么优先使用配置中的, 如果配置没有, 则使用new ZoneAwareLoadBalancer<>(...)类.

好了, 我们找到这里就可以了, 先不看ZoneAwareLoadBalancer的具体实现.

接下来看这一步: Server server = this.getServer(loadBalancer, hint); 获取服务

代码语言:javascript
复制
package org.springframework.cloud.netflix.ribbon;

public class RibbonLoadBalancerClient implements LoadBalancerClient {
        ......

    protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        if (loadBalancer == null) {
            return null;
        }
        // Use 'default' on a null hint, or just pass it on?
        return loadBalancer.chooseServer(hint != null ? hint : "default");
    }
}

走到这段源码, 我们就知道了, 要低啊用LoadBalancer的chooseServer方法. 而在上一步,我们知道这个LoadBalancer的实现类是ZoneAwareLoadBalancer, 所以, 我们可以直接到ZoneAwareLoadBalancer里面找到chooseServer方法了.

这里指向的是亚马逊的区域, 在中国只有一个区, 所以, 这里的数量始终是1, 所以, 最后走的是else分支

调用return super.chooseServer(key);方法

在super的chooseServer(key)方法里, 其他都不重要, 重要的是this.rule.choose(key)方法

这里的rule是一个接口protected IRule rule; 联想一下, 现在有负载均衡器了, 那么还要有负载均衡规则, 而rule正是具体实现的负载均衡规则.

根据经验我们知道, 这个IRule接口一定是在某个地方通过@Bean被初始化了,

又是根据经验, 我们猜到这个初始化的位置, 应该和刚才ILoadBalancer在同一个地方, 因为他们是同一个功能的代码, 那么如果是我写的话, 我会把同一个功能的代码的初始化放在一个地方.

所以, 来看看初始化ILoadBalancer的类RibbonClientConfiguration有没有, 搜索一下IRule ,果然找到了

代码语言:javascript
复制
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }

棕色部分的代码是先从application.yml配置文件中读取用户指定的IRule实现类. 如果没有, 就是用默认的ZoneAvoidanceRule实现类.

代码语言:javascript
复制
Ribbon的三大组件
1. Rule
2. Ping
3. LoadBalancer

这是ribbon三个最重要的组件, 他们三个都是在RibbonClientConfiguration被初始化的. 
代码语言:javascript
复制
   @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }

    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, name)) {
            return this.propertiesFactory.get(IPing.class, config, name);
        }
        return new DummyPing();
    }

   @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

下面看看ZoneAvoidanceRule的实现, 为什么看看实现呢, 要了解他的父类继承关系, 因为很可能在调用的时候, 不是调用的它本身, 而是调用的父类

我们记住zoneAvoidanceRule的一个父类是PredicateBaseRule.

下面我们在回到this.rule.choose(key)这个方法上来, 这回我们就知道这里调用的choose()是哪一个类下面的了, 他是PredicateBaseRule下面的.

代码语言:javascript
复制
  @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }

很显然这里采用的是轮训策略选择服务器. 具体的轮训策略是如何选择的呢? 具体来看看chooseRoundRobinAfterFiltering()方法

代码语言:javascript
复制
/**
     * Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key. 
     */
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
     // 获取全部可用的服务
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
     // 采用的服务策略.
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }
代码语言:javascript
复制
  private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo)
                return current;
        }
    }

有效的服务数+1取模, 为了防止并发, 这里使用了CAS的思想, 比较并赋值. 如果赋值失败, 会再次进入for循环, 知道成功为止

在选择轮训策略的时候, chooseRoundRobinAfterFiltering(lb.getAllServers(), key); 我们传进来了一个参数 ,lb.getAllServers(), 获取负载均衡中的所有服务.这都有哪些服务呢?往前推理, 应该是有某个地方传入了这个参数, 或者通过某个参数计算得到了服务列表. ---> 这时我们也不知道在哪里, 那就看看构造方法吧, 看谁的构造方法呢, 负载均衡器的构造方法

前面已经知道负载均衡器使用的是ZoneAwareLoadBalancer, 调用了父类的构造方法

代码语言:javascript
复制
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                                 IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                                 ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
    }

下面来看父类的构造方法

代码语言:javascript
复制
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

一般情况下, 看到init方法和start方法, 都要进去看看, 这里通常都是重点, 里面有内容

代码语言:javascript
复制
     void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

启动, 初始化服务, 而且还要不断你的学习. 这里的学习是什么意思呢? 其实就是不停的问问nacos, 服务列表有更新么?有更新,我就去拉取过来, 更新本地的服务.

具体的enableAndInitLearnNewServersFeature()方法是什么呢

代码语言:javascript
复制
  public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

updateAction是要更新的动作, 也就是最终更新操作是在updateAction中执行的, 来看看为什么这么说呢

代码语言:javascript
复制
  protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

这里updateAction这个变量是UpdateAction的一个实例对象, 这个对象有一个方法doUpdate(), 而doUpdate()方法中调用了updateListOfServers()方法.

代码语言:javascript
复制
  @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

更新服务. 这是具体的更新服务的方法

这个更新操作是什么时候被执行的呢?serverListUpdater.start(updateAction); 在这里, 一看见start, 知道是个类似于定时任务的方法, 很重要. 进去看看

代码语言:javascript
复制
@Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

updateAction.doUpdate();就是上面执行的方法 , 这个方法的返回值是一个Runnable线程, 然后继续往下看, 将runnable线程传给了一个定时任务. 定时执行更新操作.

上面的操作其实就是上图, ribbon和nacos同步服务集群数据的过程. 我们知道, 在nacos中有一个注册表用来存储服务注册过来的信息, 项目启动后, 这些元数据信息会同步回传给ribbon, ribbon会在本地维护一个注册表. 但这个注册表可能随时变化, 所以, 需要定期去同步更新服务数据. 所以.

1. 初始化的时候去nacos去服务注册表数据

2. 定时任务同步获取注册表数据

代码语言:javascript
复制
@VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

更新服务的三个重要的方法

1. getUpdatedListOfServers : 查询nacos中的服务实例, 在查询之前更新, 然后返回最新的服务列表

2. getFilteredListOfServers: 过滤服务实例, 将满足条件的服务实例过滤, 不满足条件的去掉

3. updateAllServerList: 更新服务实例

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-08-03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. ribbon的使用
    • 第一步:引入ribbon
      • 第二步:写接口
        • 二. ribbon源码入口
          • 1. 就从@LoadBalanced这个注解入手
            • 2. 查找loadBalanced的实现类
              • 方法一: 入口通常在META-INF/spring.factories文件里.里面找到引入了LoadBalanced 类的初始化类.
              • 方法二: 纯经验猜测.
            • 三. LoadBalancerAutoConfiguration自动配置类
              • 首先, 初始化了一个LoadBalancerInterceptor拦截器,
                • 第二. 初始化RestTemplateCustomeizer方法
                • 四. LoadBalancerInterceptor拦截器实现类
                相关产品与服务
                负载均衡
                负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档