注册中心 Eureka 源码解析 —— 网络通信

摘要: 原创出处 http://www.iocoder.cn/Eureka/transport/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本

  • 1. 概述
  • 2. EurekaHttpClient
    • 2.1 EurekaJerseyClientImpl
    • 2.2 EurekaJerseyClientBuilder
  • 3. EurekaHttpClient
    • 3.1 EurekaHttpResponse
    • 3.2 TransportClientFactory
  • 4. AbstractJerseyEurekaHttpClient
    • 4.1 JerseyApplicationClient
    • 4.2 JerseyReplicationClient
  • 5. EurekaHttpClientDecorator
    • 5.1 MetricsCollectingEurekaHttpClient
    • 5.2 RedirectingEurekaHttpClient
    • 5.3 RetryableEurekaHttpClient
    • 5.4 SessionedEurekaHttpClient
  • 6. 创建网络通讯客户端
  • 666. 彩蛋

1. 概述

本文主要分享 Eureka 的网络通信部分。在不考虑 Eureka 2.x 的兼容的情况下,Eureka 1.x 主要两部分的网络通信:

  • Eureka-Client 请求 Eureka-Server 的网络通信
  • Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信

本文涉及类在 com.netflix.discovery.shared.transport 包下,涉及到主体类的类图如下( 打开大图 ):

  • 粉色部分 —— EurekaJerseyClient ,对基于 Jersey Server 的 Eureka-Server 的 Jersey 客户端封装。
  • 绿色部分 —— EurekaHttpClient ,Eureka-Server HTTP 访问客户端,定义了具体的 Eureka-Server API 调用方法。如果把 DiscoveryClient 类比成 Service ,那么 EurekaHttpClient 可以类比城 Dao 。
  • 综色部分 —— EurekaHttpClient 实现类,真正实现了具体的 Eureka-Server API 调用方法。
  • 红色部分 —— EurekaHttpClient 委托类,提供了会话、重试、重定向、监控指标收集等特性。
  • 黄色部分 —— EurekaHttpClientFactory,用于创建 EurekaHttpClient 。

类图看起来很复杂,整体调用关系如下( 打开大图 ):

OK ,我们逐层解析,嗨起来。

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》
  • 两书齐买,京东包邮。

推荐 Spring Cloud 视频

  • Java 微服务实践 - Spring Boot
  • Java 微服务实践 - Spring Cloud
  • Java 微服务实践 - Spring Boot / Spring Cloud

2. EurekaHttpClient

com.netflix.discovery.shared.transport.jersey.EurekaJerseyClient ,EurekaHttpClient 接口。接口代码如下:

public interface EurekaJerseyClient {

    ApacheHttpClient4 getClient();

    void destroyResources();
}
  • com.sun.jersey.client.apache4.ApacheHttpClient4 ,基于 Apache HttpClient4 实现的 Jersey Client 。

2.1 EurekaJerseyClientImpl

com.netflix.discovery.shared.transport.jersey.EurekaJerseyClientImpl ,EurekaHttpClient 实现类。实现代码如下:

// 超过微信字数上限
  • com.netflix.discovery.shared.transport.jersey.ApacheHttpClientConnectionCleaner ,Apache HttpClient 空闲连接清理器,负责周期性关闭处于 half-close 状态的空闲连接。点击 链接 查看带中文注释的 ApacheHttpClientConnectionCleaner。推荐阅读:《HttpClient容易忽视的细节——连接关闭》 。

2.2 EurekaJerseyClientBuilder

EurekaJerseyClientBuilder ,EurekaJerseyClientImpl 内部类,用于创建 EurekaJerseyClientImpl 。

调用 #build() 方法,创建 EurekaJerseyClientImpl ,实现代码如下:

// EurekaJerseyClientBuilder.java
public EurekaJerseyClient build() {
    MyDefaultApacheHttpClient4Config config = new MyDefaultApacheHttpClient4Config();
    try {
        return new EurekaJerseyClientImpl(connectionTimeout, readTimeout, connectionIdleTimeout, config);
    } catch (Throwable e) {
        throw new RuntimeException("Cannot create Jersey client ", e);
    }
}
  • MyDefaultApacheHttpClient4Config ,继承自 com.sun.jersey.client.apache4.config.DefaultApacheHttpClient4Config ,实现自定义配置。点击 链接 查看带中文注释的 MyDefaultApacheHttpClient4Config。例如 :
    • 自定义的请求、响应的编解码器 `com.netflix.discovery.provider.DiscoveryJerseyProvider` 。
    • 禁用重定向,使用 RedirectingEurekaHttpClient 实现该特性。
    • 自定义 UserAgent 。
    • 自定义 Http Proxy 。
    • SSL 功能的增强。ApacheHttpClient4 使用的是 Apache HttpClient 4.1.1 版本,`com.netflix.discovery.shared.transport.jersey.SSLSocketFactoryAdapter` 将 Apache HttpClient 4.3.4 对 SSL 功能的增强适配到老版本 API 。点击 链接 查看带中文注释的 SSLSocketFactoryAdapter。

3. EurekaHttpClient

com.netflix.discovery.shared.transport.EurekaHttpClient ,Eureka-Server HTTP 访问客户端,定义了具体的 Eureka-Server API 调用方法 。点击 链接 查看带中文注释的 EurekaHttpClient。

3.1 EurekaHttpResponse

com.netflix.discovery.shared.transport.EurekaHttpResponse ,请求响应对象,实现代码如下:

public class EurekaHttpResponse<T> {

    /**
     * 返回状态码
     */
    private final int statusCode;
    /**
     * 返回对象( Entity )
     */
    private final T entity;
    /**
     * 返回 header
     */
    private final Map<String, String> headers;
    /**
     * 重定向地址
     */
    private final URI location;

    // ... 省略 setting / getting 和 Builder
}

3.2 TransportClientFactory

com.netflix.discovery.shared.transport.TransportClientFactory ,创建 EurekaHttpClient 的工厂接口。接口代码如下:

public interface TransportClientFactory {

    /**
     * 创建 EurekaHttpClient
     *
     * @param serviceUrl Eureka-Server 地址
     * @return EurekaHttpClient
     */
    EurekaHttpClient newClient(EurekaEndpoint serviceUrl);

    /**
     * 关闭工厂
     */
    void shutdown();

}

大多数 EurekaHttpClient 实现类都有其对应的工厂实现类

4. AbstractJerseyEurekaHttpClient

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient ,实现 EurekaHttpClient 的抽象类真正实现了具体的 Eureka-Server API 调用方法。实现代码如下:

// 超过微信字数上限
  • jerseyClient 属性,Jersey Client ,使用上文的 EurekaHttpClient#getClient(…) 方法,获取 ApacheHttpClient4 。
  • serviceUrl 属性,请求的 Eureka-Server 地址。
  • #register() 方法,实现向 Eureka-Server 注册应用实例。其他方法代码类似
    • x
    • 第 22 至 26 行 :设置请求地址。
    • 第 28 行 :调用 #addExtraHeaders(...) 方法,设置请求头( header )。该方法是抽象方法,提供子类实现自定义的请求头。代码如下: protected abstract void addExtraHeaders(Builder webResource);
  • 第 29 至 34 行 :请求 Eureka-Server 。
  • 第 35 至 36 行 :解析响应结果,创建 EurekaHttpResponse 。

4.1 JerseyApplicationClient

com.netflix.discovery.shared.transport.jersey.JerseyApplicationClient ,实现 Eureka-Client 请求 Eureka-Server 的网络通信。点击 链接 查看带中文注释的 JerseyApplicationClient。

4.1.1 JerseyEurekaHttpClientFactory

com.netflix.discovery.shared.transport.jersey.JerseyEurekaHttpClientFactory ,创建 JerseyApplicationClient 的工厂类。实现代码如下:

// 超过微信字数上限

4.1.2 JerseyEurekaHttpClientFactoryBuilder

