1. 概述
本文主要分享 EndPoint 与 解析器。
目前有多种 Eureka-Server 访问地址的配置方式,本文只分享 Eureka 1.x 的配置,不包含 Eureka 1.x 对 Eureka 2.x 的兼容配置:
eureka.serviceUrl.defaultZone=http://127.0.0.1:8080/v2
。eureka.shouldUseDns=true
并且eureka.eurekaServer.domainName=eureka.iocoder.cn
。本文涉及类在 com.netflix.discovery.shared.resolver
包下,涉及到主体类的类图如下( 打开大图 ):
推荐 Spring Cloud 书籍:
推荐 Spring Cloud 视频:
com.netflix.discovery.shared.resolver.EurekaEndpoint
,Eureka 服务端点接口,实现代码如下:
public interface EurekaEndpoint extends Comparable<Object> {
/**
* @return 完整的服务 URL
*/
String getServiceUrl();
/**
* @deprecated use {@link #getNetworkAddress()}
*/
@Deprecated
String getHostName();
/**
* @return 网络地址
*/
String getNetworkAddress();
/**
* @return 端口
*/
int getPort();
/**
* @return 是否安全( https )
*/
boolean isSecure();
/**
* @return 相对路径
*/
String getRelativeUri();
}
com.netflix.discovery.shared.resolver.DefaultEndpoint
,默认 Eureka 服务端点实现类。实现代码如下:
public class DefaultEndpoint implements EurekaEndpoint {
/**
* 网络地址
*/
protected final String networkAddress;
/**
* 端口
*/
protected final int port;
/**
* 是否安全( https )
*/
protected final boolean isSecure;
/**
* 相对地址
*/
protected final String relativeUri;
/**
* 完整的服务 URL
*/
protected final String serviceUrl;
public DefaultEndpoint(String serviceUrl) {
this.serviceUrl = serviceUrl;
// 将 serviceUrl 分解成 几个属性
try {
URL url = new URL(serviceUrl);
this.networkAddress = url.getHost();
this.port = url.getPort();
this.isSecure = "https".equals(url.getProtocol());
this.relativeUri = url.getPath();
} catch (Exception e) {
throw new IllegalArgumentException("Malformed serviceUrl: " + serviceUrl);
}
}
public DefaultEndpoint(String networkAddress, int port, boolean isSecure, String relativeUri) {
this.networkAddress = networkAddress;
this.port = port;
this.isSecure = isSecure;
this.relativeUri = relativeUri;
// 几个属性 拼接成 serviceUrl
StringBuilder sb = new StringBuilder().append(isSecure ? "https" : "http").append("://").append(networkAddress);
if (port >= 0) {
sb.append(':').append(port);
}
if (relativeUri != null) {
if (!relativeUri.startsWith("/")) {
sb.append('/');
}
sb.append(relativeUri);
}
this.serviceUrl = sb.toString();
}
}
#equals(…)
和 #hashCode(…)
方法,标准实现方式,这里就不贴代码了。#compareTo(…)
方法,基于 serviceUrl
属性做比较。com.netflix.discovery.shared.resolver.aws.AwsEndpoint
,基于 region
、zone
的 Eureka 服务端点实现类 ( 请不要在意 AWS 开头 )。实现代码如下:
public class AwsEndpoint extends DefaultEndpoint {
/**
* 区域
*/
protected final String region;
/**
* 可用区
*/
protected final String zone;
}
#equals(…)
和 #hashCode(…)
方法,标准实现方式,这里就不贴代码了。EndPoint 解析器使用委托设计模式实现。所以,上文图片中我们看到好多个解析器,实际代码非常非常非常清晰。
FROM 《委托模式》 委托模式是软件设计模式中的一项基本技巧。在委托模式中,有两个对象参与处理同一个请求,接受请求的对象将请求委托给另一个对象来处理。委托模式是一项基本技巧,许多其他的模式,如状态模式、策略模式、访问者模式本质上是在更特殊的场合采用了委托模式。委托模式使得我们可以用聚合来替代继承,它还使我们可以模拟mixin。
我们在上图的基础上,增加委托的关系,如下图:
com.netflix.discovery.shared.resolver.ClusterResolver
,集群解析器接口。接口代码如下:
public interface ClusterResolver<T extends EurekaEndpoint> {
/**
* @return 地区
*/
String getRegion();
/**
* @return EndPoint 集群( 数组 )
*/
List<T> getClusterEndpoints();
}
com.netflix.discovery.shared.resolver.ClosableResolver
,可关闭的解析器接口,继承自 ClusterResolver 接口。接口代码如下:
public interface ClosableResolver<T extends EurekaEndpoint> extends ClusterResolver<T> {
/**
* 关闭
*/
void shutdown();
}
com.netflix.discovery.shared.resolver.aws.DnsTxtRecordClusterResolver
,基于 DNS TXT 记录类型的集群解析器。类属性代码如下:
public class DnsTxtRecordClusterResolver implements ClusterResolver<AwsEndpoint> {
/**
* 地区
*/
private final String region;
/**
* 集群根地址,例如 txt.default.eureka.iocoder.cn
*/
private final String rootClusterDNS;
/**
* 是否解析可用区( zone )
*/
private final boolean extractZoneFromDNS;
/**
* 端口
*/
private final int port;
/**
* 是否安全
*/
private final boolean isSecure;
/**
* 相对地址
*/
private final String relativeUri;
}
rootClusterDNS
) 解析出 EndPoint 集群。需要在 DNS 配置两层解析记录:TXT.${ZONE}.${自定义二级域名}
或者 ${ZONE}.${自定义二级域名}
。TXT.${REGION}.${自定义二级域名}
。rootClusterDNS
,集群根地址。例如:txt.default.eureka.iocoder.cn
,其· txt.default.eureka
为 DNS 解析记录的第一层的主机记录。region
:地区。需要和 rootClusterDNS
的 ${REGION}
一致。extractZoneFromDNS
:是否解析 DNS 解析记录的第二层级的主机记录的 ${ZONE}
可用区。#getClusterEndpoints(...)
方法,实现代码如下:
1: @Override
2: public List<AwsEndpoint> getClusterEndpoints() {
3: List<AwsEndpoint> eurekaEndpoints = resolve(region, rootClusterDNS, extractZoneFromDNS, port, isSecure, relativeUri);
4: if (logger.isDebugEnabled()) {
5: logger.debug("Resolved {} to {}", rootClusterDNS, eurekaEndpoints);
6: }
7: return eurekaEndpoints;
8: }
9:
10: private static List<AwsEndpoint> resolve(String region, String rootClusterDNS, boolean extractZone, int port, boolean isSecure, String relativeUri) {
11: try {
12: // 解析 第一层 DNS 记录
13: Set<String> zoneDomainNames = resolve(rootClusterDNS);
14: if (zoneDomainNames.isEmpty()) {
15: throw new ClusterResolverException("Cannot resolve Eureka cluster addresses; there are no data in TXT record for DN " + rootClusterDNS);
16: }
17: // 记录 第二层 DNS 记录
18: List<AwsEndpoint> endpoints = new ArrayList<>();
19: for (String zoneDomain : zoneDomainNames) {
20: String zone = extractZone ? ResolverUtils.extractZoneFromHostName(zoneDomain) : null; //
21: Set<String> zoneAddresses = resolve(zoneDomain);
22: for (String address : zoneAddresses) {
23: endpoints.add(new AwsEndpoint(address, port, isSecure, relativeUri, region, zone));
24: }
25: }
26: return endpoints;
27: } catch (NamingException e) {
28: throw new ClusterResolverException("Cannot resolve Eureka cluster addresses for root: " + rootClusterDNS, e);
29: }
30: }
#resolve(rootClusterDNS)
解析第一层 DNS 记录。实现代码如下:
1: private static Set<String> resolve(String rootClusterDNS) throws NamingException { 2: Set<String> result; 3: try { 4: result = DnsResolver.getCNamesFromTxtRecord(rootClusterDNS); 5: // TODO 芋艿:这块是bug,不需要这一段 6: if (!rootClusterDNS.startsWith("txt.")) { 7: result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS); 8: } 9: } catch (NamingException e) { 10: if (!rootClusterDNS.startsWith("txt.")) { 11: result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS); 12: } else { 13: throw e; 14: } 15: } 16: return result; 17: }DnsResolver#getCNamesFromTxtRecord(…)
方法,解析 TXT 主机记录。点击链接查看带中文注释的 DnsResolver 的代码,比较解析,笔者就不啰嗦了。rootClusterDNS
不以 txt.
开头时,即使第 4 行解析成功,也会报错,此时是个 Eureka 的 BUG 。因此,配置 DNS 解析记录时,主机记录暂时必须以 txt.
开头。zone
)。#resolve(rootClusterDNS)
解析第二层 DNS 记录。com.netflix.discovery.shared.resolver.aws.ConfigClusterResolver
,基于配置文件的集群解析器。类属性代码如下:
public class ConfigClusterResolver implements ClusterResolver<AwsEndpoint> {
private final EurekaClientConfig clientConfig;
private final InstanceInfo myInstanceInfo;
public ConfigClusterResolver(EurekaClientConfig clientConfig, InstanceInfo myInstanceInfo) {
this.clientConfig = clientConfig;
this.myInstanceInfo = myInstanceInfo;
}
}
#getClusterEndpoints(...)
方法,实现代码如下:
// ... 省略代码,超过微信文章的长度
#getClusterEndpointsFromDns()
方法,实现代码如下:
// ... 省略代码,超过微信文章的长度eureka.shouldUseDns=true
,开启基于 DNS 获取 EndPoint 集群。eureka.eurekaServer.domainName=${xxxxx}
,配置集群根地址。eureka.eurekaServer.port
,eureka.eurekaServer.context
。#getClusterEndpointsFromConfig()
方法,实现代码如下:// ... 省略代码,超过微信文章的长度
eureka.${REGION}.availabilityZones
配置。InstanceInfo#getZone(…)
方法,获得应用实例自己所在的可用区( zone
)。非亚马逊 AWS 环境下,可用区数组的第一个元素就是应用实例自己所在的可用区。EndpointUtils#getServiceUrlsMapFromConfig(...)
方法,获得可用区与 serviceUrls
的映射。实现代码如下:
// ... 省略代码,超过微信文章的长度preferSameZone=true
,即 eureka.preferSameZone=true
( 默认值 :true
) 时,开始位置为可用区数组( availZones
)的第一个和应用实例所在的可用区( myZone
)【相等】元素的位置。preferSameZone=false
,即 eureka.preferSameZone=false
( 默认值 :true
) 时,开始位置为可用区数组( availZones
)的第一个和应用实例所在的可用区( myZone
)【不相等】元素的位置。serviceUrls
添加到结果。顺序理解如下图:
com.netflix.discovery.shared.resolver.aws.ZoneAffinityClusterResolver
,使用可用区亲和的集群解析器。类属性代码如下:
// ... 省略代码,超过微信文章的长度
delegate
,委托的解析器。目前代码里使用的是 ConfigClusterResolver 。zoneAffinity
,是否可用区亲和。#getClusterEndpoints(...)
方法,实现代码如下:
// ... 省略代码,超过微信文章的长度
ClusterResolver#getClusterEndpoints()
方法,获得 EndPoint 集群。再调用 ResolverUtils#splitByZone(…)
方法,拆分成本地和非本地的可用区的 EndPoint 集群,点击链接查看实现。#randomizeAndMerge(...)
方法,分别随机打乱每个 EndPoint 集群,并进行合并数组,实现代码如下:
// ... 省略代码,超过微信文章的长度com.netflix.discovery.shared.resolver.AsyncResolver
,异步执行解析的集群解析器。AsyncResolver 属性较多,而且复杂的多,我们拆分到具体方法里分享。
AsyncResolver 内置定时任务,定时刷新 EndPoint 集群解析结果。
为什么要刷新?例如,Eureka-Server 的 serviceUrls
基于 DNS 配置。
定时任务代码如下:
// ... 省略代码,超过微信文章的长度
backgroundTask
,后台任务,定时解析 EndPoint 集群。delegate
,委托的解析器,目前代码为 ZoneAffinityClusterResolver。updateTask
实现代码如下:
// ... 省略代码,超过微信文章的长度#getClusterEndpoints()
方法,在 「3.6.2 解析 EndPoint 集群」 详细解析。调用 #getClusterEndpoints()
方法,解析 EndPoint 集群,实现代码如下:
// ... 省略代码,超过微信文章的长度
#doWarmUp()
方法,进行预热。若预热失败,取消定时任务的第一次延迟。#doWarmUp()
方法实现代码如下:
// ... 省略代码,超过微信文章的长度updateTask
,解析 EndPoint 集群。#scheduleTask()
方法,实现代码如下:
// ... 省略代码,超过微信文章的长度Eureka-Client 在初始化时,调用 DiscoveryClient#scheduleServerEndpointTask()
方法,初始化 AsyncResolver 解析器。实现代码如下:
// ... 省略代码,超过微信文章的长度
EurekaHttpClients#newBootstrapResolver(...)
方法,创建 EndPoint 解析器,实现代码如下:
// ... 省略代码,超过微信文章的长度#defaultBootstrapResolver()
方法,创建默认的解析器 AsyncResolver 。delegate
参数。ZoneAffinityClusterResolver#getClusterEndpoints()
方法,第一次 Eureka-Server EndPoint 集群解析。eureka.experimental.clientTransportFailFastOnInit=true
),使 Eureka-Client 初始化失败。#failFastOnInitCheck(...)
方法,实现代码如下:
// potential future feature, guarding with experimental flag for now // ... 省略代码,超过微信文章的长度AsyncResolver.resultsRef
属性一开始已经用 initialValue
传递给 AsyncResolver 构造方法。实现代码如下:
Java public AsyncResolver(String name, ClusterResolver<T> delegate, List<T> initialValues, int executorThreadPoolSize, int refreshIntervalMs) { this( name, delegate, initialValues, executorThreadPoolSize, refreshIntervalMs, 0 ); ¨K78K }
如果你对 Dubbo 感兴趣,欢迎加入我的知识星球一起交流。