前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringCloud源码学习(二) 面试官问我Eurake服务注册的实现细节?

SpringCloud源码学习(二) 面试官问我Eurake服务注册的实现细节?

作者头像
写一点笔记
发布2020-11-02 11:39:58
5690
发布2020-11-02 11:39:58
举报
文章被收录于专栏:程序员备忘录程序员备忘录

上期我们大概得了解了Eurake向Spring上下文中注册EurekaServiceRegistry和EurekaRegistration的详细过程,其中总调度类EurekaAutoServiceRegistration还专门采用lifeCycle的方式实现。昨天的分析中我们指出EurekaServiceRegistry是分析的重点,因此今天我们就重点突破一下这块的具体逻辑。

代码语言:javascript
复制
public void register(EurekaRegistration reg) {
    this.maybeInitializeClient(reg);
    if (log.isInfoEnabled()) {
    //打印服务的状态,UP表示正常启动
        log.info("Registering application " + reg.getInstanceConfig().getAppname() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
    }
    //设置启动状态
    reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
    if (reg.getHealthCheckHandler() != null) {
        reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
    }
}

我们原来以为EurekaServiceRegistry有大量的动作,但是发现这个类中什么也没有。等于与一个壳子,重点全是EurekaRegistration既然如此我们就重新分析EurekaRegistration吧。

发现这里有两个Eurake的客户端,但是cloudEurekaClient并没有怎么用唉,好像这里的eurekaClient才是真的老大。现在这里注入进来了,我们看看它是如何初始化的。这块我们直接DiscoveryClient的实现。

在DiscoveryClient的构造方法中,有几个线程相关的,分别是定时任务处理器、心跳线程池、缓存线程池。我们做过SpringCloud的同志都知道服务注册是通过定时任务去拉取服务信息,通过心态检测是否有服务宕机的。除此之外如果注册中心宕机了也会采用缓存。

代码语言:javascript
复制
private void initScheduledTasks() {
    int renewalIntervalInSecs;
    int expBackOffBound;
    //判断是否拉取配置,默认为true
    if (this.clientConfig.shouldFetchRegistry()) {
    //多久拿一次,默认为30秒
        renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
        //超时容许时间的倍数,和获取心跳或者服务信息失败的时间差的扩容有关系 默认10秒
        expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        //这里采用了代理模式,都是runnable接口哦,比较绕
        //启动刷新缓存的线程,也就是获取服务信息,全部打偶放到了定时任务中。
        this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, 
        TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
    }
//判断是否向eureka注册 默认为true
    if (this.clientConfig.shouldRegisterWithEureka()) {
    //多久获取一次服务信息 默认30秒
        renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
         //超时容许时间的倍数,和获取心跳或者服务信息失败的时间差的扩容有关系 默认10秒
        expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: renew interval is: " + renewalIntervalInSecs);
        //启动心跳线程
        this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
        this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
        //定义一个监听器,用来看获取定时任务还在吗
        this.statusChangeListener = new StatusChangeListener() {
            public String getId() {
                return "statusChangeListener";
            }
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {
                    DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
                } else {
                    DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);
                }
                DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
            }
        };
        if (this.clientConfig.shouldOnDemandUpdateStatusChange()) {
            this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);
        }
        //启动监听
        this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }

}

上述代码只要用来获取服务的注册和发现。也对上述功能的线程进行了监听。上边的代理模式,我们打开看看TimedSupervisorTask

