前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringCloud源码:客户端分析(二)- 客户端源码分析

SpringCloud源码:客户端分析(二)- 客户端源码分析

原创
作者头像
后台技术汇
发布2024-10-03 09:41:45
1160
发布2024-10-03 09:41:45
举报
文章被收录于专栏:后台技术汇

背景

我们继续分析EurekaClient的两个自动化配置类:

自动化配置类

功能

职责

EurekaClientAutoConfiguration

配置EurekaClient

确保了Eureka客户端能够正确地:- 注册到Eureka服务端- 周期性地发送心跳信息来更新服务租约- 下线时通知Eureka服务端- 获取服务实例列表;更侧重于Eureka客户端的基本配置和功能实现

EurekaDiscoveryClientConfiguration

配置EurekaDiscoveryClient

创建RefreshScopeRefreshedEvent事件的监听类,用于重启注册;更多地涉及到服务的自动注册、健康检查以及事件处理等方面

CloudEurekaClient分析

原理

客户端本质就是4个动作:

  1. 获取服务列表
  2. 注册服务实例
  3. 租约续约
  4. 取消租约

源码

让我们继续关注 第一个自动装配类 EurekaClientAutoConfiguration 对CloudEurekaClient 的构造封装,即如下代码块:

代码语言:txt
复制

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
      search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
      EurekaClientConfig config) {
   return new CloudEurekaClient(manager, config, this.optionalArgs,
         this.context);
}

分析代码:

  • CloudEurekaClient对象,并交给容器管理bean

CloudEurekaClient   

代码语言:txt
复制

public class CloudEurekaClient extends DiscoveryClient {
    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
          EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
          ApplicationEventPublisher publisher) {
       super(applicationInfoManager, config, args);
       this.applicationInfoManager = applicationInfoManager;
       this.publisher = publisher;
       this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
             "eurekaTransport");
       ReflectionUtils.makeAccessible(this.eurekaTransportField);
    }
}

分析代码:

  • 实际上CloudEurekaClient调用了父类DiscoveryClient的构造器

DiscoveryClient

经历了多个重载构造器的嵌套,我们进入了最终的构造器:

代码语言:txt
复制

private final ScheduledExecutorService scheduler;
// additional executors for supervised subtasks
private final ThreadPoolExecutor heartbeatExecutor;
private final ThreadPoolExecutor cacheRefreshExecutor;

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    // .... 一些初始化工作

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    // .......

    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    initScheduledTasks();

    // ...其他初始化工作
}
    

代码分析:

  • 这里初始化了3个异步线程池:scheduler、heartbeatExecutor、cacheRefreshExecutor
    • scheduler:coreSize=2的周期任务线程池,线程名命名是DiscoveryClient-%s
    • heartbeatExecutor:coreSize=1的异步线程池,线程名命名是DiscoveryClient-HeartbeatExecutor-%d
    • cacheRefreshExecutor:coreSize=1的异步线程池,线程名命名是DiscoveryClient-CacheRefreshExecutor-%d
  • 这三个线程池,是怎么配合工作的呢?不着急,慢慢往下看

initScheduledTasks()的代码如下:
代码语言:txt
复制
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        );
        // 【1】
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat timer
        heartbeatTask = new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        );

        // 【2】
        scheduler.schedule(
                heartbeatTask,
                renewalIntervalInSecs, TimeUnit.SECONDS);

    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

分析代码:

  • 【1】检查是否需要获取注册表信息(配置项eureka.client.fetchRegistry默认=true)
    • 用注入的异步线程池cacheRefreshExecutor,按指定时间间隔registryFetchIntervalSeconds,去执行CacheRefreshThread,即缓存刷新refreshRegistry()任务
    • 缓存刷新任务cacheRefreshTask
    • 使用调度器 scheduler 安排任务
  • 【2】检查是否需要注册入Eureka(配置项eureka.client.registerWithEureka默认=true)
    • 用注入的异步线程池heartbeatExecutor,按指定时间间隔renewalIntervalInSecs,去执行HeartbeatThread,即执行续租renew()任务
    • 心跳续租任务heartbeatTask
    • 使用调度器 scheduler 安排任务

CacheRefreshThread - 缓存刷新
代码语言:txt
复制
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

@VisibleForTesting
void refreshRegistry() {
    try {
        //.....

        //【1】刷新本地注册服务列表
        boolean success = fetchRegistry(remoteRegionsModified);
        //.....
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    try {
        // 【2】获取本地localRegionApps的服务列表
        Applications applications = getApplications();

        // 【3】获取远程数据并更新服务列表
        getAndUpdateDelta(applications);
    }

    // registry was fetched successfully, so return true
    return true;
}

private void getAndUpdateDelta(Applications applications) throws Throwable {
    // .....

    //【4】检查缓存delta的服务注册列表
    Applications delta = null;
    if (delta == null) {
        // 【4.1】如果缓存为空,就再去拉取一次EurekaServer的数据
        getAndStoreFullRegistry();
    } else {
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                //【5】获取EurekaServer最新的服务注册表,并执行delta更新
                getAndStoreFullRegistry()
                updateDelta(delta);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        }    
    }
} 

