上期我们大概得了解了Eurake向Spring上下文中注册EurekaServiceRegistry和EurekaRegistration的详细过程,其中总调度类EurekaAutoServiceRegistration还专门采用lifeCycle的方式实现。昨天的分析中我们指出EurekaServiceRegistry是分析的重点,因此今天我们就重点突破一下这块的具体逻辑。
public void register(EurekaRegistration reg) {
this.maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
//打印服务的状态,UP表示正常启动
log.info("Registering application " + reg.getInstanceConfig().getAppname() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
}
//设置启动状态
reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
if (reg.getHealthCheckHandler() != null) {
reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
}
}
我们原来以为EurekaServiceRegistry有大量的动作,但是发现这个类中什么也没有。等于与一个壳子,重点全是EurekaRegistration既然如此我们就重新分析EurekaRegistration吧。
发现这里有两个Eurake的客户端,但是cloudEurekaClient并没有怎么用唉,好像这里的eurekaClient才是真的老大。现在这里注入进来了,我们看看它是如何初始化的。这块我们直接DiscoveryClient的实现。
在DiscoveryClient的构造方法中,有几个线程相关的,分别是定时任务处理器、心跳线程池、缓存线程池。我们做过SpringCloud的同志都知道服务注册是通过定时任务去拉取服务信息,通过心态检测是否有服务宕机的。除此之外如果注册中心宕机了也会采用缓存。
private void initScheduledTasks() {
int renewalIntervalInSecs;
int expBackOffBound;
//判断是否拉取配置,默认为true
if (this.clientConfig.shouldFetchRegistry()) {
//多久拿一次,默认为30秒
renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
//超时容许时间的倍数,和获取心跳或者服务信息失败的时间差的扩容有关系 默认10秒
expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//这里采用了代理模式,都是runnable接口哦,比较绕
//启动刷新缓存的线程,也就是获取服务信息,全部打偶放到了定时任务中。
this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs,
TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
}
//判断是否向eureka注册 默认为true
if (this.clientConfig.shouldRegisterWithEureka()) {
//多久获取一次服务信息 默认30秒
renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
//超时容许时间的倍数,和获取心跳或者服务信息失败的时间差的扩容有关系 默认10秒
expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: renew interval is: " + renewalIntervalInSecs);
//启动心跳线程
this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
//定义一个监听器,用来看获取定时任务还在吗
this.statusChangeListener = new StatusChangeListener() {
public String getId() {
return "statusChangeListener";
}
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {
DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
} else {
DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);
}
DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
}
};
if (this.clientConfig.shouldOnDemandUpdateStatusChange()) {
this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);
}
//启动监听
this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
上述代码只要用来获取服务的注册和发现。也对上述功能的线程进行了监听。上边的代理模式,我们打开看看TimedSupervisorTask
public void run() {
Future future = null;
try {
//这里task是注册刷新的任务
future = this.executor.submit(this.task);
//获取活动线程的个数
this.threadPoolLevelGauge.set((long)this.executor.getActiveCount());
//使用future任务类型,这里获取拿到的服务信息
future.get(this.timeoutMillis, TimeUnit.MILLISECONDS);
this.delay.set(this.timeoutMillis);
this.threadPoolLevelGauge.set((long)this.executor.getActiveCount());
} catch (TimeoutException var12) {
//如果超时了,这里还会将当前的时间间隔扩大一倍
logger.error("task supervisor timed out", var12);
this.timeoutCounter.increment();
long currentDelay = this.delay.get();
long newDelay = Math.min(this.maxDelay, currentDelay * 2L);
this.delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException var13) {
if (!this.executor.isShutdown() && !this.scheduler.isShutdown()) {
logger.error("task supervisor rejected the task", var13);
} else {
logger.warn("task supervisor shutting down, reject the task", var13);
}
this.rejectedCounter.increment();
} catch (Throwable var14) {
//如果异常服务处理,就直接中断了
if (!this.executor.isShutdown() && !this.scheduler.isShutdown()) {
logger.error("task supervisor threw an exception", var14);
} else {
logger.warn("task supervisor shutting down, can't accept the task");
}
this.throwableCounter.increment();
} finally {
if (future != null) {
future.cancel(true);
}
if (!this.scheduler.isShutdown()) {
//如果没有启动的话就重新创建一个
this.scheduler.schedule(this, this.delay.get(), TimeUnit.MILLISECONDS);
}
}
}
@VisibleForTesting
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = this.isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
//先会判断这个值是否为空,如果为空话就从这里获取配置
String latestRemoteRegions = this.clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = (String)this.remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
synchronized(this.instanceRegionChecker.getAzToRegionMapper()) {
if (this.remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
//设置最新的服务地址,用来获取服务配置
String[] remoteRegions = latestRemoteRegions.split(",");
this.remoteRegionsRef.set(remoteRegions);
this.instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently, ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
this.instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
//查看从eureka服务上获取配置是否成功, 真正的获取服务信息
boolean success = this.fetchRegistry(remoteRegionsModified);
if (success) {
this.registrySize = ((Applications)this.localRegionApps.get()).size();
this.lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
if (logger.isDebugEnabled()) {
StringBuilder allAppsHashCodes = new StringBuilder();
allAppsHashCodes.append("Local region apps hashcode: ");
allAppsHashCodes.append(((Applications)this.localRegionApps.get()).getAppsHashCode());
allAppsHashCodes.append(", is fetching remote regions? ");
allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
Iterator var11 = this.remoteRegionVsApps.entrySet().iterator();
while(var11.hasNext()) {
Entry<String, Applications> entry = (Entry)var11.next();
allAppsHashCodes.append(", Remote region: ");
allAppsHashCodes.append((String)entry.getKey());
allAppsHashCodes.append(" , apps hashcode: ");
allAppsHashCodes.append(((Applications)entry.getValue()).getAppsHashCode());
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes.toString());
}
} catch (Throwable var9) {
logger.error("Cannot fetch registry from server", var9);
}
}
在从Eureka服务器上获取服务所有信息的时候代码是这样写的
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = this.fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
//这块就是从拉取服务所有信息的
EurekaHttpResponse<Applications> httpResponse =
this.clientConfig.getRegistryRefreshSingleVipAddress() == null ?
this.eurekaTransport.queryClient.getApplications((String[])this.remoteRegionsRef.get()) : this.eurekaTransport.queryClient.getVip(this.clientConfig.getRegistryRefreshSingleVipAddress(), (String[])this.remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
//处理成实体类型
apps = (Applications)httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (this.fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1L)) {
this.localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
看到这里,我也蒙了。本来一串也走下来了,但是让我疑惑的是我的serviceurl在哪里设置进去的?找了一圈remoteRegionsRef也没找到。fetch-remote-regions-registry这个配置是找到了,当我把这两个都配置上的时候就报错了。但是用Idea看的时候确实没有serviceUrl的踪迹。所以我是觉得是反编译的问题?有知道的朋友可以给我留言哦!谢谢