注册中心 Eureka 源码解析 —— 应用实例注册发现(一)之注册

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

本文主要基于 Eureka 1.8.X 版本

  • 1. 概述
  • 2. Eureka-Client 发起注册
  • 2.1 应用实例信息复制器
  • 2.2 刷新应用实例信息
  • 2.3 发起注册应用实例
  • 3. Eureka-Server 接收注册
  • 3.1 接收注册请求
  • 3.2 Lease
  • 3.3 注册应用实例信息
  • 666. 彩蛋

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 注册应用实例的过程

FROM 《深度剖析服务发现组件Netflix Eureka》 二次编辑

  • 蓝框部分,为本文重点。
  • 蓝框部分,Eureka-Server 集群间复制注册的应用实例信息,不在本文内容范畴。

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》
  • 两书齐买,京东包邮。

推荐 Spring Cloud 视频

  • Java 微服务实践 - Spring Boot
  • Java 微服务实践 - Spring Cloud
  • Java 微服务实践 - Spring Boot / Spring Cloud

2. Eureka-Client 发起注册

Eureka-Client 向 Eureka-Server 发起注册应用实例需要符合如下条件:

  • 配置 eureka.registration.enabled = true,Eureka-Client 向 Eureka-Server 发起注册应用实例的开关
  • InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致。

每次 InstanceInfo 发生属性变化时,标记 isInstanceInfoDirty 属性为 true,表示 InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致,需要注册。另外,InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册。

当符合条件时,InstanceInfo 不会立即向 Eureka-Server 注册,而是后台线程定时注册。

当 InstanceInfo 的状态( status ) 属性发生变化时,并且配置 eureka.shouldOnDemandUpdateStatusChange = true 时,立即向 Eureka-Server 注册。因为状态属性非常重要,一般情况下建议开启,当然默认情况也是开启的

Let's Go。让我们看看代码的实现。

2.1 应用实例信息复制器

// DiscoveryClient.java
public class DiscoveryClient implements EurekaClient {

    /**
     * 应用实例状态变更监听器
     */
    private ApplicationInfoManager.StatusChangeListener statusChangeListener;
    /**
     * 应用实例信息复制器
     */
    private InstanceInfoReplicator instanceInfoReplicator;

