A Walkthrough of the Eureka Server Source Code

Author: 叔叔
Published 2018-04-11 13:22:53

I. Eureka Server Startup

  1. Entry point: EurekaServerInitializerConfiguration
@Configuration
class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered
@Override
public void start() {
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
                // the excerpt drops the body here; do not swallow the exception in real code
            }
        }
    }).start();
}
  2. Step into EurekaServerBootstrap.contextInitialized()
public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();
        initEurekaServerContext();
        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}
  3. initEurekaServerContext()
int registryCount = this.registry.syncUp();
  4. PeerAwareInstanceRegistryImpl.syncUp()

If another server is already running, its registration data is copied over to this one.

Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
    for (InstanceInfo instance : app.getInstances()) {
        try {
            if (isRegisterable(instance)) {
                register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                count++;
            }
        } catch (Throwable t) {
            logger.error("During DS init copy", t);
        }
    }
}
  5. AbstractInstanceRegistry.register()
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
  • Registration data is stored in the registry map
  • The outer key is registrant.getAppName(); the inner map is keyed by the instance ID (registrant.getId())
  • expectedNumberOfRenewsPerMin is updated
if (this.expectedNumberOfRenewsPerMin > 0) {
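    // A new instance has registered, so raise the expected renewals by 2 per minute
    // (1 per 30 seconds). The original comment spoke of cancelling and reducing the
    // threshold, which does not match this "+ 2" on the register path.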
    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
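This is Eureka's self-preservation mechanism: the threshold is recomputed every time an instance registers.
By default each client sends a heartbeat every 30 seconds, i.e. twice a minute, so adding one instance adds 2 expected renewals. The 2 is hard-coded, which is why it is commonly recommended not to change the heartbeat interval.
  • invalidateCache: clears the cache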
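
The threshold arithmetic above can be sketched in isolation. This is a minimal illustration rather than Eureka's code: the class RenewalThresholdSketch and its methods are hypothetical, the 0.85 used in the usage note assumes the default renewal percent threshold, and the `> 0` guard from the excerpt is left out for brevity.

```java
// Hypothetical model of the bookkeeping done on each registration.
final class RenewalThresholdSketch {
    // One heartbeat every 30 s means 2 renewals per instance per minute (hard-coded, as in the excerpt).
    static final int RENEWS_PER_INSTANCE_PER_MIN = 2;

    private final double renewalPercentThreshold;
    private int expectedNumberOfRenewsPerMin;
    private int numberOfRenewsPerMinThreshold;

    RenewalThresholdSketch(double renewalPercentThreshold) {
        this.renewalPercentThreshold = renewalPercentThreshold;
    }

    // Called once per newly registered instance: bump the expectation, then recompute the threshold.
    void onRegister() {
        expectedNumberOfRenewsPerMin += RENEWS_PER_INSTANCE_PER_MIN;
        numberOfRenewsPerMinThreshold =
                (int) (expectedNumberOfRenewsPerMin * renewalPercentThreshold);
    }

    int expectedRenewsPerMin() {
        return expectedNumberOfRenewsPerMin;
    }

    int renewsPerMinThreshold() {
        return numberOfRenewsPerMinThreshold;
    }
}
```

With 10 registered instances and a threshold of 0.85, the expected renewals come to 20 per minute and the threshold to 17. If clients were reconfigured to heartbeat every 10 s, the hard-coded 2 would no longer describe their real renewal rate, which is the reason behind the advice not to change the interval.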

II. Service Registration (register)

  1. Entry point: ApplicationResource.addInstance(..)
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    ...
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}

ApplicationResource plays a role similar to a Controller; the RESTful endpoints are implemented with Jersey.

  2. Step into InstanceRegistry.register()
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
    super.register(info, isReplication);
}

Step one publishes an event (publishEvent); step two calls into the main register implementation.

  3. PeerAwareInstanceRegistryImpl.register(..)
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    ..
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
  • Calls the parent class's register method
  • Replicates the registration to the other peers
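
The register-then-replicate pattern can be sketched as follows. This is a simplified model under stated assumptions, not the Eureka implementation: ReplicatingRegistrySketch is a hypothetical class, peers are called in-process instead of over HTTP, and the isReplication flag is assumed to exist so that a replicated request is not forwarded a second time.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in for a registry node with peers.
final class ReplicatingRegistrySketch {
    private final Set<String> instances = new LinkedHashSet<>();
    private final List<ReplicatingRegistrySketch> peers = new ArrayList<>();
    private int registerCalls;

    void addPeer(ReplicatingRegistrySketch peer) {
        peers.add(peer);
    }

    void register(String instanceId, boolean isReplication) {
        registerCalls++;
        instances.add(instanceId);                   // local registration (the "super.register" step)
        replicateToPeers(instanceId, isReplication); // then fan out to the peers
    }

    private void replicateToPeers(String instanceId, boolean isReplication) {
        if (isReplication) {
            return; // this call already came from a peer, so stop here to avoid a replication loop
        }
        for (ReplicatingRegistrySketch peer : peers) {
            peer.register(instanceId, true);
        }
    }

    boolean contains(String instanceId) {
        return instances.contains(instanceId);
    }

    int registerCalls() {
        return registerCalls;
    }
}
```

If two nodes list each other as peers, a client registration on one node reaches the other exactly once, because the second node sees isReplication = true and does not send it back.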

III. Service Renewal (renew)

  1. REST entry point: InstanceResource.renewLease
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    ...
}
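  2. Step into AbstractInstanceRegistry.renew()

The call travels up the class hierarchy: InstanceRegistry -> PeerAwareInstanceRegistryImpl -> AbstractInstanceRegistry.

Finally the lease is marked as renewed; by default the renewal extends it by 90 s.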

public void renew() {
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
duration defaults to 90 seconds.

The service cancellation (cancel) flow is similar.

IV. Fetching Registry Information

Entry point: ApplicationsResource.getContainers

The key call:

response = Response.ok(responseCache.get(cacheKey)).build();

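The registry information is served from a cache. The default cache interval is 30 s (the period at which the read-only cache is refreshed); it is set up in the ResponseCacheImpl constructor.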
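
A cache of this kind can be sketched as two levels: a read-only map that serves requests and a read-write map that is invalidated on registry changes. The sketch below is an illustration under assumptions, not the ResponseCacheImpl API: the class TwoLevelCacheSketch, its method names and the String payload are all hypothetical, and the periodic refresh is exposed as a plain method instead of a timer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical two-level cache; names and types are chosen for illustration only.
final class TwoLevelCacheSketch {
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // builds a payload for a key; must not return null

    TwoLevelCacheSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    // Reads are served from the read-only level and may be stale until the next refresh.
    String get(String key) {
        String value = readOnly.get(key);
        if (value == null) {
            value = readWrite.computeIfAbsent(key, loader);
            readOnly.put(key, value);
        }
        return value;
    }

    // Called when the registry changes: drops the entry from the read-write level only.
    void invalidate(String key) {
        readWrite.remove(key);
    }

    // Meant to run periodically: copies the current read-write values into the read-only level.
    void refreshReadOnly() {
        for (String key : readOnly.keySet()) {
            readOnly.put(key, readWrite.computeIfAbsent(key, loader));
        }
    }
}
```

The practical consequence is that a client can keep seeing the old registry for up to one refresh period after a change, since invalidate() alone does not touch the read-only level.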

V. Service Eviction (evict)

  1. AbstractInstanceRegistry.EvictionTask extends TimerTask

Eureka Server's periodic cleanup runs in EvictionTask, an inner class of AbstractInstanceRegistry. It is a java.util.TimerTask scheduled on a timer.

@Override
public void run() {
    try {
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}

The default evictionIntervalTimerInMs is 60 seconds, so the cleanup task runs every 60 s.
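
Scheduling a task like this with java.util.Timer might look as follows. It is a sketch, not the Eureka wiring: EvictionTimerSketch and both of its methods are hypothetical, and the compensation formula (time elapsed since the previous run minus the configured interval, never negative) is an assumption about what getCompensationTimeMs() computes, since that method is not quoted in this article.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical helper showing fixed-delay scheduling with java.util.Timer.
final class EvictionTimerSketch {

    // Assumed formula: how much later than planned the task ran, clamped at zero.
    static long compensationMs(long elapsedSinceLastRunMs, long intervalMs) {
        return Math.max(0L, elapsedSinceLastRunMs - intervalMs);
    }

    // Runs evictOnce every intervalMs on a daemon timer thread; the first run is after intervalMs.
    static Timer schedule(Runnable evictOnce, long intervalMs) {
        Timer timer = new Timer("evict-sketch", true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    evictOnce.run();
                } catch (Throwable t) {
                    // An exception escaping run() would terminate the Timer thread, so catch everything.
                    System.err.println("evict task failed: " + t);
                }
            }
        }, intervalMs, intervalMs);
        return timer;
    }
}
```

A caller would pass 60_000 as intervalMs to match the default and call cancel() on the returned Timer at shutdown. Catching Throwable inside run(), as the original task does, matters because a single uncaught exception would otherwise stop all later runs.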

  2. Step into evict()
public void evict(long additionalLeaseMs) {
    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false);
        }
    }
}

Roughly, the method first loops over the registry to collect the expired leases. A lease counts as expired when:

public boolean isExpired(long additionalLeaseMs) {
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}

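duration, the lease length, defaults to 90 s, so nominally an instance is removed once it has gone 90 s without a heartbeat (the default heartbeat interval is 30 s, eureka.instance.lease-renewal-interval-in-seconds). Note that renew() already adds duration to lastUpdateTimestamp and isExpired() adds it again, so with the code as quoted a renewed lease only expires 2 × duration = 180 s after its last renewal.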
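
The two Lease methods quoted in this article can be combined into a small model to trace the timestamps. This is a sketch under assumptions: LeaseSketch is a hypothetical class, the current time is passed in as a parameter instead of calling System.currentTimeMillis() so the arithmetic is repeatable, and a fresh lease is assumed to start with lastUpdateTimestamp equal to its creation time.

```java
// Hypothetical model of a lease; timestamps are in milliseconds.
final class LeaseSketch {
    private final long durationMs;
    private long lastUpdateTimestamp;
    private long evictionTimestamp; // stays 0 until the lease is cancelled

    LeaseSketch(long nowMs, int durationInSecs) {
        this.durationMs = durationInSecs * 1000L;
        this.lastUpdateTimestamp = nowMs; // assumption: a new lease counts from its creation time
    }

    // Same shape as the quoted renew(): stores "now + duration".
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    void cancel(long nowMs) {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = nowMs;
        }
    }

    // Same shape as the quoted isExpired(): adds duration (and the compensation time) again.
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return evictionTimestamp > 0
                || nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }
}
```

With a 90 s duration, a lease renewed at t = 0 is still live at t = 180 000 ms and counts as expired from t = 180 001 ms, while a lease that was never renewed expires just after 90 000 ms. A positive additionalLeaseMs pushes either deadline back by that amount.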

Removal goes through internalCancel(appName, id, false), i.e. a cancel operation.
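
The eviction cap and the random selection in evict() can be isolated into two small functions. This is a sketch, not the registry code: EvictionPickSketch and its methods are hypothetical, the Random is injected so a run can be reproduced, and the 0.85 in the usage note assumes the default renewal percent threshold.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper mirroring the arithmetic and the partial shuffle in evict().
final class EvictionPickSketch {

    // At most (1 - threshold) of the registry may be evicted in one run.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Picks min(expired.size(), evictionLimit) items uniformly at random, without repeats.
    static <T> List<T> pickToEvict(List<T> expired, int registrySize,
                                   double renewalPercentThreshold, Random random) {
        List<T> pool = new ArrayList<>(expired); // copy so the caller's list is left untouched
        int toEvict = Math.max(0,
                Math.min(pool.size(), evictionLimit(registrySize, renewalPercentThreshold)));
        for (int i = 0; i < toEvict; i++) {
            // Partial Fisher-Yates shuffle: swap a random element of the remainder into slot i.
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
        }
        return new ArrayList<>(pool.subList(0, toEvict));
    }
}
```

For a registry of 100 instances and a threshold of 0.85, the limit is 15, so even if 40 leases have expired only 15 randomly chosen ones are cancelled in that run. The rest remain in the registry and are candidates again in the next run.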

VI. Updating Peer Information

  1. Entry point: DefaultEurekaServerContext
public class DefaultEurekaServerContext implements EurekaServerContext
@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}
  2. Step into PeerEurekaNodes.start()
public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }
            }
        };
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  " + node.getServiceUrl());
    }
}

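The work is done in updatePeerEurekaNodes(), which compares the peer service URLs currently held with the newly resolved discovery service URLs and then:

  • removes peers that are no longer in use
  • adds the new peers

The default update interval is 10 minutes.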
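
The comparison step can be sketched with two set differences. This is an illustration under assumptions, not the PeerEurekaNodes code: PeerDiffSketch is a hypothetical class, the peer URLs in the usage note are made up, and the sketch only computes which peers to drop and which to add without opening or closing any connections.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical result holder for a peer-list comparison.
final class PeerDiffSketch {
    final Set<String> toShutdown; // held now, but absent from the newly resolved list
    final Set<String> toAdd;      // newly resolved, but not held yet

    private PeerDiffSketch(Set<String> toShutdown, Set<String> toAdd) {
        this.toShutdown = toShutdown;
        this.toAdd = toAdd;
    }

    static PeerDiffSketch diff(Collection<String> currentUrls, Collection<String> resolvedUrls) {
        Set<String> toShutdown = new TreeSet<>(currentUrls);
        toShutdown.removeAll(resolvedUrls);
        Set<String> toAdd = new TreeSet<>(resolvedUrls);
        toAdd.removeAll(currentUrls);
        return new PeerDiffSketch(toShutdown, toAdd);
    }

    boolean isEmpty() {
        return toShutdown.isEmpty() && toAdd.isEmpty();
    }
}
```

For example, if the node currently holds http://peer1:8761/eureka/ and http://peer2:8761/eureka/ and the resolver now returns peer2 and peer3, the diff asks to shut down peer1 and add peer3, and leaves peer2 alone. When both sets are empty the scheduled task has nothing to do.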

Originally published 2018-04-03 on the WeChat public account 叔叔的博客.
