首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >注册中心 Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

注册中心 Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

作者头像
芋道源码
发布2018-07-31 17:37:10
1.1K0
发布2018-07-31 17:37:10
举报
文章被收录于专栏:芋道源码1024芋道源码1024

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 获取全量注册信息的过程

FROM 《深度剖析服务发现组件Netflix Eureka》

Eureka-Client 获取注册信息,分成全量获取增量获取。默认配置下,Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,而后每 30增量获取刷新本地缓存( 非“正常”情况下会是全量获取 )。

本文重点在于全量获取

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》
  • 两书齐买,京东包邮。

推荐 Spring Cloud 视频

  • Java 微服务实践 - Spring Boot
  • Java 微服务实践 - Spring Cloud
  • Java 微服务实践 - Spring Boot / Spring Cloud

2. Eureka-Client 发起全量获取

本小节调用关系如下:

2.1 初始化全量获取

Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,首先代码如下:

// DiscoveryClient.java
/**
* Applications 在本地的缓存
*/
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {

     // ... 省略无关代码

    // 【3.2.5】初始化应用集合在本地的缓存
    localRegionApps.set(new Applications());

    // ... 省略无关代码     

    // 【3.2.12】从 Eureka-Server 拉取注册信息
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

     // ... 省略无关代码       
}
  • com.netflix.discovery.shared.Applications,注册的应用集合。较为容易理解,点击 链接 链接查看带中文注释的类,这里就不啰嗦了。Applications 与 InstanceInfo 类关系如下:
  • 配置 eureka.shouldFetchRegistry = true,开启从 Eureka-Server 获取注册信息。默认值:true
  • 调用 #fetchRegistry(false) 方法,从 Eureka-Server 全量获取注册信息,在 「2.4 发起获取注册信息」 详细解析。

2.2 定时获取

Eureka-Client 在初始化过程中,创建获取注册信息线程,固定间隔向 Eureka-Server 发起获取注册信息( fetch ),刷新本地注册信息缓存。实现代码如下:

// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
               Provider<BackupRegistry> backupRegistryProvider) {
    // ... 省略无关代码

    // 【3.2.9】初始化线程池
    // default size of 2 - 1 each for heartbeat and cacheRefresh
    scheduler = Executors.newScheduledThreadPool(2,
         new ThreadFactoryBuilder()
                 .setNameFormat("DiscoveryClient-%d")
                 .setDaemon(true)
                 .build());

    cacheRefreshExecutor = new ThreadPoolExecutor(
         1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(),
         new ThreadFactoryBuilder()
                 .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                 .setDaemon(true)
                 .build()
     );  // use direct handoff

    // ... 省略无关代码

    // 【3.2.14】初始化定时任务
    initScheduledTasks();

    // ... 省略无关代码
}

private void initScheduledTasks() {
    // 向 Eureka-Server 心跳(续租)执行器
    if (clientConfig.shouldFetchRegistry()) {
       // registry cache refresh timer
       int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
       int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
       scheduler.schedule(
               new TimedSupervisorTask(
                       "cacheRefresh",
                       scheduler,
                       cacheRefreshExecutor,
                       registryFetchIntervalSeconds,
                       TimeUnit.SECONDS,
                       expBackOffBound,
                       new CacheRefreshThread()
               ),
               registryFetchIntervalSeconds, TimeUnit.SECONDS);
     }
     // ... 省略无关代码
}
  • 初始化定时任务代码,和续租的定时任务代码类似,在 《Eureka 源码解析 —— 应用实例注册发现(二)之续租 》 有详细解析,这里不重复分享。
  • com.netflix.discovery.DiscoveryClient.CacheRefreshThread,注册信息缓存刷新任务,实现代码如下: class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } }
    • 调用 #refreshRegistry(false) 方法,刷新注册信息缓存,在 「2.3 刷新注册信息缓存」 详细解析。

2.3 刷新注册信息缓存

调用 #refreshRegistry(false) 方法,刷新注册信息缓存,实现代码如下:

// DiscoveryClient.java
  1: void refreshRegistry() {
  2:     try {
  3:         // TODO 芋艿:TODO[0009]:RemoteRegionRegistry
  4:         boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
  5: 
  6:         boolean remoteRegionsModified = false;
  7:         // This makes sure that a dynamic change to remote regions to fetch is honored.
  8:         String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
  9:         if (null != latestRemoteRegions) {
 10:             String currentRemoteRegions = remoteRegionsToFetch.get();
 11:             if (!latestRemoteRegions.equals(currentRemoteRegions)) {
 12:                 // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
 13:                 synchronized (instanceRegionChecker.getAzToRegionMapper()) {
 14:                     if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
 15:                         String[] remoteRegions = latestRemoteRegions.split(",");
 16:                         remoteRegionsRef.set(remoteRegions);
 17:                         instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
 18:                         remoteRegionsModified = true;
 19:                     } else {
 20:                         logger.info("Remote regions to fetch modified concurrently," +
 21:                                 " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
 22:                     }
 23:                 }
 24:             } else {
 25:                 // Just refresh mapping to reflect any DNS/Property change
 26:                 instanceRegionChecker.getAzToRegionMapper().refreshMapping();
 27:             }
 28:         }
 29: 
 30:         boolean success = fetchRegistry(remoteRegionsModified);
 31:         if (success) {
 32:             // 设置 注册信息的应用实例数
 33:             registrySize = localRegionApps.get().size();
 34:             // 设置 最后获取注册信息时间
 35:             lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
 36:         }
 37: 
 38:         // 打印日志
 39:         if (logger.isDebugEnabled()) {
 40:             StringBuilder allAppsHashCodes = new StringBuilder();
 41:             allAppsHashCodes.append("Local region apps hashcode: ");
 42:             allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
 43:             allAppsHashCodes.append(", is fetching remote regions? ");
 44:             allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
 45:             for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
 46:                 allAppsHashCodes.append(", Remote region: ");
 47:                 allAppsHashCodes.append(entry.getKey());
 48:                 allAppsHashCodes.append(" , apps hashcode: ");
 49:                 allAppsHashCodes.append(entry.getValue().getAppsHashCode());
 50:             }
 51:             logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
 52:                     allAppsHashCodes.toString());
 53:         }
 54:     } catch (Throwable e) {
 55:         logger.error("Cannot fetch registry from server", e);
 56:     }        
 57: }
  • 第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry
  • 第 30 行 :调用 #fetchRegistry(false) 方法,从 Eureka-Server 获取注册信息,在 「2.4 发起获取注册信息」 详细解析。
  • 第 31 至 36 行 :获取注册信息成功,设置注册信息的应用实例数,最后获取注册信息时间。变量代码如下: /** * 注册信息的应用实例数 */ private volatile int registrySize = 0; /** * 最后成功从 Eureka-Server 拉取注册信息时间戳 */ private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
  • 第 38 至 53 行 :打印调试日志。
  • 第 54 至 56 行 :打印异常日志。

2.4 发起获取注册信息