    private void initScheduledTasks() {
        // ... 省略无关代码

        if (clientConfig.shouldRegisterWithEureka()) {

            // ... 省略无关代码

            // 创建 应用实例信息复制器
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            // 创建 应用实例状态变更监听器
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            // 注册 应用实例状态变更监听器
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            // 开启 应用实例信息复制器
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        }

    }

}
  • com.netflix.discovery.InstanceInfoReplicator,应用实例信息复制器。
    • 调用 DiscoveryClient#refreshInstanceInfo() 方法,刷新应用实例信息。此处可能导致应用实例信息数据不一致,在「2.2」刷新应用实例信息 详细解析。
    • 调用 DiscoveryClient#register() 方法,Eureka-Client 向 Eureka-Server 注册应用实例
    • 调用 ScheduledExecutorService#schedule(...) 方法,再次延迟执行任务,并设置 scheduledPeriodicRef。通过这样的方式,不断循环定时执行任务。
    • 执行 instanceInfo.setIsDirty() 代码块,因为 InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册
    • 调用 ScheduledExecutorService#schedule(...) 方法,延迟 initialDelayMs 毫秒执行一次任务。为什么此处设置 scheduledPeriodicRef ?在 InstanceInfoReplicator#onDemandUpdate() 方法会看到具体用途。
    • 调用 InstanceInfoReplicator#start(...) 方法,开启应用实例信息复制器。实现代码如下: // InstanceInfoReplicator.java class InstanceInfoReplicator implements Runnable {private static final Logger logger = LoggerFactory.getLogger(InstanceInfoReplicator.class); private final DiscoveryClient discoveryClient; /** * 应用实例信息 */ private final InstanceInfo instanceInfo; /** * 定时执行频率,单位:秒 */ private final int replicationIntervalSeconds; /** * 定时执行器 */ private final ScheduledExecutorService scheduler; /** * 定时执行任务的 Future */ private final AtomicReference<Future> scheduledPeriodicRef; /** * 是否开启调度 */ private final AtomicBoolean started; private final RateLimiter rateLimiter; // 限流相关,跳过 private final int burstSize; // 限流相关,跳过 private final int allowedRatePerMinute; // 限流相关,跳过 InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) { this.discoveryClient = discoveryClient; this.instanceInfo = instanceInfo; this.scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d") .setDaemon(true) .build());this.scheduledPeriodicRef = new AtomicReference<Future>(); this.started = new AtomicBoolean(false); this.rateLimiter = new RateLimiter(TimeUnit.MINUTES); this.replicationIntervalSeconds = replicationIntervalSeconds; this.burstSize = burstSize; this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds; logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute); } public void start(int initialDelayMs) { if (started.compareAndSet(false, true)) { // 设置 应用实例信息 数据不一致 instanceInfo.setIsDirty(); // for initial register // 提交任务,并设置该任务的 Future Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } // ... 省略无关方法 } // InstanceInfo.java private volatile boolean isInstanceInfoDirty = false; private volatile Long lastDirtyTimestamp; public synchronized void setIsDirty() { isInstanceInfoDirty = true; lastDirtyTimestamp = System.currentTimeMillis(); }
    • 定时检查 InstanceInfo 的状态( status ) 属性是否发生变化。若是,发起注册。实现代码如下: // InstanceInfoReplicator.java @Override public void run() { try { // 刷新 应用实例信息 discoveryClient.refreshInstanceInfo(); // 判断 应用实例信息 是否数据不一致 Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { // 发起注册 discoveryClient.register(); // 设置 应用实例信息 数据一致 instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { // 提交任务,并设置该任务的 Future Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } // InstanceInfo.java public synchronized long setIsDirtyWithTime() { setIsDirty(); return lastDirtyTimestamp; } public synchronized void unsetIsDirty(long unsetDirtyTimestamp) { if (lastDirtyTimestamp <= unsetDirtyTimestamp) { isInstanceInfoDirty = false; } else { } }
  • com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener 内部类,监听应用实例信息状态的变更。
    • 调用 Future#cancel(false) 方法,取消定时任务,避免无用的注册
    • 调用 InstanceInfoReplicator#run() 方法,发起注册。
    • 调用 ApplicationInfoManager#registerStatusChangeListener(...) 方法,注册应用实例状态变更监听器。实现代码如下: public class ApplicationInfoManager {/** * 状态变更监听器 */ protected final Map&lt;String, StatusChangeListener&gt; listeners; public void registerStatusChangeListener(StatusChangeListener listener) { listeners.put(listener.getId(), listener); } }
    • 业务里,调用 ApplicationInfoManager#setInstanceStatus(...) 方法,设置应用实例信息的状态,从而通知 InstanceInfoReplicator#onDemandUpdate() 方法的调用。实现代码如下: // ApplicationInfoManager.java public synchronized void setInstanceStatus(InstanceStatus status) { InstanceStatus next = instanceStatusMapper.map(status); if (next == null) { return; } InstanceStatus prev = instanceInfo.setStatus(next); if (prev != null) { for (StatusChangeListener listener : listeners.values()) { try { listener.notify(new StatusChangeEvent(prev, next)); } catch (Exception e) { logger.warn("failed to notify listener: {}", listener.getId(), e); } } } } // InstanceInfo.java public synchronized InstanceStatus setStatus(InstanceStatus status) { if (this.status != status) { InstanceStatus prev = this.status; this.status = status; // 设置 应用实例信息 数据一致 setIsDirty(); return prev; } return null; }
    • InstanceInfoReplicator#onDemandUpdate(),实现代码如下: // InstanceInfoReplicator.java public boolean onDemandUpdate() { if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流相关,跳过 scheduler.submit(new Runnable() { @Override public void run() { logger.debug("Executing on-demand update of local InstanceInfo"); // 取消任务 Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); } // 再次调用 InstanceInfoReplicator.this.run(); } }); return true; } else { logger.warn("Ignoring onDemand update due to rate limiter"); return false; } }

2.2 刷新应用实例信息

调用 DiscoveryClient#refreshInstanceInfo() 方法,刷新应用实例信息。此处可能导致应用实例信息数据不一致,实现代码如下:

void refreshInstanceInfo() {
   // 刷新 数据中心信息
   applicationInfoManager.refreshDataCenterInfoIfRequired();
   // 刷新 租约信息
   applicationInfoManager.refreshLeaseInfoIfRequired();
   // 健康检查
   InstanceStatus status;
   try {
       status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
   } catch (Exception e) {
       logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
       status = InstanceStatus.DOWN;
   }
   if (null != status) {
       applicationInfoManager.setInstanceStatus(status);
   }
}
  • 调用 ApplicationInfoManager#refreshDataCenterInfoIfRequired() 方法,刷新数据中心相关信息,实现代码如下: // ApplicationInfoManager.java public void refreshDataCenterInfoIfRequired() { // hostname String existingAddress = instanceInfo.getHostName(); String newAddress; if (config instanceof RefreshableInstanceConfig) { // Refresh data center info, and return up to date address newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true); } else { newAddress = config.getHostName(true); } // ip String newIp = config.getIpAddress(); if (newAddress != null && !newAddress.equals(existingAddress)) { logger.warn("The address changed from : {} => {}", existingAddress, newAddress); // :( in the legacy code here the builder is acting as a mutator. // This is hard to fix as this same instanceInfo instance is referenced elsewhere. // We will most likely re-write the client at sometime so not fixing for now. InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo); builder.setHostName(newAddress) // hostname .setIPAddr(newIp) // ip .setDataCenterInfo(config.getDataCenterInfo()); // dataCenterInfo instanceInfo.setIsDirty(); } } public abstract class AbstractInstanceConfig implements EurekaInstanceConfig {private static final Pair&lt;String, String&gt; hostInfo = getHostInfo(); @Override public String getHostName(boolean refresh) { return hostInfo.second(); } @Override public String getIpAddress() { return hostInfo.first(); } private static Pair&lt;String, String&gt; getHostInfo() { Pair&lt;String, String&gt; pair; try { InetAddress localHost = InetAddress.getLocalHost(); pair = new Pair&lt;String, String&gt;(localHost.getHostAddress(), localHost.getHostName()); } catch (UnknownHostException e) { logger.error("Cannot get host info", e); pair = new Pair&lt;String, String&gt;("", ""); } return pair; } }
    • 关注应用实例信息的 hostNameipAddrdataCenterInfo 属性的变化。
    • 一般情况下,我们使用的是非 RefreshableInstanceConfig 实现的配置类( 一般是 MyDataCenterInstanceConfig ),因为 AbstractInstanceConfig.hostInfo静态属性即使本机修改了 IP 等信息,Eureka-Client 进程也不会感知到。TODO[0022]:看下springcloud 的实现
  • 调用 ApplicationInfoManager#refreshLeaseInfoIfRequired() 方法,刷新租约相关信息,实现代码如下: public void refreshLeaseInfoIfRequired() { LeaseInfo leaseInfo = instanceInfo.getLeaseInfo(); if (leaseInfo == null) { return; } int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds(); int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds(); if (leaseInfo.getDurationInSecs() != currentLeaseDuration // 租约过期时间 改变 || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) { // 租约续约频率 改变 LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder() .setRenewalIntervalInSecs(currentLeaseRenewal) .setDurationInSecs(currentLeaseDuration) .build(); instanceInfo.setLeaseInfo(newLeaseInfo); instanceInfo.setIsDirty(); } }
    • 关注应用实例信息的 renewalIntervalInSecsdurationInSecs 属性的变化。
  • 调用 HealthCheckHandler#getStatus() 方法,健康检查。这里先暂时跳过,我们在TODO[0004]:健康检查 详细解析。

2.3 发起注册应用实例

调用 DiscoveryClient#register() 方法,Eureka-Client 向 Eureka-Server 注册应用实例,实现代码如下:

// DiscoveryClient.java
boolean register() throws Throwable {
   logger.info(PREFIX + appPathIdentifier + ": registering service...");
   EurekaHttpResponse<Void> httpResponse;
   try {
       httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
   } catch (Exception e) {
       logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
       throw e;
   }
   if (logger.isInfoEnabled()) {
       logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
   }
   return httpResponse.getStatusCode() == 204;
}

// AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
   String urlPath = "apps/" + info.getAppName();
   ClientResponse response = null;
   try {
       Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
       addExtraHeaders(resourceBuilder);
       response = resourceBuilder
               .header("Accept-Encoding", "gzip")
               .type(MediaType.APPLICATION_JSON_TYPE)
               .accept(MediaType.APPLICATION_JSON)
               .post(ClientResponse.class, info);
       return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
   } finally {
       if (logger.isDebugEnabled()) {
           logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                   response == null ? "N/A" : response.getStatus());
       }
       if (response != null) {
           response.close();
       }
   }
}
  • 调用 AbstractJerseyEurekaHttpClient#register(...) 方法,POST 请求 Eureka-Server 的 apps/${APP_NAME} 接口,参数为 InstanceInfo ,实现注册实例信息的注册。