JerseyEurekaHttpClientFactoryBuilder ,JerseyEurekaHttpClientFactory 内部类,用于创建 JerseyEurekaHttpClientFactory 。点击 链接 查看带中文注释的 JerseyEurekaHttpClientFactory。

调用 JerseyEurekaHttpClientFactory#create(...) 方法,创建 JerseyEurekaHttpClientFactory ,实现代码如下:

// 超过微信字数上限

4.2 JerseyReplicationClient

com.netflix.eureka.transport.JerseyReplicationClient ,Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信。

  • 实现 AbstractJerseyEurekaHttpClient#addExtraHeaders() 方法,添加自定义头 x-netflix-discovery-replication=true ,代码如下: @Override protected void addExtraHeaders(Builder webResource) { webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true"); }
  • 重写了 #sendHeartBeat(…) 方法,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。
  • 实现 com.netflix.eureka.cluster.HttpReplicationClient 接口,实现了 #submitBatchUpdates(…) 方法,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。

4.2.1 没有工厂

JerseyReplicationClient 没有专属的工厂

调用 JerseyReplicationClient#createReplicationClient(...) 静态方法,创建 JerseyReplicationClient 。点击 链接 查看带中文注释的方法代码。

5. EurekaHttpClientDecorator

com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator,EurekaHttpClient 委托者抽象类。实现代码如下:

// 超过微信字数上限
  • #execute(…) 抽象方法,子类实现该方法,实现自己的特性。
  • #register() 方法,实现向 Eureka-Server 注册应用实例。其他方法代码类似
    • 调用 #execute(…) 方法,并将原有的注册实现通过 RequestExecutor 传递进去。
    • 子类在实现的 #execute(…) 方法,可以调用 RequestExecutor#execute(…) 方法,继续执行原有逻辑。
    • 参考设计模式:《设计模式 ( 十九 ) 模板方法模式Template method(类行为型)》 。
  • RequestType ,请求类型枚举类。代码如下: // EurekaHttpClientDecorator.java public enum RequestType { Register, Cancel, SendHeartBeat, StatusUpdate, DeleteStatusOverride, GetApplications, GetDelta, GetVip, GetSecureVip, GetApplication, GetInstance, GetApplicationInstance }
  • RequestExecutor ,请求执行器接口。接口代码如下: // EurekaHttpClientDecorator.java // 超过微信字数上限

EurekaHttpClientDecorator 的每个实现类实现一个特性,代码非常非常非常清晰。

FROM 《委托模式》 委托模式是软件设计模式中的一项基本技巧。在委托模式中,有两个对象参与处理同一个请求,接受请求的对象将请求委托给另一个对象来处理。委托模式是一项基本技巧,许多其他的模式,如状态模式、策略模式、访问者模式本质上是在更特殊的场合采用了委托模式。委托模式使得我们可以用聚合来替代继承,它还使我们可以模拟mixin。

我们在上图的基础上,增加委托的关系,如下图( 打开大图 ):

  • 请注意,每个委托着实现类,上面可能有类型为 EurekaHttpClientFactory 的属性,用于创建其委托的 EurekaHttpClient 。为什么会有 Factory ?例如,RetryableEurekaHttpClient 重试请求多个 Eureka-Server 地址时,每个 Eureka-Server 地址会创建一个 EurekaHttpClient 。所以,下文涉及到 EurekaHttpClientFactory 和委托的 EurekaHttpClient 的地方,你都需要仔细理解。

5.1 MetricsCollectingEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient ,监控指标收集 EurekaHttpClient ,配合 Netflix Servo 实现监控信息采集。

#execute() 方法,代码如下:

// 超过微信字数上限
  • 第 10 行 :调用 RequestExecutor#execute(…) 方法,继续执行请求。
    • `delegate` 属性,对应 JerseyApplicationClient 。

5.2 RedirectingEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient寻找非 302 重定向的 Eureka-Server 的 EurekaHttpClient 。

#execute() 方法,代码如下:

// 超过微信字数上限
  • 注意:和我们理解的常规的 302 状态返回处理不同!!!
  • 整个分成两部分:【第 4 至 15 行】、【第 16 至 24 行】。
    • 当返回非 302 状态码时,找到非返回 302 状态码的 Eureka-Server 。
    • 当返回 302 状态码时,向新的重定向的 Eureka-Server 执行请求直到成功找到或超过最大次数。
    • 前者,意味着未找到非返回 302 状态码的 Eureka-Server ,此时通过在原始传递进来的 `serviceUrls` 执行请求,寻找非 302 状态码返回的 Eureka-Server。
  • 后者,意味着当前已经找到非返回 302 状态码的 Eureka-Server ,直接执行请求。注意 :此时 Eureka-Server 再返回 302 状态码,不再处理。
  • 目前 Eureka 1.x 的 Eureka-Server 不存在返回 302 状态码,猜测和 Eureka 2.X TODO[0028]:写入集群和读取集群 有关。
  • 【前者】第 5 行 :使用初始的 serviceEndpoint ( 相当于 serviceUrls ) 创建委托 EurekaHttpClient 。
  • 【前者】第 7 行 :调用 #executeOnNewServer(…) 方法,通过执行请求的方式,寻找非 302 状态码返回的 Eureka-Server。实现代码,点击 链接 查看带中文注释的代码实现。
  • 【前者】【前者】第 9 行 :关闭原有的 delegateRef ( 因为此处可能存在并发,多个线程都找到非 302 状态码返回的 Eureka-Server ),并设置当前成功非 302 请求的 EurekaHttpClient 到 delegateRef
  • 【前者】第 13 行 :关闭 currentEurekaClientRef ,当请求发生异常或者超过最大重定向次数。
  • 【后者】第 18 行 :意味着当前已经找到非返回 302 状态码的 Eureka-Server ,直接执行请求。
  • 【后者】第 21 至 22 行 :执行请求发生异常,关闭 currentEurekaClient ,后面要重新非返回 302 状态码的 Eureka-Server 。

5.2.1 工厂

RedirectingEurekaHttpClient 提供 #createFactory(...) 静态方法获得创建其的工厂,点击 链接 查看。

5.3 RetryableEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient ,支持向多个 Eureka-Server 请求重试的 EurekaHttpClient 。

#execute() 方法,代码如下:

// 超过微信字数上限
  • 第 10 行 :当前 currentHttpClient 不存在,意味着原有 delegate 不存在向 Eureka-Server 成功请求的 EurekaHttpClient 。
    • 此时需要从配置中的 Eureka-Server 数组重试请求,获得可以请求的 Eureka-Server 。
    • 如果已经存在请求成功的 delegate ,直接使用它进行执行请求。
  • 第 11 至 17 行 :调用 #getHostCandidates() 方法,获得候选的 Eureka-Server serviceUrls 数组。实现代码如下: // 超过微信字数上限
    • 第 10 行 :最小可用的阀值,配置 eureka.retryableClientQuarantineRefreshPercentage 来设置百分比,默认值:0.66
    • 最 13 至 15 行 :quarantineSet 数量超过阀值,清空 quarantineSet ,全部 candidateHosts重试。
    • 第 17 至 24 行 :quarantineSet 数量未超过阀值,移除 candidateHosts 中在 quarantineSet的元素。
    • 第 3 行 :调用 ClusterResolver#getClusterEndpoints() 方法,获得候选的 Eureka-Server 地址数组( candidateHosts )。注意:该方法返回的 Eureka-Server 地址数组,使用以本机 IP 为随机种子,达到不同 IP 的应用实例获得的数组顺序不同,而相同 IP 的应用实例获得的数组顺序一致,效果类似基于 IP HASH 的负载均衡算法。实现该功能的代码,在 《Eureka 源码解析 —— EndPoint 与 解析器》搜索关键字【ResolverUtils#randomize(…)】 详细解析。
    • 第 6 行 :调用 Set#retainAll() 方法,移除隔离的故障 Eureka-Server 地址数组( quarantineSet ) 中不在 candidateHosts 的元素。
    • 第 8 至 24 行 :在保证最小可用的 candidateHosts,移除在 quarantineSet 的元素。
  • 第 19 至 22 行 :超过 candidateHosts 上限,全部 Eureka-Server 请求失败,抛出异常。
  • 第 24 至 26 行 :创建委托的 EurekaHttpClient ,用于下面请求执行。
  • 第 31 行 :执行请求。
  • 第 33 行 :调用 ServerStatusEvaluator#accept() 方法,判断响应状态码和请求类型是否能够接受。实现代码如下: // ServerStatusEvaluators.java // 超过微信字数上限
  • 第 34 行 :请求成功,设置 delegate 。下次请求,优先使用 delegate ,失败才进行候选的 Eureka-Server 地址数组重试。
  • 第 47 行 :请求失败,delegate 若等于 currentHttpClient ,进行清除。
  • 第 50 至 52 行 :请求失败,将请求的 Eureka-Server 地址添加到 quarantineSet
  • 总结来说:
    • 【第一步】若当前有请求成功的 EurekaHttpClient ,继续使用。若请求失败,执行【第二步】。
    • 【第二步】若当前无请求成功的 EurekaHttpClient ,获取候选的 Eureka-Server 地址数组顺序创建新的 EurekaHttpClient,直到成功,或者超过最大重试次数。当请求成功,保存该 EurekaHttpClient ,下次继续使用,直到请求失败。