调用 #fetchRegistry(false) 方法,从 Eureka-Server 获取注册信息( 根据条件判断,可能是全量,也可能是增量 ),实现代码如下:

  1: private boolean fetchRegistry(boolean forceFullRegistryFetch) {
  2:     Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
  3: 
  4:     try {
  5:         // 获取 本地缓存的注册的应用实例集合
  6:         // If the delta is disabled or if it is the first time, get all
  7:         // applications
  8:         Applications applications = getApplications();
  9: 
 10:         // 全量获取
 11:         if (clientConfig.shouldDisableDelta() // 禁用增量获取
 12:                 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
 13:                 || forceFullRegistryFetch
 14:                 || (applications == null) // 空
 15:                 || (applications.getRegisteredApplications().size() == 0) // 空
 16:                 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
 17:         {
 18:             logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
 19:             logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
 20:             logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
 21:             logger.info("Application is null : {}", (applications == null));
 22:             logger.info("Registered Applications size is zero : {}",
 23:                     (applications.getRegisteredApplications().size() == 0));
 24:             logger.info("Application version is -1: {}", (applications.getVersion() == -1));
 25:             // 执行 全量获取
 26:             getAndStoreFullRegistry();
 27:         } else {
 28:             // 执行 增量获取
 29:             getAndUpdateDelta(applications);
 30:         }
 31:         // 设置 应用集合 hashcode
 32:         applications.setAppsHashCode(applications.getReconcileHashCode());
 33:         // 打印 本地缓存的注册的应用实例数量
 34:         logTotalInstances();
 35:     } catch (Throwable e) {
 36:         logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
 37:         return false;
 38:     } finally {
 39:         if (tracer != null) {
 40:             tracer.stop();
 41:         }
 42:     }
 43: 
 44:     // Notify about cache refresh before updating the instance remote status
 45:     onCacheRefreshed();
 46: 
 47:     // Update remote status based on refreshed data held in the cache
 48:     updateInstanceRemoteStatus();
 49: 
 50:     // registry was fetched successfully, so return true
 51:     return true;
 52: }
  • 第 5 至 8 行 :获取本地缓存的注册的应用实例集合,实现代码如下: public Applications getApplications() { return localRegionApps.get(); }
  • 第 10 至 26 行 :全量获取注册信息。
    • 第 11 行 :配置 eureka.disableDelta = true ,禁用增量获取注册信息。默认值:false
    • 第 12 行 :只获得一个 vipAddress 对应的应用实例们的注册信息。
    • 第 13 行 :方法参数 forceFullRegistryFetch 强制全量获取注册信息。
    • 第 14 至 15 行 :本地缓存为空。
    • 第 25 至 26 行 :调用 #getAndStoreFullRegistry() 方法,全量获取注册信息,并设置到本地缓存。下文详细解析。
  • 第 27 至 30 行 :增量获取注册信息,并刷新本地缓存,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
  • 第 31 至 32 行 :计算应用集合 hashcode 。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
  • 第 33 至 34 行 :打印调试日志,输出本地缓存的注册的应用实例数量。实现代码如下: private void logTotalInstances() { if (logger.isDebugEnabled()) { int totInstances = 0; for (Application application : getApplications().getRegisteredApplications()) { totInstances += application.getInstancesAsIsFromEureka().size(); } logger.debug("The total number of all instances in the client now is {}", totInstances); } }
  • 第 44 至 45 行 :触发 CacheRefreshedEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。
    • x
    • #onCacheRefreshed() 方法,实现代码如下: /** * Eureka 事件监听器 */ private final CopyOnWriteArraySet<EurekaEventListener> eventListeners = new CopyOnWriteArraySet<>(); protected void onCacheRefreshed() { fireEvent(new CacheRefreshedEvent()); } protected void fireEvent(final EurekaEvent event) { for (EurekaEventListener listener : eventListeners) { listener.onEvent(event); } }
    • 笔者的YY :你可以实现自定义的事件监听器监听 CacheRefreshedEvent 事件,以达到持久化最新的注册信息到存储器( 例如,本地文件 ),通过这样的方式,配合实现 BackupRegistry 接口读取存储器。BackupRegistry 接口调用如下: // 【3.2.12】从 Eureka-Server 拉取注册信息 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); }
  • 第47 至 48 行 :更新本地缓存的当前应用实例在 Eureka-Server 的状态。 1: private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN; 2: 3: private synchronized void updateInstanceRemoteStatus() { 4: // Determine this instance's status for this app and set to UNKNOWN if not found 5: InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null; 6: if (instanceInfo.getAppName() != null) { 7: Application app = getApplication(instanceInfo.getAppName()); 8: if (app != null) { 9: InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId()); 10: if (remoteInstanceInfo != null) { 11: currentRemoteInstanceStatus = remoteInstanceInfo.getStatus(); 12: } 13: } 14: } 15: if (currentRemoteInstanceStatus == null) { 16: currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN; 17: } 18: 19: // Notify if status changed 20: if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) { 21: onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus); 22: lastRemoteInstanceStatus = currentRemoteInstanceStatus; 23: } 24: }
    • Eureka-Client 本地应用实例与 Eureka-Server 的该应用实例状态不同的原因,因为应用实例的覆盖状态,在 《Eureka 源码解析 —— 应用实例注册发现 (八)之覆盖状态》 有详细解析。
    • 第 4 至 14 行 :从注册信息中获取当前应用在 Eureka-Server 的状态。
    • 第 19 至 23 行 :对比本地缓存最新的的当前应用实例在 Eureka-Server 的状态,若不同,更新本地缓存( 注意,只更新该缓存变量,不更新本地当前应用实例的状态( instanceInfo.status ) ),触发 StatusChangeEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。#onRemoteStatusChanged(...) 实现代码如下: protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) { fireEvent(new StatusChangeEvent(oldStatus, newStatus)); }

2.4.1 全量获取注册信息,并设置到本地缓存

调用 #getAndStoreFullRegistry() 方法,全量获取注册信息,并设置到本地缓存。下实现代码如下:

  1: private void getAndStoreFullRegistry() throws Throwable {
  2:     long currentUpdateGeneration = fetchRegistryGeneration.get();
  3: 
  4:     logger.info("Getting all instance registry info from the eureka server");
  5: 
  6:     // 全量获取注册信息
  7:     Applications apps = null;
  8:     EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
  9:             ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
 10:             : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
 11:     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
 12:         apps = httpResponse.getEntity();
 13:     }
 14:     logger.info("The response status is {}", httpResponse.getStatusCode());
 15: 
 16:     // 设置到本地缓存
 17:     if (apps == null) {
 18:         logger.error("The application is null for some reason. Not storing this information");
 19:     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
 20:         localRegionApps.set(this.filterAndShuffle(apps));
 21:         logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
 22:     } else {
 23:         logger.warn("Not updating applications as another thread is updating it already");
 24:     }
 25: }
  • 第 6 至 14 行 :全量获取注册信息,实现代码如下: // AbstractJerseyEurekaHttpClient.java @Override public EurekaHttpResponse<Applications> getApplications(String... regions) { return getApplicationsInternal("apps/", regions); } private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) { ClientResponse response = null; String regionsParamValue = null; try { WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath); if (regions != null && regions.length > 0) { regionsParamValue = StringUtil.join(regions); webResource = webResource.queryParam("regions", regionsParamValue); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON Applications applications = null; if (response.getStatus() == Status.OK.getStatusCode() &amp;&amp; response.hasEntity()) { applications = response.getEntity(Applications.class); } return anEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}", serviceUrl, urlPath, regionsParamValue == null ? "" : "regions=" + regionsParamValue, response == null ? "N/A" : response.getStatus() ); } if (response != null) { response.close(); } } }
    • 调用 AbstractJerseyEurekaHttpClient#getApplications(...) 方法,GET 请求 Eureka-Server 的 apps/ 接口,参数为 regions ,返回格式为 JSON ,实现全量获取注册信息
  • 第 16 至 24 行 :设置到本地注册信息缓存
    • 第 19 行 :TODO[0025] :并发更新的情况???
    • 第 20 行 :调用 #filterAndShuffle(...) 方法,根据配置 eureka.shouldFilterOnlyUpInstances = true ( 默认值 :true ) 过滤只保留状态为开启( UP )的应用实例,并随机打乱应用实例顺序。打乱后,实现调用应用服务的随机性。代码比较易懂,点击链接查看方法实现。

3. Eureka-Server 接收全量获取

3.1 接收全量获取请求

com.netflix.eureka.resources.ApplicationsResource,处理所有应用的请求操作的 Resource ( Controller )。

接收全量获取请求,映射 ApplicationsResource#getContainers() 方法,实现代码如下:

  1: @GET
  2: public Response getContainers(@PathParam("version") String version,
  3:                               @HeaderParam(HEADER_ACCEPT) String acceptHeader,
  4:                               @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
  5:                               @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
  6:                               @Context UriInfo uriInfo,
  7:                               @Nullable @QueryParam("regions") String regionsStr) {
  8:     // TODO[0009]:RemoteRegionRegistry
  9:     boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
 10:     String[] regions = null;
 11:     if (!isRemoteRegionRequested) {
 12:         EurekaMonitors.GET_ALL.increment();
 13:     } else {
 14:         regions = regionsStr.toLowerCase().split(",");
 15:         Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
 16:         EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
 17:     }
 18: 
 19:     // 判断是否可以访问
 20:     // Check if the server allows the access to the registry. The server can
 21:     // restrict access if it is not
 22:     // ready to serve traffic depending on various reasons.
 23:     if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
 24:         return Response.status(Status.FORBIDDEN).build();
 25:     }
 26: 
 27:     // API 版本
 28:     CurrentRequestVersion.set(Version.toEnum(version));
 29: 
 30:     // 返回数据格式
 31:     KeyType keyType = Key.KeyType.JSON;
 32:     String returnMediaType = MediaType.APPLICATION_JSON;
 33:     if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
 34:         keyType = Key.KeyType.XML;
 35:         returnMediaType = MediaType.APPLICATION_XML;
 36:     }
 37: 
 38:     // 响应缓存键( KEY )
 39:     Key cacheKey = new Key(Key.EntityType.Application,
 40:             ResponseCacheImpl.ALL_APPS,
 41:             keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
 42:     );
 43: 
 44:     //
 45:     Response response;
 46:     if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
 47:         response = Response.ok(responseCache.getGZIP(cacheKey))
 48:                 .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
 49:                 .header(HEADER_CONTENT_TYPE, returnMediaType)
 50:                 .build();
 51:     } else {
 52:         response = Response.ok(responseCache.get(cacheKey))
 53:                 .build();
 54:     }
 55:     return response;
 56: }
  • 第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry
  • 第 19 至 25 行 :Eureka-Server 启动完成,但是未处于就绪( Ready )状态,不接受请求全量应用注册信息的请求,例如,Eureka-Server 启动时,未能从其他 Eureka-Server 集群的节点获取到应用注册信息。
  • 第 27 至 28 行 :设置 API 版本号。默认最新 API 版本为 V2。实现代码如下: public enum Version { V1, V2;public static Version toEnum(String v) { for (Version version : Version.values()) { if (version.name().equalsIgnoreCase(v)) { return version; } } //Defaults to v2 return V2; } }
  • 第 30 至 36 行 :设置返回数据格式,默认 JSON 。
  • 第 38 至 42 行 :创建响应缓存( ResponseCache ) 的键( KEY ),在 「3.2.1 缓存键」详细解析。
  • 第 44 至 55 行 :从响应缓存读取全量注册信息,在 「3.3 缓存读取」详细解析。

3.2 响应缓存 ResponseCache

com.netflix.eureka.registry.ResponseCache,响应缓存接口,接口代码如下:

public interface ResponseCache {

    String get(Key key);

    byte[] getGZIP(Key key);

    void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);

    AtomicLong getVersionDelta();

    AtomicLong getVersionDeltaWithRegions();

}
  • 其中,#getVersionDelta()#getVersionDeltaWithRegions() 已经废弃。这里保留的原因主要是考虑兼容性。判断依据来自如下代码: // Applications.java @Deprecated public void setVersion(Long version) { this.versionDelta = version; } // AbstractInstanceRegistry.java public Applications getApplicationDeltas() { // ... 省略其它无关代码 apps.setVersion(responseCache.getVersionDelta().get()); // 唯一调用到 ResponseCache#getVersionDelta() 方法的地方 // ... 省略其它无关代码 }
  • #get() :获得缓存。
  • #getGZIP() :获得缓存,并 GZIP 。
  • #invalidate() :过期缓存。

3.2.1 缓存键

com.netflix.eureka.registry.Key,缓存键。实现代码如下:

public class Key {

    public enum KeyType {
        JSON, XML
    }

    /**
     * An enum to define the entity that is stored in this cache for this key.
     */
    public enum EntityType {
        Application, VIP, SVIP
    }

    /**
     * 实体名
     */
    private final String entityName;
    /**
     * TODO[0009]:RemoteRegionRegistry
     */
    private final String[] regions;
    /**
     * 请求参数类型
     */
    private final KeyType requestType;
    /**
     * 请求 API 版本号
     */
    private final Version requestVersion;
    /**
     * hashKey
     */
    private final String hashKey;
    /**
     * 实体类型
     *
     * {@link EntityType}
     */
    private final EntityType entityType;
    /**
     * {@link EurekaAccept}
     */
    private final EurekaAccept eurekaAccept;

    public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
        this.regions = regions;
        this.entityType = entityType;
        this.entityName = entityName;
        this.requestType = type;
        this.requestVersion = v;
        this.eurekaAccept = eurekaAccept;
        hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
                + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
    }

    public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
        this.regions = regions;
        this.entityType = entityType;
        this.entityName = entityName;
        this.requestType = type;
        this.requestVersion = v;
        this.eurekaAccept = eurekaAccept;
        hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
                + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
    }

    @Override
    public int hashCode() {
        String hashKey = getHashKey();
        return hashKey.hashCode();
    }

    @Override
    public boolean equals(Object other) {
        if (other instanceof Key) {
            return getHashKey().equals(((Key) other).getHashKey());
        } else {
            return false;
        }
    }

}