3. Eureka-Server 接收注册

3.1 接收注册请求

com.netflix.eureka.resources.ApplicationResource,处理单个应用的请求操作的 Resource ( Controller )。

注册应用实例信息的请求,映射 ApplicationResource#addInstance() 方法,实现代码如下:

@Produces({"application/xml", "application/json"})
public class ApplicationResource {

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        // 校验参数是否合法
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // AWS 相关,跳过
        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        // 注册应用实例信息
        registry.register(info, "true".equals(isReplication));

        // 返回 204 成功
        return Response.status(204).build();  // 204 to be backwards compatible
    }

}
  • 请求头 isReplication 参数,和 Eureka-Server 集群复制相关,暂时跳过。
  • 调用 PeerAwareInstanceRegistryImpl#register(...) 方法,注册应用实例信息。实现代码如下: @Override public void register(final InstanceInfo info, final boolean isReplication) { // 租约过期时间 int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } // 注册应用实例信息 super.register(info, leaseDuration, isReplication); // Eureka-Server 复制 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
    • 调用父类 AbstractInstanceRegistry#register(...) 方法,注册应用实例信息。

3.2 Lease

在看具体的注册应用实例信息的逻辑之前,我们先来看下 com.netflix.eureka.lease.Lease,租约。实现代码如下:

public class Lease<T> {

    /**
     * 实体
     */
    private T holder;
    /**
     * 注册时间戳
     */
    private long registrationTimestamp;
    /**
     * 开始服务时间戳
     */
    private long serviceUpTimestamp;
    /**
     * 取消注册时间戳
     */
    private long evictionTimestamp;
    /**
     * 最后更新时间戳
     */
    // Make it volatile so that the expiration task would see this quicker
    private volatile long lastUpdateTimestamp;
    /**
     * 租约持续时长,单位:毫秒
     */
    private long duration;

