专栏首页JavaEdge开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
原创

开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

全是干货的 Java 技术仓库:https://github.com/Wasabi1234/Java-Interview-Tutorial

  • 当你在启动类上添加了启动 Eureka 服务注册中心注解时,到底发生了什么呢?
  • 激活eureka服务器相关配置EurekaServerAutoConfiguration的注解
  • EurekaServerMarkerConfiguration
    点击到这里,我们发现 spring.factories文件
  • 注意到如下注解
    @ConditionalOnBean - 条件注入也就是当有EurekaServerMarkerConfiguration.Marker.class时,才会注入

所以@EnableEurekaServer就是个开关,只要写了该注解,spring 就会帮你把EurekaServerAutoConfiguration类注入进来。

那么为什么注入他就行了?

EurekaServerAutoConfiguration#jerseyFilterRegistration

注意如下类:

ApplicationResource#addInstance

相当于 MVC 中的 controller

   /**
     * Registers information about a particular instance for an
     * {@link com.netflix.discovery.shared.Application}.
     *
     * @param info
     *            {@link InstanceInfo} information of the instance.
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     */
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

注意如下继承体系图

InstanceRegistry#register

PeerAwareInstanceRegistryImpl#register

这里取得微服务过期时间 90s,服务之间心跳请求 30s 一次,如果 90s 还没发生,就说明挂了

这就是责任链模式!!!

AbstractInstanceRegistry#register 真正的注册

  • 注册表 K:微服务集群 V:微服务集群内的各个节点

当发生注册信息冲突时,咋办?

根据最后活跃时间,确定覆盖哪个

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
        	// 并发读锁
            read.lock();
            // 先获得微服务名,然后获得实例
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            // 第一次注册时还不存在,所以 new 一个
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                // 没有时才 put,有就不更新! 因为registry还是可能被写的!毕竟他不在读锁范围内!
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // 已存在的注册节点
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
            	// 拿到已存在节点的注册时间
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                // 当前正在注册的节点的注册时间
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                // 注册ing 时,有更加新的节点注册了!
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    // 那么就用已存在的替换当前注册节点,因为考虑到尽量保证可用性,因为既然还在活跃说明老的还能用
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // 新的心跳续约对象,包括注册信息,最后操作时间,注册事件,过期时间,剔除时间
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // 将新对象放入注册表
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
            	// 加入注册队列
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

服务续约

通知Eureka Server Service Provider还活着,避免服务被剔除。

和register实现思路基本一致

  • 更新自身状态
  • 再同步到其它Peer
   public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
        	// 拿到具体的注册节点
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
        	// 不存在,则重新注册续约
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
        //存在
        	// 先得到节点信息
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                // 看是否处于宕机状态
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            renewsLastMin.increment();
            // 正常则续约
            leaseToRenew.renew();
            return true;
        }
    }
    public enum InstanceStatus {
        UP, // Ready to receive traffic
        DOWN, // Do not send traffic- healthcheck callback failed
        STARTING, // Just about starting- initializations to be done - do not
        // send traffic
        OUT_OF_SERVICE, // Intentionally shutdown for traffic
        UNKNOWN;

        public static InstanceStatus toEnum(String s) {
            if (s != null) {
                try {
                    return InstanceStatus.valueOf(s.toUpperCase());
                } catch (IllegalArgumentException e) {
                    // ignore and fall through to unknown
                    logger.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN);
                }
            }
            return UNKNOWN;
        }
    }

续约,利用更新的持续时间,如果它是由相关联的指定T注册期间,否则默认持续时间是DEFAULT_DURATION_IN_SECS,即 90s

服务失效剔除

AbstractInstanceRegistry#postInit

  • 剔除定时任务
    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

		// 判断是否打开自我保护机制
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    	// 集合保存所有剔除节点
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        // 注册表大小,即所有注册节点个数
        int registrySize = (int) getLocalRegistrySize();
        // 阈值,0.85
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        // 直接相减,得到剔除阈值
        int evictionLimit = registrySize - registrySizeThreshold;

		// 将需要剔除节点个数 和 剔除阈值取最小值,作为自我保护机制下要剔除的节点个数。删多了还是 85 个开启,少了就降低点,这很合理!
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
            // 随机剔除这么多个
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }
  • 是否过期,该被剔除 当前时间是否大于 最后操作时间+持续时间+服务集群之间同步的预留时间
    自我保护机制eureka 短时间内大量微服务被删除,会打开自我保护机制,避免自己宕机时疯狂删除

检查是否尤里卡服务器的自我保护功能。

当启用时,服务器跟踪它应该从服务器接收更新的数量。 任何时候,续期次数低于按规定的阈值百分比getRenewalPercentThreshold() ,服务器关闭到期,以避免身处险境将有助于服务器维护的客户端和服务器之间的网络出现问题时注册表的信息。

这些变化是在运行时有效

15min 85%宕机则打开该机制

同步给其他peer

传入各种动作

  • InstanceInfo的状态有以下几种 com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
    PeerAwareInstanceRegistryImpl#register
  • InstanceInfo写入到本地registry之后,然后同步给其他peer节点
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* 可选 */,
                                  InstanceStatus newStatus /* 可选 */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // 如果已经是副本,则不要再复制,因为这将创建有毒的复制!
            // 如果当前节点接收到的实例信息本就是另一个节点同步来的,则不会继续同步给其他节点,避免形成“广播效应”,造成死循环
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

我们看注册动作对应方法

这里最终还是调用如下:

只不过此时关键在于是否为副本节点

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 玩转算法面试(一)1算法面试意义234 优化算法

    JavaEdge
  • Spring 5.0中文版-3.9

    JavaEdge
  • Java语法糖1 泛型与类型擦除

    JavaEdge
  • 使Spring.NET的IOC容器支持动态加载的程序集

    当我们发布系统时,有时候希望不用关掉应用程序就能完成发布,但Spring.NET的ApplicationContext是从AppDomain.Curr...

    明年我18
  • 绘制SVG内容到Canvas的HTML5应用

    SVG与Canvas是HTML5上绘制图形应用的两种完全不同模式的技术,两种绘制图形方式各有优缺点,但两者并非水火不容,尤其是SVG内容可直接绘制在Canvas...

    HT for Web
  • 绘制SVG内容到Canvas的HTML5应用

    HT_hightopo
  • 微信网易合作推小程序课程 / 腾讯 8.2 亿投资优必选

    知晓君
  • 人工智能简史(一)——萌芽

    人工智能(AI)和计算机之间有着非常紧密的联系,这从计算机在中文里也被翻译成“电脑”(插电的大脑)可以看出来。事实上现在的人工智能就是基于计算机这个载体来实现核...

    企鹅号小编
  • 修改操作系统用户密码,造成SQL Server服务启动失败

    修改操作系统密码,导致SQL不能启动的解决办法: 方法一. 我的电脑–控制面板–管理工具–服务–右键   MSSQLSERVER–属性–登陆–登陆身份–选择"本...

    Tony老师
  • 对象存储服务-腾讯云对象存储服务COS资源包

    对象存储服务-腾讯云对象存储服务COS资源包介绍,腾讯云对象存储服务 COS 具有高扩展性、低成本、可靠和安全等特点,能为您提供专业的数据存储服务。您可以使用控...

    用户1361591

扫码关注云+社区

领取腾讯云代金券