3.2.2 响应缓存实现类

com.netflix.eureka.registry.ResponseCacheImpl,响应缓存实现类。

在 ResponseCacheImpl 里,将缓存拆分成两层 :

  • 只读缓存( readOnlyCacheMap )
  • 固定过期 + 固定大小读写缓存( readWriteCacheMap )

默认配置下,缓存读取策略如下:

缓存过期策略如下:

  • 应用实例注册、下线、过期时,只只只过期 readWriteCacheMap
  • readWriteCacheMap 写入一段时间( 可配置 )后自动过期。
  • 定时任务对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。

注意:应用实例注册、下线、过期时,不会很快刷新到 readWriteCacheMap 缓存里。默认配置下,最大延迟在 30 秒。

为什么可以使用缓存?

在 CAP 的选择上,Eureka 选择了 AP ,不同于 Zookeeper 选择了 CP 。

推荐阅读:

  • 《为什么不应该使用ZooKeeper做服务发现》
  • 《Spring Cloud Netflix Eureka源码导读与原理分析》「4. 作为服务注册中心,Eureka比Zookeeper好在哪里」

3.3 缓存读取

调用 ResponseCacheImpl#get(...) 方法( #getGzip(...) 类似 ),读取缓存,实现代码如下:

  1: private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
  2: 
  3: private final LoadingCache<Key, Value> readWriteCacheMap;
  4: 
  5: public String get(final Key key) {
  6:     return get(key, shouldUseReadOnlyResponseCache);
  7: }
  8: 
  9: String get(final Key key, boolean useReadOnlyCache) {
 10:     Value payload = getValue(key, useReadOnlyCache);
 11:     if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
 12:         return null;
 13:     } else {
 14:         return payload.getPayload();
 15:     }
 16: }
 17: 
 18: Value getValue(final Key key, boolean useReadOnlyCache) {
 19:     Value payload = null;
 20:     try {
 21:         if (useReadOnlyCache) {
 22:             final Value currentPayload = readOnlyCacheMap.get(key);
 23:             if (currentPayload != null) {
 24:                 payload = currentPayload;
 25:             } else {
 26:                 payload = readWriteCacheMap.get(key);
 27:                 readOnlyCacheMap.put(key, payload);
 28:             }
 29:         } else {
 30:             payload = readWriteCacheMap.get(key);
 31:         }
 32:     } catch (Throwable t) {
 33:         logger.error("Cannot get value for key :" + key, t);
 34:     }
 35:     return payload;
 36: }
  • 第 5 至 7 行 :调用 #get(key, useReadOnlyCache) 方法,读取缓存。其中 shouldUseReadOnlyResponseCache 通过配置 eureka.shouldUseReadOnlyResponseCache = true (默认值 :true ) 开启只读缓存。如果你对数据的一致性有相对高的要求,可以关闭这个开关,当然因为少了 readOnlyCacheMap ,性能会有一定的下降。
  • 第 9 至 16 行 :调用 getValue(key, useReadOnlyCache) 方法,读取缓存。从 readOnlyCacheMapreadWriteCacheMap 变量可以看到缓存值的类为 com.netflix.eureka.registry.ResponseCacheImpl.Value ,实现代码如下: public class Value { /** * 原始值 */ private final String payload; /** * GZIP 压缩后的值 */ private byte[] gzipped; public Value(String payload) { this.payload = payload; if (!EMPTY_PAYLOAD.equals(payload)) { // ... 省略 GZIP 压缩代码 gzipped = bos.toByteArray(); } else { gzipped = null; } } public String getPayload() { return payload; } public byte[] getGzipped() { return gzipped; } }
  • 第 21 至 31 行 :读取缓存。
    • readWriteCacheMap 最大缓存数量为 1000 。
    • 调用 #generatePayload(key) 方法,生成缓存值。
    • 第 21 至 28 行 :先读取 readOnlyCacheMap 。读取不到,读取 readWriteCacheMap ,并设置到 readOnlyCacheMap
    • 第 29 至 31 行 :读取 readWriteCacheMap
    • readWriteCacheMap 实现代码如下: this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(1000) .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) .removalListener(new RemovalListener<Key, Value>() { @Override public void onRemoval(RemovalNotification<Key, Value> notification) { // TODO[0009]:RemoteRegionRegistry Key removedKey = notification.getKey(); if (removedKey.hasRegions()) { Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions, removedKey); } } }) .build(new CacheLoader<Key, Value>() { @Override public Value load(Key key) throws Exception { // // TODO[0009]:RemoteRegionRegistry if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions, key); } Value value = generatePayload(key); return value; } });
  • #generatePayload(key) 方法,实现代码如下: 1: private Value generatePayload(Key key) { 2: Stopwatch tracer = null; 3: try { 4: String payload; 5: switch (key.getEntityType()) { 6: case Application: 7: boolean isRemoteRegionRequested = key.hasRegions(); 8: 9: if (ALL_APPS.equals(key.getName())) { 10: if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry 11: tracer = serializeAllAppsWithRemoteRegionTimer.start(); 12: payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions())); 13: } else { 14: tracer = serializeAllAppsTimer.start(); 15: payload = getPayLoad(key, registry.getApplications()); 16: } 17: } else if (ALL_APPS_DELTA.equals(key.getName())) { 18: // ... 省略增量获取相关的代码 19: } else { 20: tracer = serializeOneApptimer.start(); 21: payload = getPayLoad(key, registry.getApplication(key.getName())); 22: } 23: break; 24: // ... 省略部分代码 25: } 26: return new Value(payload); 27: } finally { 28: if (tracer != null) { 29: tracer.stop(); 30: } 31: } 32: }
    • 第 10 至 12 行 :TODO[0009]:RemoteRegionRegistry
    • 第 13 至 16 行 :调用 AbstractInstanceRegistry#getApplications() 方法,获得注册的应用集合。后调用 #getPayLoad() 方法,将注册的应用集合转换成缓存值。? 这两个方法代码较多,下面详细解析。
    • 第 17 至 18 行 :获取增量注册信息的缓存值,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

3.3.1 获得注册的应用集合

调用 AbstractInstanceRegistry#getApplications() 方法,获得注册的应用集合,实现代码如下:

// ... 省略代码,超过微信文章长度
  • 第 6 至 8 行 :TODO[0009]:RemoteRegionRegistry
  • 第 9 至 16 行 :调用 #getApplicationsFromMultipleRegions(...) 方法,获得注册的应用集合,实现代码如下: // ... 省略代码,超过微信文章长度
    • 第 2 至 第 10 行 :TODO[0009]:RemoteRegionRegistry
    • 第 11 至 29 行 :获得获得注册的应用集合。
    • 第 30 至 59 行 :TODO[0009]:RemoteRegionRegistry
    • 第 61 行 :计算应用集合 hashcode 。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

3.3.2 转换成缓存值

调用 #getPayLoad() 方法,将注册的应用集合转换成缓存值,实现代码如下:

// ... 省略代码,超过微信文章长度

3.4 主动过期读写缓存

应用实例注册、下线、过期时,调用 ResponseCacheImpl#invalidate() 方法,主动过期读写缓存( readWriteCacheMap ),实现代码如下:

// ... 省略代码,超过微信文章长度
  • 调用 #invalidate(keys) 方法,逐个过期每个缓存键值,实现代码如下: // ... 省略代码,超过微信文章长度

3.5 被动过期读写缓存

读写缓存( readWriteCacheMap ) 写入后,一段时间自动过期,实现代码如下:

expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds())
  • 配置 eureka.responseCacheAutoExpirationInSeconds ,设置写入过期时长。默认值 :180 秒。

3.6 定时刷新只读缓存

定时任务对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。实现代码如下:

// ... 省略代码,超过微信文章长度
  • 第 7 至 12 行 :初始化定时任务。配置 eureka.responseCacheUpdateIntervalMs,设置任务执行频率,默认值 :30 * 1000 毫秒。
  • 第 17 至 39 行 :创建定时任务。
    • 第 22 行 :循环 readOnlyCacheMap 的缓存键。为什么不循环 readWriteCacheMapreadOnlyCacheMap 的缓存过期依赖 readWriteCacheMap,因此缓存键会更多。
    • 第 28 行 至 33 行 :对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-06-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 芋道源码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2. Eureka-Client 发起全量获取
    • 2.1 初始化全量获取
      • 2.2 定时获取
        • 2.3 刷新注册信息缓存
          • 2.4 发起获取注册信息
            • 2.4.1 全量获取注册信息,并设置到本地缓存
        • 3. Eureka-Server 接收全量获取
          • 3.1 接收全量获取请求
            • 3.2 响应缓存 ResponseCache
              • 3.2.1 缓存键
              • 3.2.2 响应缓存实现类
            • 3.3 缓存读取
              • 3.3.1 获得注册的应用集合
              • 3.3.2 转换成缓存值
            • 3.4 主动过期读写缓存
              • 3.5 被动过期读写缓存
                • 3.6 定时刷新只读缓存
                相关产品与服务
                微服务引擎 TSE
                微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档