前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >eureka 原理_什么是swot分析方法

eureka 原理_什么是swot分析方法

作者头像
全栈程序员站长
发布2022-09-27 10:40:12
6430
发布2022-09-27 10:40:12
举报

大家好,又见面了,我是你们的朋友全君。

#总结 eureka内部实际上是分为三个实例的,分别是Client,Server和Instance, Eureka-Client启动,创建Instance实例,封装成对象,推送给server,server接收Instance对象,返回实例集合,并签订租约,client定时发动续租请求到server,server维护各个instance实例,服务之间调用不通过eureka。 #Eureka-Client client启动类实例顺序

ce4cfb65-fe72-474e-9e0b-295f3ec22e76.jpg
ce4cfb65-fe72-474e-9e0b-295f3ec22e76.jpg

EurekaInstanceConfigBean读取配置-> InstanceInfoFactory创建实例-> EurekaClientAutoConfiguration封装实例转换成-> ApplicationInfoManager 结合 EurekaClientConfigBean -> DiscoveryClient300行,调构造方法,并且发起注册

图解eureka-client创建
图解eureka-client创建

eureka原生Instance实例配置类

eureka原生Instance实例配置类
eureka原生Instance实例配置类

spring cloud 实现了这个接口也就是EurekaInstanceConfigBean读取配置文件,

spring-cloud自己实现了他的两种实例
spring-cloud自己实现了他的两种实例

与此同时 EurekaClientConfig spring cloud 提供的实现是EurekaClientConfigBean获取注册Instance的信息

EurekaClientConfigBean
EurekaClientConfigBean

配置初始化之后,EurekaClientAutoConfiguration(也是spring cloud 自己实现的一个类)会调用eurekaApplicationInfoManager来创建一个ApplicationInfoManager实例

生成ApplicationInfoManager实例
生成ApplicationInfoManager实例

创建完实例ApplicationInfoManager之后就到了client中最重要的类DiscoveryClient,这个类就是client向sever注册的直通车,他的构造方法内部

DiscoveryClient的构造方法
DiscoveryClient的构造方法

完成了一系列的操作,例如注册,启动心跳线程池,获取server端Instance实例集合。 接下来我们重点说说这个类: DiscoveryClient类的父类EurekaClient继承了LookupService接口,该接口定义了client从server获取实例的一系列方法

代码语言:javascript
复制
public interface LookupService<T> { 
   

    /** *通过appname获取实例 */
    Application getApplication(String appName);

    /** *获取实例列表,这个方法就是client端缓存服务数据的来源 */
    Applications getApplications();

    /** * 通过serverId获取实例 */
    List<InstanceInfo> getInstancesById(String id);

    /** * 从下一个eureka获取实例 */
    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}

启动client debug之,发现DiscoveryClient实现类里它里面存放了从server同步过来的Instance实例

server端同步过来的实例列表
server端同步过来的实例列表

他还实现了

eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法

DiscoveryClient的

eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法

方法,定时去刷server的Instance实例列表, DiscoveryClient的构造方法

eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法

会去把当前服务的实例注册上去,并启动定时任务

eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法

注册服务到server

eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法

这里的register实际上调用的是AbstractJerseyEurekaHttpClient的register

eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法

可以看到他实际上是给server发送了一个注册的请求,这样client端的启动就结束了。 #Eureka-Server sever启动首先就是用EurekaServerConfigBean,EurekaInstanceConfigBean,EurekaClientConfigBean 从配置文件读取配置,由于server端自己也是一个client,所以他也会走一遍client端的注册流程 配置完这些后,spring cloud 的 EurekaServerInitializerConfiguration会启动调用start方法,调用EurekaServerBootstrap的contextInitialized方法

eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法

执行完这个,server端就算是启动完成了。由于server端这两个方法比较复杂,并且对于我们理解原理没什么帮助,主需要理解他的字面意思就行。 #租约相关 LeaseManager这个接口定义了租约相关的操作,Lease类定义租约对象

代码语言:javascript
复制
public interface LeaseManager<T> { 
   