    public Lease(T r, int durationInSecs) {
        holder = r;
        registrationTimestamp = System.currentTimeMillis();
        lastUpdateTimestamp = registrationTimestamp;
        duration = (durationInSecs * 1000);
    }

}
  • holder 属性,租约的持有者。在 Eureka-Server 里,暂时只有 InstanceInfo 使用。
  • registrationTimestamp 属性,注册( 创建 )租约时间戳。在构造方法里可以看租约对象的创建时间戳即为注册租约时间戳。
  • serviceUpTimestamp 属性,开始服务时间戳。注册应用实例信息会使用到它如下两个方法,实现代码如下: public void serviceUp() { if (serviceUpTimestamp == 0) { // 第一次有效 serviceUpTimestamp = System.currentTimeMillis(); } } public void setServiceUpTimestamp(long serviceUpTimestamp) { this.serviceUpTimestamp = serviceUpTimestamp; }
  • lastUpdatedTimestamp 属性,最后更新租约时间戳。每次续租时,更新该时间戳。注册应用实例信息会使用到它如下方法,实现代码如下: public void setLastUpdatedTimestamp() { this.lastUpdatedTimestamp = System.currentTimeMillis(); }
  • duration 属性,租约持续时间,单位:毫秒。当租约过久未续租,即当前时间 - lastUpdatedTimestamp > duration 时,租约过期。
  • evictionTimestamp 属性,租约过期时间戳。

3.3 注册应用实例信息

调用 AbstractInstanceRegistry#register(...) 方法,注册应用实例信息,实现代码如下:

  1: public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
  2:     try {
  3:         // 获取读锁
  4:         read.lock();
  5:         Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
  6:         // 增加 注册次数 到 监控
  7:         REGISTER.increment(isReplication);
  8:         // 获得 应用实例信息 对应的 租约
  9:         if (gMap == null) {
 10:             final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
 11:             gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // 添加 应用
 12:             if (gMap == null) { // 添加 应用 成功
 13:                 gMap = gNewMap;
 14:             }
 15:         }
 16:         Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
 17:         // Retain the last dirty timestamp without overwriting it, if there is already a lease
 18:         if (existingLease != null && (existingLease.getHolder() != null)) { // 已存在时,使用数据不一致的时间大的应用注册信息为有效的
 19:             Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // Server 注册的 InstanceInfo
 20:             Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // Client 请求的 InstanceInfo
 21:             logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
 22: 
 23:             // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
 24:             // InstanceInfo instead of the server local copy.
 25:             if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
 26:                 logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
 27:                         " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
 28:                 logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
 29:                 registrant = existingLease.getHolder();
 30:             }
 31:         } else {
 32:             // The lease does not exist and hence it is a new registration
 33:             // 【自我保护机制】增加 `numberOfRenewsPerMinThreshold` 、`expectedNumberOfRenewsPerMin`
 34:             synchronized (lock) {
 35:                 if (this.expectedNumberOfRenewsPerMin > 0) {
 36:                     // Since the client wants to cancel it, reduce the threshold
 37:                     // (1
 38:                     // for 30 seconds, 2 for a minute)
 39:                     this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
 40:                     this.numberOfRenewsPerMinThreshold =
 41:                             (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
 42:                 }
 43:             }
 44:             logger.debug("No previous lease information found; it is new registration");
 45:         }
 46:         // 创建 租约
 47:         Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
 48:         if (existingLease != null) { // 若租约已存在,设置 租约的开始服务的时间戳
 49:             lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
 50:         }
 51:         // 添加到 租约映射
 52:         gMap.put(registrant.getId(), lease);
 53:         // 添加到 最近注册的调试队列
 54:         synchronized (recentRegisteredQueue) {
 55:             recentRegisteredQueue.add(new Pair<Long, String>(
 56:                     System.currentTimeMillis(),
 57:                     registrant.getAppName() + "(" + registrant.getId() + ")"));
 58:         }
 59:         // 添加到 应用实例覆盖状态映射(Eureka-Server 初始化使用)
 60:         // This is where the initial state transfer of overridden status happens
 61:         if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
 62:             logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
 63:                             + "overrides", registrant.getOverriddenStatus(), registrant.getId());
 64:             if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
 65:                 logger.info("Not found overridden id {} and hence adding it", registrant.getId());
 66:                 overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
 67:             }
 68:         }
 69:         InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
 70:         if (overriddenStatusFromMap != null) {
 71:             logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
 72:             registrant.setOverriddenStatus(overriddenStatusFromMap);
 73:         }
 74: 
 75:         // 获得应用实例最终状态,并设置应用实例的状态
 76:         // Set the status based on the overridden status rules
 77:         InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
 78:         registrant.setStatusWithoutDirty(overriddenInstanceStatus);
 79: 
 80:         // 设置 租约的开始服务的时间戳(只有第一次有效)
 81:         // If the lease is registered with UP status, set lease service up timestamp
 82:         if (InstanceStatus.UP.equals(registrant.getStatus())) {
 83:             lease.serviceUp();
 84:         }
 85:         // 设置 应用实例信息的操作类型 为 添加
 86:         registrant.setActionType(ActionType.ADDED);
 87:         // 添加到 最近租约变更记录队列
 88:         recentlyChangedQueue.add(new RecentlyChangedItem(lease));
 89:         // 设置 租约的最后更新时间戳
 90:         registrant.setLastUpdatedTimestamp();
 91:         // 设置 响应缓存 过期
 92:         invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
 93:         logger.info("Registered instance {}/{} with status {} (replication={})",
 94:                 registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
 95:     } finally {
 96:         // 释放锁
 97:         read.unlock();
 98:     }
 99: }
  • 第 3 行 :添加到应用实例覆盖状态映射,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 详细解析。
  • 第 6 至 7 行 :增加注册次数到监控。配合 Netflix Servo 实现监控信息采集。
  • 第 5 至 16 行 :获得应用实例信息对应的租约registry 实现代码如下: /** * 租约映射 * key1 :应用名 {@link InstanceInfo#appName} * key2 :应用实例信息编号 {@link InstanceInfo#instanceId} * value :租约 */ private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
  • 第 17 至 30 行 :当租约已存在,判断 Server 已存在的 InstanceInfo 的 lastDirtyTimestamp 是否大于( 不包括等于 ) Client 请求的 InstanceInfo ,若是,使用 Server 的 InstanceInfo 进行替代
  • 第 31 至 44 行 :增加 numberOfRenewsPerMinThresholdexpectedNumberOfRenewsPerMin,自我保护机制相关,在 《Eureka 源码解析 —— 应用实例注册发现(四)之自我保护机制》 有详细解析。
  • 第 45 至 52 行 :创建租约,并添加到租约映射( registry )。
  • 第 53 至 58 行 :添加到最近注册的调试队列( recentRegisteredQueue ),用于 Eureka-Server 运维界面的显示,无实际业务逻辑使用。实现代码如下: /** * 最近注册的调试队列 * key :添加时的时间戳 * value :字符串 = 应用名(应用实例信息编号) */ private final CircularQueue<Pair<Long, String>> recentRegisteredQueue; /** * 循环队列 * * @param <E> 泛型 */ private class CircularQueue<E> extends ConcurrentLinkedQueue<E> { /** * 队列大小 */ private int size = 0; public CircularQueue(int size) { this.size = size; } @Override public boolean add(E e) { this.makeSpaceIfNotAvailable(); return super.add(e); } /** * 保证空间足够 * * 当空间不够时,移除首元素 */ private void makeSpaceIfNotAvailable() { if (this.size() == size) { this.remove(); } } public boolean offer(E e) { this.makeSpaceIfNotAvailable(); return super.offer(e); } }
  • 第 59 至 68 行 :添加到应用实例覆盖状态映射,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 详细解析。
  • 第 69 至 73 行 :设置应用实例的覆盖状态( overridestatus ),避免注册应用实例后,丢失覆盖状态。在《应用实例注册发现 (八)之覆盖状态》详细解析。
  • 第 75 至 78 行 : 获得应用实例最终状态,并设置应用实例的状态。在《应用实例注册发现 (八)之覆盖状态》详细解析。
  • 第 80 至 84 行 :设置租约的开始服务的时间戳( 只有第一次有效 )。
  • 第 85 至 88 行 :设置应用实例信息的操作类型为添加,并添加到最近租约变更记录队列( recentlyChangedQueue )。recentlyChangedQueue 用于注册信息的增量获取,在《应用实例注册发现 (七)之增量获取》详细解析。实现代码如下: /** * 最近租约变更记录队列 */ private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
  • 第 89 至 90 行 :设置租约的最后更新时间戳。
  • 第 91 至 92 行 :设置响应缓存( ResponseCache )过期,在《Eureka 源码解析 —— 应用实例注册发现 (六)之全量获取》详细解析。
  • 第 96 至 97 行 :释放锁。