代码语言:javascript
复制
public void run() {
    Future future = null;
    try {
    //这里task是注册刷新的任务 
        future = this.executor.submit(this.task);
        //获取活动线程的个数
        this.threadPoolLevelGauge.set((long)this.executor.getActiveCount());
        //使用future任务类型,这里获取拿到的服务信息
        future.get(this.timeoutMillis, TimeUnit.MILLISECONDS);
        this.delay.set(this.timeoutMillis);
        this.threadPoolLevelGauge.set((long)this.executor.getActiveCount());
    } catch (TimeoutException var12) {
    //如果超时了,这里还会将当前的时间间隔扩大一倍
        logger.error("task supervisor timed out", var12);
        this.timeoutCounter.increment();
        long currentDelay = this.delay.get();
        long newDelay = Math.min(this.maxDelay, currentDelay * 2L);
        this.delay.compareAndSet(currentDelay, newDelay);
    } catch (RejectedExecutionException var13) {
        if (!this.executor.isShutdown() && !this.scheduler.isShutdown()) {
            logger.error("task supervisor rejected the task", var13);
        } else {
            logger.warn("task supervisor shutting down, reject the task", var13);
        }
        this.rejectedCounter.increment();
    } catch (Throwable var14) {
    //如果异常服务处理,就直接中断了
        if (!this.executor.isShutdown() && !this.scheduler.isShutdown()) {
            logger.error("task supervisor threw an exception", var14);
        } else {
            logger.warn("task supervisor shutting down, can't accept the task");
        }
        this.throwableCounter.increment();
    } finally {
        if (future != null) {
            future.cancel(true);
        }
        if (!this.scheduler.isShutdown()) {
        //如果没有启动的话就重新创建一个
            this.scheduler.schedule(this, this.delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}
代码语言:javascript
复制
    @VisibleForTesting
    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries = this.isFetchingRemoteRegionRegistries();
            boolean remoteRegionsModified = false;
            //先会判断这个值是否为空,如果为空话就从这里获取配置
            String latestRemoteRegions = this.clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
                String currentRemoteRegions = (String)this.remoteRegionsToFetch.get();
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                    synchronized(this.instanceRegionChecker.getAzToRegionMapper()) {
                        if (this.remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                            //设置最新的服务地址,用来获取服务配置
                            String[] remoteRegions = latestRemoteRegions.split(",");
                            this.remoteRegionsRef.set(remoteRegions);
                            this.instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                            remoteRegionsModified = true;
                        } else {
                            logger.info("Remote regions to fetch modified concurrently, ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                        }
                    }
                } else {
                    this.instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                }
            }
           //查看从eureka服务上获取配置是否成功, 真正的获取服务信息
            boolean success = this.fetchRegistry(remoteRegionsModified);
            if (success) {
                this.registrySize = ((Applications)this.localRegionApps.get()).size();
                this.lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
            if (logger.isDebugEnabled()) {
                StringBuilder allAppsHashCodes = new StringBuilder();
                allAppsHashCodes.append("Local region apps hashcode: ");
                allAppsHashCodes.append(((Applications)this.localRegionApps.get()).getAppsHashCode());
                allAppsHashCodes.append(", is fetching remote regions? ");
                allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                Iterator var11 = this.remoteRegionVsApps.entrySet().iterator();
                while(var11.hasNext()) {
                    Entry<String, Applications> entry = (Entry)var11.next();
                    allAppsHashCodes.append(", Remote region: ");
                    allAppsHashCodes.append((String)entry.getKey());
                    allAppsHashCodes.append(" , apps hashcode: ");
                    allAppsHashCodes.append(((Applications)entry.getValue()).getAppsHashCode());
                }
                logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes.toString());
            }
        } catch (Throwable var9) {
            logger.error("Cannot fetch registry from server", var9);
        }
    }

在从Eureka服务器上获取服务所有信息的时候代码是这样写的

代码语言:javascript
复制
    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = this.fetchRegistryGeneration.get();
        logger.info("Getting all instance registry info from the eureka server");
        Applications apps = null;
        //这块就是从拉取服务所有信息的
        EurekaHttpResponse<Applications> httpResponse = 
        this.clientConfig.getRegistryRefreshSingleVipAddress() == null ?
        this.eurekaTransport.queryClient.getApplications((String[])this.remoteRegionsRef.get()) : this.eurekaTransport.queryClient.getVip(this.clientConfig.getRegistryRefreshSingleVipAddress(), (String[])this.remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            //处理成实体类型
            apps = (Applications)httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (this.fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1L)) {
            this.localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }

看到这里,我也蒙了。本来一串也走下来了,但是让我疑惑的是我的serviceurl在哪里设置进去的?找了一圈remoteRegionsRef也没找到。fetch-remote-regions-registry这个配置是找到了,当我把这两个都配置上的时候就报错了。但是用Idea看的时候确实没有serviceUrl的踪迹。所以我是觉得是反编译的问题?有知道的朋友可以给我留言哦!谢谢

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-10-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员备忘录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档