    //注册
    void register(T r, int leaseDuration, boolean isReplication);
    //注销
    boolean cancel(String appName, String id, boolean isReplication);
    //续租
    boolean renew(String appName, String id, boolean isReplication);
    //删除过期服务
    void evict();
   
}

spring cloud 的实现 InstanceRegistry这个类实现了租约相关的操作如注册,注销等等,接下来我们来谈谈注册 #注册

eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法

#Eureka-Client端 Eureka-Client 向 Eureka-Server 发起注册应用实例需要符合如下条件: 1.配置 eureka.registration.enabled = true,Eureka-Client 向 Eureka-Server 发起注册应用实例的开关。 2.InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致。 每次 InstanceInfo 发生属性变化时,标记 isInstanceInfoDirty 属性为 true,表示 InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致,需要注册。另外,InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册。 当符合条件时,InstanceInfo 不会立即向 Eureka-Server 注册,而是后台线程定时注册。 当 InstanceInfo 的状态( status ) 属性发生变化时,并且配置 eureka.shouldOnDemandUpdateStatusChange = true 时,立即向 Eureka-Server 注册。因为状态属性非常重要,一般情况下建议开启,当然默认情况也是开启的。

注册的方法在DiscoveryClient

eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法

会创建一个InstanceInfoReplicator应用实例复制器 定时检查 InstanceInfo 的状态( status ) 属性是否发生变化。若是,发起注册。实现代码如下:

代码语言:javascript
复制
// InstanceInfoReplicator.java
@Override
public void run() { 
   
   try { 
   
       // 刷新 应用实例信息
       discoveryClient.refreshInstanceInfo();
       // 判断 应用实例信息 是否数据不一致
       Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
       if (dirtyTimestamp != null) { 
   
           // 发起注册
           discoveryClient.register();
           // 设置 应用实例信息 数据一致
           instanceInfo.unsetIsDirty(dirtyTimestamp);
       }
   } catch (Throwable t) { 
   
       logger.warn("There was a problem with the instance info replicator", t);
   } finally { 
   
       // 提交任务,并设置该任务的 Future
       Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
       scheduledPeriodicRef.set(next);
   }
}

// InstanceInfo.java
public synchronized long setIsDirtyWithTime() { 
   
   setIsDirty();
   return lastDirtyTimestamp;
}

public synchronized void unsetIsDirty(long unsetDirtyTimestamp) { 
   
   if (lastDirtyTimestamp <= unsetDirtyTimestamp) { 
   
       isInstanceInfoDirty = false;
   } else { 
   
   }
}

如果发现状态变化 会调用DiscoveryClient的

eureka 原理_什么是swot分析方法
eureka 原理_什么是swot分析方法

默认调用AbstractJerseyEurekaHttpClient的注册方法,发送请求给server发起注册

