A look at Eureka's PeerAwareInstanceRegistryImpl

code4it
Published 2018-09-17 16:40:54

This article takes a look at Eureka's PeerAwareInstanceRegistryImpl.

EurekaServerAutoConfiguration

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    //......
    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
            ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }
    //......
}

The focus here is on PeerAwareInstanceRegistry; for the EurekaClient configuration, see EurekaClientAutoConfiguration.

InstanceRegistry

spring-cloud-netflix-eureka-server-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/server/InstanceRegistry.java

public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
        implements ApplicationContextAware {
    //......
    private void handleCancelation(String appName, String id, boolean isReplication) {
        log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
        publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
    }

    private void handleRegistration(InstanceInfo info, int leaseDuration,
            boolean isReplication) {
        log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
                + ", leaseDuration " + leaseDuration + ", isReplication "
                + isReplication);
        publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
                isReplication));
    }

    private void log(String message) {
        if (log.isDebugEnabled()) {
            log.debug(message);
        }
    }

    private void publishEvent(ApplicationEvent applicationEvent) {
        this.ctxt.publishEvent(applicationEvent);
    }
}

This class extends PeerAwareInstanceRegistryImpl; its overridden methods mainly publish the corresponding event before the operation itself is executed.
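The "publish an event, then delegate" pattern can be sketched in a few lines. This is only an illustration: BaseRegistrySketch and EventPublishingRegistrySketch are hypothetical stand-ins for PeerAwareInstanceRegistryImpl and the Spring Cloud InstanceRegistry, and the "event" is simply recorded in a list rather than published through an ApplicationContext.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the superclass: it only records that it was called.
class BaseRegistrySketch {
    final List<String> calls = new ArrayList<>();

    boolean cancel(String appName, String id, boolean isReplication) {
        calls.add("super.cancel " + appName + "/" + id);
        return true;
    }
}

// Hypothetical stand-in for the subclass that announces the operation first.
class EventPublishingRegistrySketch extends BaseRegistrySketch {
    @Override
    boolean cancel(String appName, String id, boolean isReplication) {
        // Record the "event" before delegating to the superclass.
        calls.add("event cancel " + appName + "/" + id
                + " isReplication=" + isReplication);
        return super.cancel(appName, id, isReplication);
    }
}
```

Calling cancel on the subclass leaves two entries in calls, the event first and the superclass call second, which is the ordering described above.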

PeerAwareInstanceRegistryImpl

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/PeerAwareInstanceRegistryImpl.java

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
    //......
}

This class implements InstanceRegistry methods such as openForTraffic, shutdown, statusUpdate, and deleteStatusOverride, as well as PeerAwareInstanceRegistry methods such as init, syncUp, shouldAllowAccess, register(InstanceInfo info, boolean isReplication), and statusUpdate(final String asgName, final ASGResource.ASGStatus newStatus, final boolean isReplication). AbstractInstanceRegistry, in turn, mainly implements the basic LeaseManager operations (register, cancel, renew, evict) along with some LookupService query operations. PeerAwareInstanceRegistryImpl builds on AbstractInstanceRegistry by adding peer-related handling and the updating of the renewal threshold.

replicateToPeers

PeerAwareInstanceRegistryImpl overrides the cancel, register, renew, statusUpdate, and deleteStatusOverride methods; each one first calls the super implementation and then calls replicateToPeers.

    /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

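This method replicates instance information to the peer Eureka servers. The isReplication flag indicates whether the current request was itself replicated from another node. As the code shows, if peerEurekaNodes is empty or isReplication is true, the method returns without replicating any further.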
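To see why the isReplication guard matters, here is a minimal in-memory sketch of the same idea. ReplicatingNodeSketch is a hypothetical class, not part of Eureka; it assumes peers are plain object references and that "registering" just appends an id to a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical node that applies an update locally and then replicates it.
class ReplicatingNodeSketch {
    final String name;
    final List<ReplicatingNodeSketch> peers = new ArrayList<>();
    final List<String> registered = new ArrayList<>();
    int replicationsReceived = 0;

    ReplicatingNodeSketch(String name) {
        this.name = name;
    }

    void register(String instanceId, boolean isReplication) {
        registered.add(instanceId);                 // local update first
        replicateToPeers(instanceId, isReplication);
    }

    private void replicateToPeers(String instanceId, boolean isReplication) {
        if (isReplication) {
            replicationsReceived++;                 // count incoming replications
        }
        // A replicated request is never replicated again.
        if (peers.isEmpty() || isReplication) {
            return;
        }
        for (ReplicatingNodeSketch peer : peers) {
            if (peer == this) {
                continue;                           // do not replicate to yourself
            }
            peer.register(instanceId, true);        // mark as replication traffic
        }
    }
}
```

With three fully connected nodes, a client registration on one node reaches each of the other two exactly once. Without the guard, every receiver would forward the update back to its own peers and the calls would never terminate.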

replicateInstanceActionsToPeers

    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

Here a different PeerEurekaNode method is called depending on the Action.
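One consequence of the try/catch around the switch is that a failure on one peer does not stop replication to the remaining peers. The following sketch illustrates that under simplifying assumptions: ActionSketch, PeerSketch, and the other names are hypothetical, only two actions are modelled, and failures are counted instead of logged.

```java
import java.util.ArrayList;
import java.util.List;

enum ActionSketch { CANCEL, REGISTER }

// Hypothetical, much reduced view of a peer node.
interface PeerSketch {
    void cancel(String appName, String id);
    void register(String id);
}

class RecordingPeerSketch implements PeerSketch {
    final List<String> calls = new ArrayList<>();
    public void cancel(String appName, String id) { calls.add("cancel " + appName + "/" + id); }
    public void register(String id) { calls.add("register " + id); }
}

class FailingPeerSketch implements PeerSketch {
    public void cancel(String appName, String id) { throw new IllegalStateException("peer down"); }
    public void register(String id) { throw new IllegalStateException("peer down"); }
}

class DispatchSketch {
    int failures = 0;

    // Dispatch one action to one peer; errors are contained per peer.
    void replicateAction(ActionSketch action, String appName, String id, PeerSketch node) {
        try {
            switch (action) {
                case CANCEL:
                    node.cancel(appName, id);
                    break;
                case REGISTER:
                    node.register(id);
                    break;
            }
        } catch (RuntimeException e) {
            failures++;   // the original logs the error; this sketch just counts it
        }
    }

    void replicateToAll(ActionSketch action, String appName, String id, List<PeerSketch> nodes) {
        for (PeerSketch node : nodes) {
            replicateAction(action, appName, id, node);
        }
    }
}
```

If the list contains a FailingPeerSketch followed by a RecordingPeerSketch, the second peer still receives the call and failures ends up at 1.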

PeerEurekaNode

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNode.java

HttpReplicationClient

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNodes.java

    protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
        HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
        String targetHost = hostFromUrl(peerEurekaNodeUrl);
        if (targetHost == null) {
            targetHost = "host";
        }
        return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/transport/JerseyReplicationClient.java

public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient {

    private static final Logger logger = LoggerFactory.getLogger(JerseyReplicationClient.class);

    private final EurekaJerseyClient jerseyClient;
    private final ApacheHttpClient4 jerseyApacheClient;

    public JerseyReplicationClient(EurekaJerseyClient jerseyClient, String serviceUrl) {
        super(jerseyClient.getClient(), serviceUrl);
        this.jerseyClient = jerseyClient;
        this.jerseyApacheClient = jerseyClient.getClient();
    }

    @Override
    protected void addExtraHeaders(Builder webResource) {
        webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
    }

//......
}

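Here the PeerEurekaNode.HEADER_REPLICATION header is explicitly set to true. This marks every request sent through this client as replication traffic and tells the target Eureka server not to replicate it again, which avoids an endless loop.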

InstanceResource

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/resources/InstanceResource.java

    /**
     * Handles cancellation of leases for this particular instance.
     *
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     * @return response indicating whether the operation was a success or
     *         failure.
     */
    @DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
            boolean isSuccess = registry.cancel(app.getName(), id,
                "true".equals(isReplication));

            if (isSuccess) {
                logger.debug("Found (Cancel): {} - {}", app.getName(), id);
                return Response.ok().build();
            } else {
                logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
                return Response.status(Status.NOT_FOUND).build();
            }
        } catch (Throwable e) {
            logger.error("Error (cancel): {} - {}", app.getName(), id, e);
            return Response.serverError().build();
        }

    }

Taking cancel as an example, we can now trace replication end to end. The value of PeerEurekaNode.HEADER_REPLICATION is passed into registry.cancel, and that registry is the PeerAwareInstanceRegistryImpl, so the whole cycle is connected.
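The two ends of the header handshake can be sketched as a pair of helper methods. This is an illustration only: ReplicationHeaderSketch is a hypothetical class, headers are modelled as a plain Map, and the header name used here is a placeholder rather than the actual value of PeerEurekaNode.HEADER_REPLICATION.

```java
import java.util.HashMap;
import java.util.Map;

class ReplicationHeaderSketch {
    // Placeholder header name, assumed for illustration only.
    static final String HEADER_REPLICATION = "x-replication-sketch";

    // Sender side: the replication client adds the header to every request.
    static Map<String, String> replicationHeaders() {
        Map<String, String> headers = new HashMap<>();
        headers.put(HEADER_REPLICATION, "true");
        return headers;
    }

    // Receiver side: same shape as "true".equals(isReplication) in the resource.
    // A missing header yields null, and "true".equals(null) is false.
    static boolean isReplication(Map<String, String> headers) {
        return "true".equals(headers.get(HEADER_REPLICATION));
    }
}
```

Writing the comparison as "true".equals(value) rather than value.equals("true") is what makes an ordinary client request, which carries no such header, safely evaluate to false instead of throwing a NullPointerException.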

Summary

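PeerAwareInstanceRegistryImpl builds on AbstractInstanceRegistry, adding peer-related handling and the updating of the renewal threshold. When it replicates to other peers, it sends the PeerEurekaNode.HEADER_REPLICATION header set to true to mark the request as a replication, so the receiver does not replicate it again to its own peers.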

doc

  • A look at Spring Cloud's EurekaClientAutoConfiguration (聊聊springcloud的EurekaClientAutoConfiguration)

Originally published 2018-05-12 on the WeChat official account 码匠的流水账.