摘要: 原创出处 http://www.iocoder.cn/Eureka/transport/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Eureka 1.8.X 版本
本文主要分享 Eureka 的网络通信部分。在不考虑 Eureka 2.x 的兼容的情况下,Eureka 1.x 主要两部分的网络通信:
本文涉及类在 com.netflix.discovery.shared.transport
包下,涉及到主体类的类图如下( 打开大图 ):
类图看起来很复杂,整体调用关系如下( 打开大图 ):
OK ,我们逐层解析,嗨起来。
推荐 Spring Cloud 书籍:
推荐 Spring Cloud 视频:
com.netflix.discovery.shared.transport.jersey.EurekaJerseyClient
,EurekaHttpClient 接口。接口代码如下:
public interface EurekaJerseyClient {
ApacheHttpClient4 getClient();
void destroyResources();
}
com.sun.jersey.client.apache4.ApacheHttpClient4
,基于 Apache HttpClient4 实现的 Jersey Client 。com.netflix.discovery.shared.transport.jersey.EurekaJerseyClientImpl
,EurekaHttpClient 实现类。实现代码如下:
// 超过微信字数上限
com.netflix.discovery.shared.transport.jersey.ApacheHttpClientConnectionCleaner
,Apache HttpClient 空闲连接清理器,负责周期性关闭处于 half-close
状态的空闲连接。点击 链接 查看带中文注释的 ApacheHttpClientConnectionCleaner。推荐阅读:《HttpClient容易忽视的细节——连接关闭》 。EurekaJerseyClientBuilder ,EurekaJerseyClientImpl 内部类,用于创建 EurekaJerseyClientImpl 。
调用 #build()
方法,创建 EurekaJerseyClientImpl ,实现代码如下:
// EurekaJerseyClientBuilder.java
public EurekaJerseyClient build() {
MyDefaultApacheHttpClient4Config config = new MyDefaultApacheHttpClient4Config();
try {
return new EurekaJerseyClientImpl(connectionTimeout, readTimeout, connectionIdleTimeout, config);
} catch (Throwable e) {
throw new RuntimeException("Cannot create Jersey client ", e);
}
}
com.sun.jersey.client.apache4.config.DefaultApacheHttpClient4Config
,实现自定义配置。点击 链接 查看带中文注释的 MyDefaultApacheHttpClient4Config。例如 :com.netflix.discovery.shared.transport.EurekaHttpClient
,Eureka-Server HTTP 访问客户端,定义了具体的 Eureka-Server API 调用方法 。点击 链接 查看带中文注释的 EurekaHttpClient。
com.netflix.discovery.shared.transport.EurekaHttpResponse
,请求响应对象,实现代码如下:
public class EurekaHttpResponse<T> {
/**
* 返回状态码
*/
private final int statusCode;
/**
* 返回对象( Entity )
*/
private final T entity;
/**
* 返回 header
*/
private final Map<String, String> headers;
/**
* 重定向地址
*/
private final URI location;
// ... 省略 setting / getting 和 Builder
}
com.netflix.discovery.shared.transport.TransportClientFactory
,创建 EurekaHttpClient 的工厂接口。接口代码如下:
public interface TransportClientFactory {
/**
* 创建 EurekaHttpClient
*
* @param serviceUrl Eureka-Server 地址
* @return EurekaHttpClient
*/
EurekaHttpClient newClient(EurekaEndpoint serviceUrl);
/**
* 关闭工厂
*/
void shutdown();
}
大多数 EurekaHttpClient 实现类都有其对应的工厂实现类。
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient
,实现 EurekaHttpClient 的抽象类,真正实现了具体的 Eureka-Server API 调用方法。实现代码如下:
// 超过微信字数上限
jerseyClient
属性,Jersey Client ,使用上文的 EurekaHttpClient#getClient(…)
方法,获取 ApacheHttpClient4 。serviceUrl
属性,请求的 Eureka-Server 地址。#register()
方法,实现向 Eureka-Server 注册应用实例。其他方法代码类似。#addExtraHeaders(...)
方法,设置请求头( header )。该方法是抽象方法,提供子类实现自定义的请求头。代码如下:
protected abstract void addExtraHeaders(Builder webResource);com.netflix.discovery.shared.transport.jersey.JerseyApplicationClient
,实现 Eureka-Client 请求 Eureka-Server 的网络通信。点击 链接 查看带中文注释的 JerseyApplicationClient。
com.netflix.discovery.shared.transport.jersey.JerseyEurekaHttpClientFactory
,创建 JerseyApplicationClient 的工厂类。实现代码如下:
// 超过微信字数上限
JerseyEurekaHttpClientFactoryBuilder ,JerseyEurekaHttpClientFactory 内部类,用于创建 JerseyEurekaHttpClientFactory 。点击 链接 查看带中文注释的 JerseyEurekaHttpClientFactory。
调用 JerseyEurekaHttpClientFactory#create(...)
方法,创建 JerseyEurekaHttpClientFactory ,实现代码如下:
// 超过微信字数上限
com.netflix.eureka.transport.JerseyReplicationClient
,Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信。
AbstractJerseyEurekaHttpClient#addExtraHeaders()
方法,添加自定义头 x-netflix-discovery-replication=true
,代码如下:
@Override protected void addExtraHeaders(Builder webResource) { webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true"); }#sendHeartBeat(…)
方法,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。com.netflix.eureka.cluster.HttpReplicationClient
接口,实现了 #submitBatchUpdates(…)
方法,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。JerseyReplicationClient 没有专属的工厂。
调用 JerseyReplicationClient#createReplicationClient(...)
静态方法,创建 JerseyReplicationClient 。点击 链接 查看带中文注释的方法代码。
com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator
,EurekaHttpClient 委托者抽象类。实现代码如下:
// 超过微信字数上限
#execute(…)
抽象方法,子类实现该方法,实现自己的特性。#register()
方法,实现向 Eureka-Server 注册应用实例。其他方法代码类似。#execute(…)
方法,并将原有的注册实现通过 RequestExecutor 传递进去。#execute(…)
方法,可以调用 RequestExecutor#execute(…)
方法,继续执行原有逻辑。EurekaHttpClientDecorator 的每个实现类实现一个特性,代码非常非常非常清晰。
FROM 《委托模式》 委托模式是软件设计模式中的一项基本技巧。在委托模式中,有两个对象参与处理同一个请求,接受请求的对象将请求委托给另一个对象来处理。委托模式是一项基本技巧,许多其他的模式,如状态模式、策略模式、访问者模式本质上是在更特殊的场合采用了委托模式。委托模式使得我们可以用聚合来替代继承,它还使我们可以模拟mixin。
我们在上图的基础上,增加委托的关系,如下图( 打开大图 ):
com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient
,监控指标收集 EurekaHttpClient ,配合 Netflix Servo 实现监控信息采集。
#execute()
方法,代码如下:
// 超过微信字数上限
RequestExecutor#execute(…)
方法,继续执行请求。com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient
,寻找非 302 重定向的 Eureka-Server 的 EurekaHttpClient 。
#execute()
方法,代码如下:
// 超过微信字数上限
serviceEndpoint
( 相当于 serviceUrls
) 创建委托 EurekaHttpClient 。#executeOnNewServer(…)
方法,通过执行请求的方式,寻找非 302 状态码返回的 Eureka-Server。实现代码,点击 链接 查看带中文注释的代码实现。delegateRef
( 因为此处可能存在并发,多个线程都找到非 302 状态码返回的 Eureka-Server ),并设置当前成功非 302 请求的 EurekaHttpClient 到 delegateRef
。currentEurekaClientRef
,当请求发生异常或者超过最大重定向次数。currentEurekaClient
,后面要重新非返回 302 状态码的 Eureka-Server 。RedirectingEurekaHttpClient 提供 #createFactory(...)
静态方法获得创建其的工厂,点击 链接 查看。
com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient
,支持向多个 Eureka-Server 请求重试的 EurekaHttpClient 。
#execute()
方法,代码如下:
// 超过微信字数上限
currentHttpClient
不存在,意味着原有 delegate
不存在向 Eureka-Server 成功请求的 EurekaHttpClient 。delegate
,直接使用它进行执行请求。#getHostCandidates()
方法,获得候选的 Eureka-Server serviceUrls
数组。实现代码如下:
// 超过微信字数上限eureka.retryableClientQuarantineRefreshPercentage
来设置百分比,默认值:0.66
。quarantineSet
数量超过阀值,清空 quarantineSet
,全部 candidateHosts
重试。quarantineSet
数量未超过阀值,移除 candidateHosts
中在 quarantineSet
的元素。ClusterResolver#getClusterEndpoints()
方法,获得候选的 Eureka-Server 地址数组( candidateHosts
)。注意:该方法返回的 Eureka-Server 地址数组,使用以本机 IP 为随机种子,达到不同 IP 的应用实例获得的数组顺序不同,而相同 IP 的应用实例获得的数组顺序一致,效果类似基于 IP HASH 的负载均衡算法。实现该功能的代码,在 《Eureka 源码解析 —— EndPoint 与 解析器》搜索关键字【ResolverUtils#randomize(…)】 详细解析。Set#retainAll()
方法,移除隔离的故障 Eureka-Server 地址数组( quarantineSet
) 中不在 candidateHosts
的元素。candidateHosts
,移除在 quarantineSet
的元素。candidateHosts
上限,全部 Eureka-Server 请求失败,抛出异常。ServerStatusEvaluator#accept()
方法,判断响应状态码和请求类型是否能够接受。实现代码如下:
// ServerStatusEvaluators.java // 超过微信字数上限delegate
。下次请求,优先使用 delegate
,失败才进行候选的 Eureka-Server 地址数组重试。delegate
若等于 currentHttpClient
,进行清除。quarantineSet
。RetryableEurekaHttpClient 提供 #createFactory(...)
静态方法获得创建其的工厂,点击 链接 查看。
com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient
,支持会话的 EurekaHttpClient 。执行定期的重建会话,防止一个 Eureka-Client 永远只连接一个特定的 Eureka-Server 。反过来,这也保证了 Eureka-Server 集群变更时,Eureka-Client 对 Eureka-Server 连接的负载均衡。
#execute(...)
,代码如下:
// 超过微信字数上限
#randomizeSessionDuration(...)
方法,计算计算下一次会话超时时长,公式为 sessionDurationMs * (0.5, 1.5)
,代码如下:
protected long randomizeSessionDuration(long sessionDurationMs) { long delta = (long) (sessionDurationMs * (random.nextDouble() - 0.5)); return sessionDurationMs + delta; }TransportUtils#getOrSetAnotherClient(...)
方法代码如下:
// 超过微信字数上限eurekaHttpClientRef
里的 EurekaHttpClient 。若获取不到,将 another
设置到 eurekaHttpClientRef
。当有多个线程设置时,有且只有一个线程设置成功,另外的设置失败的线程们,意味着当前 eurekaHttpClientRef
有 EurekaHttpClient ,返回 eurekaHttpClientRef
。existing
的是 null
,需要修改成 return eurekaHttpClientRef.get()
。模拟重现该 BUG 代码如下 :
在 SessionedEurekaHttpClient 类里,没有实现创建其的工厂。在 「6. 创建网络通讯客户端」搜索 canonicalClientFactory
,可以看到 EurekaHttpClients#canonicalClientFactory(...)
方法,内部有 SessionedEurekaHttpClient 的创建工厂。
对于 Eureka-Server 来说,调用 JerseyReplicationClient#createReplicationClient(...)
静态方法即可创建用于 Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信客户端。
对于 Eureka-Client 来说,分成用于注册应用实例( registrationClient
)和查询注册信息( newQueryClient
)的两个不同网络通信客户端。在 DiscoveryClient 初始化时进行创建,代码如下:
// DiscoveryClient.class
// 超过微信字数上限
Jersey1TransportClientFactories#newTransportClientFactory(...)
方法,创建 registrationClient
和 queryClient
公用的委托的 EurekaHttpClientFactory ,代码如下:
// Jersey1TransportClientFactories.java // 超过微信字数上限EurekaHttpClients#registrationClientFactory(...)
方法,创建 registrationClient
的 EurekaHttpClientFactory ,代码如下 :
// EurekaHttpClients.java // 超过微信字数上限EurekaHttpClients#queryClientFactory(...)
方法,创建 queryClient
的 EurekaHttpClientFactory ,代码如下 :
// EurekaHttpClients.java // 超过微信字数上限这次真的是彩蛋,我们将整体调用关系调整如下如下( 打开大图 ):