代码语言:javascript
复制
@Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) { 
   
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try { 
   
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally { 
   
            if (logger.isDebugEnabled()) { 
   
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) { 
   
                response.close();
            }
        }
    }

并且注册完成后,server会返回当前注册到eureka的实例的集合,client会把他缓存在自己的内存中,定时刷新 #eureka-server端 ApplicationResource 类用来接收client端发来的请求

代码语言:javascript
复制
@Produces({ 
   "application/xml", "application/json"})
public class ApplicationResource { 
   

    @POST
    @Consumes({ 
   "application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { 
   
        // 校验参数是否合法
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) { 
   
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) { 
   
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) { 
   
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) { 
   
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) { 
   
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) { 
   
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) { 
   
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // AWS 相关,跳过
        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) { 
   
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) { 
   
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) { 
   
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) { 
   
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) { 
   
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else { 
   
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        // 注册应用实例信息
        registry.register(info, "true".equals(isReplication));

        // 返回 204 成功
        return Response.status(204).build();  // 204 to be backwards compatible
    }

}

最终调用到PeerAwareInstanceRegistryImpl的register方法

代码语言:javascript
复制
@Override
public void register(final InstanceInfo info, final boolean isReplication) { 
   
   // 租约过期时间
   int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
   if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { 
   
       leaseDuration = info.getLeaseInfo().getDurationInSecs();
   }
   // 注册应用实例信息
   super.register(info, leaseDuration, isReplication);
   // Eureka-Server 复制
   replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

实现注册需要用到租约的Lease是定义租约的实例

代码语言:javascript
复制
public class Lease<T> { 
   

    /** * 实体 */
    private T holder;
    /** * 注册时间戳 */
    private long registrationTimestamp;
    /** * 开始服务时间戳 */
    private long serviceUpTimestamp;
    /** * 取消注册时间戳 */
    private long evictionTimestamp;
    /** * 最后更新时间戳 */
    // Make it volatile so that the expiration task would see this quicker
    private volatile long lastUpdateTimestamp;
    /** * 租约持续时长,单位:毫秒 */
    private long duration;

    public Lease(T r, int durationInSecs) { 
   
        holder = r;
        registrationTimestamp = System.currentTimeMillis();
        lastUpdateTimestamp = registrationTimestamp;
        duration = (durationInSecs * 1000);
    }
    
}

最终AbstractInstanceRegistry的register长这样

代码语言:javascript
复制
 1:  public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { 
   
 2:     try { 
   
 3:         // 获取读锁
 4:         read.lock();
 5:         Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
 6:         // 增加 注册次数 到 监控
 7:         REGISTER.increment(isReplication);
 8:         // 获得 应用实例信息 对应的 租约
 9:         if (gMap == null) { 
   
10:             final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
11:             gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // 添加 应用
12:             if (gMap == null) { 
    // 添加 应用 成功
13:                 gMap = gNewMap;
14:             }
15:         }
16:         Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
17:         // Retain the last dirty timestamp without overwriting it, if there is already a lease
18:         if (existingLease != null && (existingLease.getHolder() != null)) { 
    // 已存在时,使用数据不一致的时间大的应用注册信息为有效的
19:             Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // Server 注册的 InstanceInfo
20:             Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // Client 请求的 InstanceInfo
21:             logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
22: 
23:             // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
24:             // InstanceInfo instead of the server local copy.
25:             if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { 
   
26:                 logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
27:                         " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
28:                 logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
29:                 registrant = existingLease.getHolder();
30:             }
31:         } else { 
   
32:             // The lease does not exist and hence it is a new registration
33:             // 【自我保护机制】增加 `numberOfRenewsPerMinThreshold` 、`expectedNumberOfRenewsPerMin`
34:             synchronized (lock) { 
   
35:                 if (this.expectedNumberOfRenewsPerMin > 0) { 
   
36:                     // Since the client wants to cancel it, reduce the threshold
37:                     // (1
38:                     // for 30 seconds, 2 for a minute)
39:                     this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
40:                     this.numberOfRenewsPerMinThreshold =
41:                             (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
42:                 }
43:             }
44:             logger.debug("No previous lease information found; it is new registration");
45:         }
46:         // 创建 租约
47:         Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
48:         if (existingLease != null) { 
    // 若租约已存在,设置 租约的开始服务的时间戳
49:             lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
50:         }
51:         // 添加到 租约映射
52:         gMap.put(registrant.getId(), lease);
53:         // 添加到 最近注册的调试队列
54:         synchronized (recentRegisteredQueue) { 
   
55:             recentRegisteredQueue.add(new Pair<Long, String>(
56:                     System.currentTimeMillis(),
57:                     registrant.getAppName() + "(" + registrant.getId() + ")"));
58:         }
59:         // 添加到 应用实例覆盖状态映射(Eureka-Server 初始化使用)
60:         // This is where the initial state transfer of overridden status happens
61:         if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { 
   
62:             logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
63:                             + "overrides", registrant.getOverriddenStatus(), registrant.getId());
64:             if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { 
   
65:                 logger.info("Not found overridden id {} and hence adding it", registrant.getId());
66:                 overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
67:             }
68:         }
69:         InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
70:         if (overriddenStatusFromMap != null) { 
   
71:             logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
72:             registrant.setOverriddenStatus(overriddenStatusFromMap);
73:         }
74: 
75:         // 获得应用实例最终状态,并设置应用实例的状态
76:         // Set the status based on the overridden status rules
77:         InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
78:         registrant.setStatusWithoutDirty(overriddenInstanceStatus);
79: 
80:         // 设置 租约的开始服务的时间戳(只有第一次有效)
81:         // If the lease is registered with UP status, set lease service up timestamp
82:         if (InstanceStatus.UP.equals(registrant.getStatus())) { 
   
83:             lease.serviceUp();
84:         }
85:         // 设置 应用实例信息的操作类型 为 添加
86:         registrant.setActionType(ActionType.ADDED);
87:         // 添加到 最近租约变更记录队列
88:         recentlyChangedQueue.add(new RecentlyChangedItem(lease));
89:         // 设置 租约的最后更新时间戳
90:         registrant.setLastUpdatedTimestamp();
91:         // 设置 响应缓存 过期
92:         invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
93:         logger.info("Registered instance {}/{} with status {} (replication={})",
94:                 registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
95:     } finally { 
   
96:         // 释放锁
97:         read.unlock();
98:     }
99: }

#续租 注册了租约以后 Eureka-Client 固定间隔向 Eureka-Server 发起续租( renew ),避免租约过期。默认情况下,租约有效期为 90 秒,续租频率为 30 秒。两者比例为 1 : 3 ,保证在网络异常等情况下,有三次重试的机会。 client端的续租任务

代码语言:javascript
复制
private void initScheduledTasks() { 
   

    // 向 Eureka-Server 心跳(续租)执行器
    if (clientConfig.shouldRegisterWithEureka()) { 
   
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 续租频率
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer 这个heart线程就是发送续租请求的
        scheduler.schedule(
               new TimedSupervisorTask(
                       "heartbeat",
                       scheduler,
                       heartbeatExecutor,
                       renewalIntervalInSecs,
                       TimeUnit.SECONDS,
                       expBackOffBound,
                       new HeartbeatThread()
               ),
               renewalIntervalInSecs, TimeUnit.SECONDS);
               
          // ... 省略无关代码
     }
     // ... 省略无关代码
}

这个定时任务在DiscoveryClient的构造方法中启动按照配置定时向server发送续租请求。 调用这个线程run方法

代码语言:javascript
复制
// DiscoveryClient.java
/** * 最后成功向 Eureka-Server 心跳时间戳 */
private volatile long lastSuccessfulHeartbeatTimestamp = -1;

private class HeartbeatThread implements Runnable { 
   

   public void run() { 
   
       if (renew()) { 
   
           lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
       }
   }
}

renew方法

代码语言:javascript
复制
// DiscoveryClient.java
boolean renew() { 
   
   EurekaHttpResponse<InstanceInfo> httpResponse;
   try { 
   
       httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
       logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
       if (httpResponse.getStatusCode() == 404) { 
   
           REREGISTER_COUNTER.increment();
           logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
           long timestamp = instanceInfo.setIsDirtyWithTime();
           // 发起注册
           boolean success = register();
           if (success) { 
   
               instanceInfo.unsetIsDirty(timestamp);
           }
           return success;
       }
       return httpResponse.getStatusCode() == 200;
   } catch (Throwable e) { 
   
       logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
       return false;
   }
}

sendHeartBeat

代码语言:javascript
复制
// AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { 
   
   String urlPath = "apps/" + appName + '/' + id;
   ClientResponse response = null;
   try { 
   
       WebResource webResource = jerseyClient.resource(serviceUrl)
               .path(urlPath)
               .queryParam("status", info.getStatus().toString())
               .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
       if (overriddenStatus != null) { 
   
           webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
       }
       Builder requestBuilder = webResource.getRequestBuilder();
       addExtraHeaders(requestBuilder);
       response = requestBuilder.put(ClientResponse.class);
       EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
       if (response.hasEntity()) { 
   
           eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
       }
       return eurekaResponseBuilder.build();
   } finally { 
   
       if (logger.isDebugEnabled()) { 
   
           logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
       }
       if (response != null) { 
   
           response.close();
       }
   }
}

#Eureka-Server 接收续租 接收续租调用的是InstanceResource的renewLease方法

代码语言:javascript
复制
@PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { 
   
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) { 
   
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) { 
   
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) { 
   
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else { 
   
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
        return response;
    }

这里调用了LeaseManager#renew方法这个接口在spring cloud里的实现是InstanceRegistry 最终调用PeerAwareInstanceRegistryImpl#renew方法

代码语言:javascript
复制
// PeerAwareInstanceRegistryImpl.java
public boolean renew(final String appName, final String id, final boolean isReplication) { 
   
   if (super.renew(appName, id, isReplication)) { 
    // 续租
       // Eureka-Server 复制
       replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
       return true;
   }
   return false;
}

这里继续往上调AbstractInstanceRegistry的renew方法

代码语言:javascript
复制
public boolean renew(String appName, String id, boolean isReplication) { 
   
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) { 
   
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) { 
   
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else { 
   
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) { 
   
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { 
   
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { 
   
                    Object[] args = { 
   
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatus(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }
    }

#下线 Eureka-Client 发起下线,需要满足如下条件才可发起: 配置 eureka.registration.enabled = true ,应用实例开启注册开关。默认为 false 。 配置 eureka.shouldUnregisterOnShutdown = true ,应用实例开启关闭时下线开关。默认为 true 。 调用的是DiscoveryClient

代码语言:javascript
复制
// DiscoveryClient.java
public synchronized void shutdown() { 
   

    // ... 省略无关代码

    // If APPINFO was registered
    if (applicationInfoManager != null
         && clientConfig.shouldRegisterWithEureka() // eureka.registration.enabled = true
         && clientConfig.shouldUnregisterOnShutdown()) { 
    // eureka.shouldUnregisterOnShutdown = true
        applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
        unregister();//这里设置实例的status为DOWN
    }
}

Eureka-Server 接收下线,InstanceResource#cancelLease

代码语言:javascript
复制
@DELETE
public Response cancelLease(
       @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { 
   
   // 下线
   boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication));

   if (isSuccess) { 
    // 下线成功
       logger.debug("Found (Cancel): " + app.getName() + " - " + id);
       return Response.ok().build();
   } else { 
    // 下线成功
       logger.info("Not Found (Cancel): " + app.getName() + " - " + id);
       return Response.status(Status.NOT_FOUND).build();
   }
}

最终调用到AbstractInstanceRegistry的internalCancel方法

代码语言:javascript
复制
 protected boolean internalCancel(String appName, String id, boolean isReplication) { 
   
        try { 
   
            read.lock();
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) { 
   
                leaseToCancel = gMap.remove(id);
            }
            synchronized (recentCanceledQueue) { 
   
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) { 
   
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) { 
   
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else { 
   
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) { 
   
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                return true;
            }
        } finally { 
   
            read.unlock();
        }
    }

#过期 正常情况下,应用实例下线时候会主动向 Eureka-Server 发起下线请求。但实际情况下,应用实例可能异常崩溃,又或者是网络异常等原因,导致下线请求无法被成功提交。 介于这种情况,通过 Eureka-Client 心跳延长租约,配合 Eureka-Server 清理超时的租约解决上述异常。 AbstractInstanceRegistry.EvictionTask在eureka-server启动时初始化定时任务

代码语言:javascript
复制
// AbstractInstanceRegistry.java
/** * 清理租约过期任务 */
private final AtomicReference<EvictionTask> evictionTaskRef = new AtomicReference<EvictionTask>();

protected void postInit() { 
   
   // .... 省略无关代码

   // 初始化 清理租约过期任务
   if (evictionTaskRef.get() != null) { 
   
       evictionTaskRef.get().cancel();
   }
   evictionTaskRef.set(new EvictionTask());
   evictionTimer.schedule(evictionTaskRef.get(),
           serverConfig.getEvictionIntervalTimerInMs(),
           serverConfig.getEvictionIntervalTimerInMs());
}

配置 eureka.evictionIntervalTimerInMs ,清理租约过期任务执行频率,单位:毫秒。默认,60000 毫秒。 eureka启动流程大体上就是这样,如有补充欢迎指正。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/179149.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档