5.3.1 工厂

RetryableEurekaHttpClient 提供 #createFactory(...) 静态方法获得创建其的工厂,点击 链接 查看。

5.4 SessionedEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient ,支持会话的 EurekaHttpClient 。执行定期的重建会话,防止一个 Eureka-Client 永远只连接一个特定的 Eureka-Server 。反过来,这也保证了 Eureka-Server 集群变更时,Eureka-Client 对 Eureka-Server 连接的负载均衡。

#execute(...) ,代码如下:

// 超过微信字数上限
  • 第 7 至 12 行 :超过当前会话时间,关闭当前委托的 EurekaHttpClient 。
    • 增加会话过期的随机性,实现所有 Eureka-Client 的会话过期重连的发生时间更加离散,避免集中时间过期。目前猜测这么做的目的和 TODO[0028]:写入集群和读取集群 有关,即返回 302 。关联 1.x new transport enhancements 。
    • 第 10 行 :调用 #randomizeSessionDuration(...) 方法,计算计算下一次会话超时时长,公式为 sessionDurationMs * (0.5, 1.5) ,代码如下: protected long randomizeSessionDuration(long sessionDurationMs) { long delta = (long) (sessionDurationMs * (random.nextDouble() - 0.5)); return sessionDurationMs + delta; }
  • 第 15 至 18 行 :获得委托的 EurekaHttpClient 。若不存在,创建新的委托的 EurekaHttpClient 。TransportUtils#getOrSetAnotherClient(...) 方法代码如下: // 超过微信字数上限
    • 该方法实现,获得 eurekaHttpClientRef 里的 EurekaHttpClient 。若获取不到,将 another 设置到 eurekaHttpClientRef 。当有多个线程设置时,有且只有一个线程设置成功,另外的设置失败的线程们,意味着当前 eurekaHttpClientRef 有 EurekaHttpClient ,返回 eurekaHttpClientRef
    • 目前该方法存在 BUG ,失败的线程直接返回 existing 的是 null ,需要修改成 return eurekaHttpClientRef.get() 。模拟重现该 BUG 代码如下 :
  • 第 19 行 :执行请求。

5.4.1 没有工厂

在 SessionedEurekaHttpClient 类里,没有实现创建其的工厂。在 「6. 创建网络通讯客户端」搜索 canonicalClientFactory ,可以看到 EurekaHttpClients#canonicalClientFactory(...) 方法,内部有 SessionedEurekaHttpClient 的创建工厂。

6. 创建网络通讯客户端

