前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring Cloud Eureka 全解 (3) - 核心流程-服务注册与取消详解

Spring Cloud Eureka 全解 (3) - 核心流程-服务注册与取消详解

作者头像
干货满满张哈希
发布2021-04-12 15:36:26
7050
发布2021-04-12 15:36:26
举报
文章被收录于专栏:干货满满张哈希

本文基于SpringCloud-Dalston.SR5

关于服务注册

开启/关闭服务注册配置:eureka.client.register-with-eureka = true (默认)

什么时候注册?

  1. 应用第一次启动时,初始化EurekaClient时,应用状态改变:从STARTING变为UP会触发这个Listener,调用instanceInfoReplicator.onDemandUpdate(); 可以推测出,实例状态改变时,也会通过注册接口更新实例状态信息
代码语言:javascript
复制
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};
  1. 定时任务,如果InstanceInfo发生改变,也会通过注册接口更新信息
代码语言:javascript
复制
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();
        //如果实例信息发生改变,则需要调用register更新InstanceInfo
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
  1. 在定时renew时,如果renew接口返回404(代表这个实例在EurekaServer上面找不到),可能是之前注册失败或者注册过期导致的。这时需要调用register重新注册
代码语言:javascript
复制
boolean renew() {
    EurekaHttpResponse httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        //如果renew接口返回404(代表这个实例在EurekaServer上面找不到),可能是之前注册失败或者注册过期导致的
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}

向Eureka发送注册请求EurekaServer发生了什么?

主要有两个存储,一个是之前提到过的registry,还有一个最近变化队列,后面我们会知道,这个最近变化队列里面就是客户端获取增量实例信息的内容:

代码语言:javascript
复制
# 整体注册信息缓存
private final ConcurrentHashMap<String, Map<String, Lease>> registry = new ConcurrentHashMap<String, Map<String, Lease>>();
# 最近变化队列
private ConcurrentLinkedQueue recentlyChangedQueue = new ConcurrentLinkedQueue(); 

EurekaServer收到实例注册主要分两步:

  • 调用父类方法注册
  • 同步到其他EurekaServer实例
代码语言:javascript
复制
public void register(InstanceInfo info, boolean isReplication) {
    int leaseDuration = 90;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    //调用父类方法注册
    super.register(info, leaseDuration, isReplication);
    //同步到其他EurekaServer实例
    this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
}

我们先看同步到其他EurekaServer实例

其实就是,注册到的EurekaServer再依次调用其他集群内的EurekaServer的Register方法将实例信息同步过去

代码语言:javascript
复制
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

然后看看调用父类方法注册:

代码语言:javascript
复制
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        //register虽然看上去好像是修改,但是这里用的是读锁,后面会解释
        read.lock();
        //从registry中查看这个app是否存在
        Map> gMap = registry.get(registrant.getAppName());
        //不存在就创建
        if (gMap == null) {
            final ConcurrentHashMap> gNewMap = new ConcurrentHashMap>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        //查看这个app的这个实例是否已存在
        Lease existingLease = gMap.get(registrant.getId());

        if (existingLease != null && (existingLease.getHolder() != null)) {
            //如果已存在,对比时间戳,保留比较新的实例信息......
        } else {
            // 如果不存在,证明是一个新的实例
            //更新自我保护监控变量的值的代码.....

        }
        Lease lease = new Lease(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        //放入registry
        gMap.put(registrant.getId(), lease);

        //加入最近修改的记录队列
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        //初始化状态,记录时间等相关代码......

        //主动让Response缓存失效
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    } finally {
        read.unlock();
    }
}

总结起来,就是主要三件事:

1.将实例注册信息放入或者更新registry

2.将实例注册信息加入最近修改的记录队列

3.主动让Response缓存失效

我们来类比下服务取消

服务取消CANCEL

代码语言:javascript
复制
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        //cancel虽然看上去好像是修改,但是这里用的是读锁,后面会解释
        read.lock();

        //从registry中剔除这个实例
        Map> gMap = registry.get(appName);
        Lease leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        if (leaseToCancel == null) {
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            //改变状态,记录状态修改时间等相关代码......
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                //加入最近修改的记录队列
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
            }
            //主动让Response缓存失效
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        read.unlock();
    }
}

