前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Eureka的InstanceInfoReplicator类(服务注册辅助工具)

Eureka的InstanceInfoReplicator类(服务注册辅助工具)

作者头像
程序员欣宸
发布2019-05-26 21:49:47
9900
发布2019-05-26 21:49:47
举报
文章被收录于专栏:实战docker实战docker

关于服务注册

以下图片来自Netflix官方,图中显示Eureka Client会向注册中心发起Get Registry请求来获取服务列表:

在这里插入图片描述
在这里插入图片描述

以Spring Cloud的Edgware.RELEASE版本为例,Eureka client的注册动作是在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中执行的,相关代码片段如下所示,请注意中文注释:

代码语言:javascript
复制
//略去不相关代码
...
//实例化InstanceInfoReplicator对象
instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            //监听器,用来监听作为Eureka client的自身的状态变化
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    //状态变化时notify方法会被执行,此时上报最新状态到Eureka server
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                //注册监听器
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            //服务注册
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

上述代码表明,将自身信息上报到Eureka server的工作是通过调用instanceInfoReplicator的api完成的;

InstanceInfoReplicator的作用

先看InstanceInfoReplicator源码的注释:

代码语言:javascript
复制
/**
 * A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are:
 * - configured with a single update thread to guarantee sequential update to the remote server
 * - update tasks can be scheduled on-demand via onDemandUpdate()
 * - task processing is rate limited by burstSize
 * - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task
 *   is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new
 *   on-demand update).
 *
 *   @author dliu
 */

我的理解:

  1. InstanceInfoReplicator是个任务类,负责将自身的信息周期性的上报到Eureka server;
  2. 有两个场景触发上报:周期性任务、服务状态变化(onDemandUpdate被调用),因此,在同一时刻有可能有两个上报的任务同时出现;
  3. 单线程执行上报的操作,如果有多个上报任务,也能确保是串行的;
  4. 有频率限制,通过burstSize参数来控制;
  5. 先创建的任务总是先执行,但是onDemandUpdate方法中创建的任务会将周期性任务给丢弃掉;

源码分析

以前面对注释的理解作为主线,去看源码:

  1. 先看构造方法,如下,中文注释位置需要注意:
代码语言:javascript
复制
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
        this.discoveryClient = discoveryClient;
        this.instanceInfo = instanceInfo;
		//线程池,core size为1,使用DelayedWorkQueue队列
        this.scheduler = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                        .setDaemon(true)
                        .build());

        this.scheduledPeriodicRef = new AtomicReference<Future>();

        this.started = new AtomicBoolean(false);
        //RateLimiter是个限制频率的工具类,用来限制单位时间内的任务次数
        this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
        this.replicationIntervalSeconds = replicationIntervalSeconds;
        this.burstSize = burstSize;
        //通过周期间隔,和burstSize参数,计算每分钟允许的任务数
        this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
        logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
    }

从以上代码可见,构造方法中准备好了线程池和频率限制工具,再算好了每分钟允许的任务数; 2. 在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中,通过调用instanceInfoReplicator.start方法启动了周期性任务,现在来看此方法:

代码语言:javascript
复制
public void start(int initialDelayMs) {
		//CAS操作,不但保证了只执行一次,多线程场景也能保证
		if (started.compareAndSet(false, true)) {
		        instanceInfo.setIsDirty();  // for initial register
		        //提交一个任务,延时执行,注意第一个参数是this,因此延时结束时,InstanceInfoReplicator的run方法会被执行
		        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
		        //这个任务的Feature对象放在成员变量scheduledPeriodicRef中
		        scheduledPeriodicRef.set(next);
		}
}
  1. 延时时间到达时,会执行run方法:
代码语言:javascript
复制
public void run() {
        try {
            //更新信息,用于稍后的上报
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                //上报
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            //每次执行完毕都会创建一个延时执行的任务,就这样实现了周期性执行的逻辑
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            //每次创建的周期性任务,都要放入scheduledPeriodicRef,
            //如果外部调用了onDemandUpdate,就能通过onDemandUpdate取得当前要执行的任务
            scheduledPeriodicRef.set(next);
        }
    }
  1. 以上代码汇总起来,就完成了周期性任务的逻辑,接下来看看被外部调用的onDemandUpdate方法:
代码语言:javascript
复制
public boolean onDemandUpdate() {
        //没有达到频率限制才会执行
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            //提交一个任务
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");
                    //取出之前已经提交的任务
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    //如果此任务未完成,就立即取消
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }
					//通过调用run方法,令任务在延时后执行,相当于周期性任务中的一次
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            //如果超过了设置的频率限制,本次onDemandUpdate方法就提交任务了
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }

如上述代码所示,可见之前注释中提到的功能都已实现;

至此,InstanceInfoReplicator已分析完毕,可见这是个功能强大的辅助类,在应用信息上报到Eureka server时发挥了重要的作用,业务逻辑可以放心的提交上报请求,并发、频率超限等情况都被InstanceInfoReplicator处理好了;

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年09月30日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 关于服务注册
  • InstanceInfoReplicator的作用
  • 源码分析
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档