对于 Eureka-Server 来说,调用 JerseyReplicationClient#createReplicationClient(...) 静态方法即可创建用于 Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信客户端。

对于 Eureka-Client 来说,分成用于注册应用实例( registrationClient )查询注册信息( newQueryClient )两个不同网络通信客户端。在 DiscoveryClient 初始化时进行创建,代码如下:

// DiscoveryClient.class
// 超过微信字数上限
  • 第 18 至 27 行 :调用 Jersey1TransportClientFactories#newTransportClientFactory(...) 方法,创建 registrationClientqueryClient 公用的委托的 EurekaHttpClientFactory ,代码如下: // Jersey1TransportClientFactories.java // 超过微信字数上限
    • 在 TransportClientFactory 里委托 JerseyEurekaHttpClientFactory 。
  • 第 34 至 49 行 :调用 EurekaHttpClients#registrationClientFactory(...) 方法,创建 registrationClient 的 EurekaHttpClientFactory ,代码如下 : // EurekaHttpClients.java // 超过微信字数上限
  • 第 51 至 71 行 :调用 EurekaHttpClients#queryClientFactory(...) 方法,创建 queryClient 的 EurekaHttpClientFactory ,代码如下 : // EurekaHttpClients.java // 超过微信字数上限

666. 彩蛋

这次真的是彩蛋,我们将整体调用关系调整如下如下( 打开大图 ):

原文发布于微信公众号 - 芋道源码(YunaiV)

原文发表时间:2018-08-02

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏阿杜的世界

RocketMQ学习-NameServer-1

NameServer在RocketMQ中的角色是配置中心,主要有两个功能:Broker管理、路由管理。因此NameServer上存放的主要信息也包括两类:Bro...

1253
来自专栏互联网大杂烩

Java锁与并发

保护临界区资源不会被多个线程同时访问时而受到破坏。通过锁,可以让多个线程排队。一个一个地进入临界区访问目标对象,使目标对象的状态总是保持一致。

1022
来自专栏java学习

关于Spring 和 Spring MVC的43个问题【问题汇总】

通过Spring提供的IoC容器,可以将对象之间的依赖关系交由Spring进行控制,避免硬编码所造成的过度程序耦合。

901
来自专栏转载gongluck的CSDN博客

_CrtSetDbgFlag

_CrtSetDbgFlag 若要了解有关 Visual Studio 2017 RC 的最新文档,请参阅 Visual Studio 2017 RC 文档。...

4129
来自专栏一个会写诗的程序员的博客

8.8 Spring Boot静态资源处理小结

当使用Spring Boot来开发一个完整的系统时,我们往往需要用到前端页面,这就不可或缺地需要访问到静态资源,比如图片、css、js等文件。

1433
来自专栏日常分享

Thrift IDL使用方式

众所周知,Thrift是一个RPC的框架,其可用于不同语言之间的服务相互调用。比如最近接触到的一个运用环境: *前端使用Node.Js重构了部分我们的老旧代码...

3381
来自专栏desperate633

深入理解volatile的内存语义内存可见性禁止重排序

一旦一个共享变量(类的成员变量、 类的静态成员变量) 被 volatile 修饰之后, 那么就具备了两层语义:

742
来自专栏ImportSource

Spring Boot处理REST API错误的正确姿势

如何正确的处理API的返回信息,让返回的错误信息提供更多的含义是一个非常值得做的功能。 默认一般返回的都是难以理解的堆栈信息,然而这些信息也许对于API的客户...

59013
来自专栏xingoo, 一个梦想做发明家的程序员

Java程序员的日常——经验贴(纯干货)二

继昨天的经验贴,今天的工作又收获不少。 windows下编辑器会给文件添加BOM 在windows的编辑器中,为了区分编码,通常会添加一个BOM标记。比如...

2179
来自专栏Java后端技术栈

Jenkins指定具体分支持续集成-使用Generic Webhook Trigger插件和码云

使用Generic Webhook Trigger插件实现Jenkins+WebHooks(码云)持续集成

3042

扫码关注云+社区

领取腾讯云代金券