总结起来,也是主要三件事:

1.从registry中剔除这个实例

2.将实例注册信息加入最近修改的记录队列

3.主动让Response缓存失效

这里我们注意到了这个最近修改队列,我们来详细看看

最近修改队列

这个最近修改队列和消费者定时获取服务实例列表有着密切的关系

代码语言:javascript
复制
private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }

    };
}

这个RetentionTimeInMSInDeltaQueue默认是180s(配置是eureka.server.retention-time-in-m-s-in-delta-queue,默认是180s,官网写错了),可以看出这个队列是一个长度为180s的滑动窗口,保存最近180s以内的应用实例信息修改,后面我们会看到,客户端调用获取增量信息,实际上就是从这个queue中读取,所以可能一段时间内读取到的信息都是一样的。

Response缓存

Response缓存的实现类是ResponseCacheImpl,主要包括如下缓存field:

代码语言:javascript
复制
private final ConcurrentMap readOnlyCacheMap = new ConcurrentHashMap();
private final LoadingCache readWriteCacheMap;

一个是guava的loadingcache,一个是普通的ConcurrentHashMap

这个loadingcache的初始化:

代码语言:javascript
复制
this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(1000)
    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
    .removalListener(new RemovalListener() {
        @Override
        public void onRemoval(RemovalNotification notification) {
            Key removedKey = notification.getKey();
            if (removedKey.hasRegions()) {
                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
            }
        }
    })
    .build(new CacheLoader() {
        @Override
        public Value load(Key key) throws Exception {
            if (key.hasRegions()) {
                Key cloneWithNoRegions = key.cloneWithoutRegions();
                regionSpecificKeys.put(cloneWithNoRegions, key);
            }
            Value value = generatePayload(key);
            return value;
        }
    });

对于每个不存在的Key,会首先初始化,主要是调用generatePayload这个方法:

代码语言:javascript
复制
private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();

                if (ALL_APPS.equals(key.getName())) {
                    //获取所有应用信息
                    if (isRemoteRegionRequested) {
                        tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeAllAppsTimer.start();
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } else if (ALL_APPS_DELTA.equals(key.getName())) {
                    //获取所有应用增量信息
                    if (isRemoteRegionRequested) {
                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key,
                                registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                } else {
                    //获取单个应用信息
                    tracer = serializeOneApptimer.start();
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;

            //其他类型我们不关心,先忽略掉相关代码
        }
        return new Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
}

获取所有应用信息,是从registry中直接拿registry.getApplications(),核心方法是getApplicationsFromMultipleRegions,看下简化过的源码:

代码语言:javascript
复制
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {

    boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;

    Applications apps = new Applications();
    apps.setVersion(1L);
    //将registry中的信息封装好放入Applications
    for (Entry<String, Map<String, Lease>> entry : registry.entrySet()) {
        Application app = null;

        if (entry.getValue() != null) {
            for (Entry<String, Lease> stringLeaseEntry : entry.getValue().entrySet()) {
                Lease lease = stringLeaseEntry.getValue();
                if (app == null) {
                    app = new Application(lease.getHolder().getAppName());
                }
                app.addInstance(decorateInstanceInfo(lease));
            }
        }
        if (app != null) {
            apps.addApplication(app);
        }
    }
    //读取其他Region的Apps信息,我们目前不关心,略过这部分代码......

    //设置AppsHashCode,在之后的介绍中,我们会提到,客户端读取到之后会对比这个AppsHashCode
    apps.setAppsHashCode(apps.getReconcileHashCode());
    return apps;
}

获取所有应用增量信息,registry.getApplicationDeltas():

代码语言:javascript
复制
public Applications getApplicationDeltas() {
    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDelta().get());
    Map applicationInstancesMap = new HashMap();
    try {
        //这里读取用的是写锁,下面我们就会解释为何这么用
        write.lock();

        //遍历recentlyChangedQueue,获取所有增量信息
        Iterator iter = this.recentlyChangedQueue.iterator();
        logger.debug("The number of elements in the delta queue is :"
                + this.recentlyChangedQueue.size());
        while (iter.hasNext()) {
            Lease lease = iter.next().getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            Object[] args = {instanceInfo.getId(),
                    instanceInfo.getStatus().name(),
                    instanceInfo.getActionType().name()};
            logger.debug(
                    "The instance id %s is found with status %s and actiontype %s",
                    args);
            Application app = applicationInstancesMap.get(instanceInfo
                    .getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            app.addInstance(decorateInstanceInfo(lease));
        }

        //读取其他Region的Apps信息,我们目前不关心,略过这部分代码......

        Applications allApps = getApplications(!disableTransparentFallback);
        //设置AppsHashCode,在之后的介绍中,我们会提到,客户端读取到之后更新好自己的Apps缓存之后会对比这个AppsHashCode,如果不一样,就会进行一次全量Apps信息请求
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}

为何这里读写锁这么用,首先我们来分析下这个锁保护的对象是谁,可以很明显的看出,是recentlyChangedQueue这个队列。那么谁在修改这个队列,谁又在读取呢? 每个服务实例注册,取消的时候,都会修改这个队列,这个队列是多线程修改的。但是读取,只有loadingcache的ALL_APPS_DELTAkey初始化线程会读取,而且在缓存失效前都不会再有线程读取。所以可以归纳为,多线程频繁修改,但是单线程不频繁读取。 如果没有锁,那么recentlyChangedQueue在遍历读取时如果遇到修改,就会抛出并发修改异常。如果用writeLock锁住多线程修改,那么同一时间只有一个线程能修改,效率不好。所以。利用读锁锁住多线程修改,利用写锁锁住单线程读取正好符合这里的场景。

前面提到,EurekaClient的查询请求,都是从ResponseCache中获取(从ResponseCache本身缓存的就是请求)。ResponseCache还包括readOnlyCacheMap,这个默认时启用的,就是用户请求会先从readOnlyCacheMap读取,如果readOnlyCacheMap中不存在,则从上面介绍的readWriteCacheMap中获取,之后再放入readOnlyCacheMap。

代码语言:javascript
复制
Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key :" + key, t);
    }
    return payload;
}

还有个定时任务:每隔只读缓存刷新时间将ReadWriteMap的信息复制到ReadOnlyMap上面:这个readOnlyCacheMap里面数据是定时从readWriteCacheMap中拷贝出来的:

代码语言:javascript
复制
 private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache", th);
                }
            }
        }
    };
}

在本篇最开始的时候提到register和cancel都会主动失效对应的ResponseCache,这个主动失效的源代码是:

代码语言:javascript
复制
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
    for (Key.KeyType type : Key.KeyType.values()) {
        for (Version v : Version.values()) {

            //对于任意一个APP缓存失效,都要让对应的APP请求响应,全量APP信息请求响应,增量APP信息请求响应失效
            invalidate(
                    new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
                    new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
                    new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
            );
            if (null != vipAddress) {
                invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
            }
            if (null != secureVipAddress) {
                invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
            }
        }
    }
}

public void invalidate(Key... keys) {
    for (Key key : keys) {
        logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());

        readWriteCacheMap.invalidate(key);
        Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
        if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
            for (Key keysWithRegion : keysWithRegions) {
                logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                        key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                readWriteCacheMap.invalidate(keysWithRegion);
            }
        }
    }
}

在readWriteCacheMap中使对应的APP请求响应,全量APP信息请求响应,增量APP信息请求响应失效后,下次请求,就会再读取registry生成。对于registry,新加入的应用或者实例会被读取到。对于cancel,退出的应用或者实例也会被去除掉

所以,总结起来,用下面这张图展示下EurekaServer 重要缓存和对应的请求:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017/11/29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 关于服务注册
    • 什么时候注册?
      • 向Eureka发送注册请求EurekaServer发生了什么?
      • 服务取消CANCEL
        • 最近修改队列
        • Response缓存
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档