原文发布于微信公众号 - 芋道源码(YunaiV)

原文发表时间:2018-04-23

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Android知识点总结

4-SII--☆Android缓存文件(带有效时长)封装

902
来自专栏别先生

Spark入门,概述,部署,以及学习(Spark是一种快速、通用、可扩展的大数据分析引擎)

1:Spark的官方网址:http://spark.apache.org/ 1:Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL...

4454
来自专栏wannshan(javaer,RPC)

dubbo监控机制之监控中心实现分析

这里的监控中心以dubbo-ops\dubbo-monitor-simple项目说 总的来说是监控中心启动一个sevlet容器,通过web页面向用户多维度的展...

5036
来自专栏技术墨客

Spring核心——Profile管理环境 原

在介绍Spring核心模块为运行环境管理提供的功能之前,咱们先得解释清楚“运行环境”是什么。

623
来自专栏JadePeng的技术博客

RPC框架原理与实现

RPC,全称 Remote Procedure Call(远程过程调用),即调用远程计算机上的服务,就像调用本地服务一样。那么RPC的原理是什么呢?了解一个技术...

5307
来自专栏酷玩时刻

支付宝支付-PC电脑网站支付

支付产品全面升级(更新时间:2017/05/05 ),若您使用的是老接口,请移步老版本即时到账文档。

905
来自专栏JadePeng的技术博客

XNginx - nginx 集群可视化管理工具

之前团队的nginx管理,都是运维同学每次去修改配置文件,然后重启,非常不方便,一直想找一个可以方便管理nginx集群的工具,翻遍web,未寻到可用之物,于是自...

1053
来自专栏Ryan Miao

在dropwizard中使用feign,使用hystrix

前言 用惯了spring全家桶之后,试试dropwizard的Hello World也别有一帆风味。为了增强对外访问API的能力,需要引入open feign...

36412
来自专栏纯洁的微笑

springboot(十一):Spring boot中mongodb的使用

mongodb是最早热门非关系数据库的之一,使用也比较普遍,一般会用做离线数据分析来使用,放到内网的居多。由于很多公司使用了云服务,服务器默认都开放了外网地址,...

2816
来自专栏一个会写诗的程序员的博客

Spring Boot 应用监控:Actuator与 AdminSpring Boot 应用监控:Actuator与 Admin

在企业级应用中,对系统进行运行状态监控通常是必不可少的。Spring Boot提供了 Actuator 模块实现应用的监控与管理,对应的起步依赖是spring-...

612

扫码关注云+社区