private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications apps = null;
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
}

private void updateDelta(Applications delta) {
    int deltaCount = 0;
    //【6】遍历服务注册列表的每个app
    for (Application app : delta.getRegisteredApplications()) {
        //【7】遍历每个服务的所有实例instance
        for (InstanceInfo instance : app.getInstances()) {
            //【8】获取本地cache的服务注册信息
            Applications applications = getApplications();
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {
                    remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }

            ++deltaCount;
            //【9】如果实例是新增的类型
            if (ActionType.ADDED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    //【10】执行实例添加
                    applications.addApplication(app);
                }
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } 
            //【11】如果实例是修改的类型
            else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    //【12】没有已有实例,执行添加操作
                    applications.addApplication(app);
                }
                //【13】存在已有实例,则注册新的实例信息
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } 
            //【14】如果实例是删除的类型
            else if (ActionType.DELETED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {
                    //【15】删除这个服务的实例
                    existingApp.removeInstance(instance);
                    //【16】如果这个服务的实例数量=0,则直接删除服务信息app
                    if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                        applications.removeApplication(existingApp);
                    }
                }
            }
        }
    }
}

代码分析:见下面流程图

HeartbeatThread - 心跳续租

代码语言:txt
复制
private final Counter REREGISTER_COUNTER = Monitors.newCounter(PREFIX
        + "Reregister");

private class HeartbeatThread implements Runnable {

    public void run() {
        //【1】更新操作
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        //【2】客户端发送心跳包,获取响应
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        //【3】响应码=404,说明服务在EurekaServer不存在
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            long timestamp = instanceInfo.setIsDirtyWithTime();
            //【4】客户端重新发起一次register操作,给EurekaServer
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
             //【5】EurekaServer注册成功,则续约成功
            return success;
        }
        //【6】响应码=200,则在EurekaServer侧续约成功了
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

代码分析:见下面流程图

图片
图片
取消租约
代码语言:txt
复制
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
      search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
      EurekaClientConfig config) {
   return new CloudEurekaClient(manager, config, this.optionalArgs,
         this.context);
}

@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }

        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }

        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }

        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        Monitors.unregisterObject(this);

        logger.info("Completed shut down of DiscoveryClient");
    }
}

private void cancelScheduledTasks() {
    if (instanceInfoReplicator != null) {
        instanceInfoReplicator.stop();
    }
    if (heartbeatExecutor != null) {
        heartbeatExecutor.shutdownNow();
    }
    if (cacheRefreshExecutor != null) {
        cacheRefreshExecutor.shutdownNow();
    }
    if (scheduler != null) {
        scheduler.shutdownNow();
    }
    if (cacheRefreshTask != null) {
        cacheRefreshTask.cancel();
    }
    if (heartbeatTask != null) {
        heartbeatTask.cancel();
    }
}

代码分析:见下面流程图

图片
图片

小结

我们回到开头的原理,知道EurekaClient客户端本质就是4个动作:

  1. 获取服务列表:在CacheRefreshThread里有实现,即CacheRefreshThread的【4.1】步骤的eurekaTransport.queryClient.getApplications
  2. 注册服务实例:在HeartbeatThread里有实现,即HeartbeatThread的【4】步骤的eurekaTransport.registrationClient.register
  3. 租约续约:在HeartbeatThread里有实现,即HeartbeatThread的【2】步骤的eurekaTransport.registrationClient.sendHeartBeat
  4. 取消租约:在定义CloudEurekaClient的@Bean(destroyMethod = "shutdown")注解有生命

但我们还想知道,CacheRefreshThread 和 HeartbeatThread的背后通信,以及在EurekaServer的原理细节。可以,我们放到下一个章节再讲。

其他文章

Kafka消息堆积问题排查

基于SpringMVC的API灰度方案

理解到位:灾备和只读数据库

SQL治理经验谈:索引覆盖

Mybatis链路分析:JDK动态代理和责任链模式的应用

大模型安装部署、测试、接入SpringCloud应用体系

Mybatis插件-租户ID的注入&拦截应用

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • CloudEurekaClient分析
    • 源码
      • CloudEurekaClient   
      • DiscoveryClient
    • 小结
      • 其他文章
      相关产品与服务
      腾讯云代码分析
      腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档