前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Nacos8# 集群中节点之间健康检查

Nacos8# 集群中节点之间健康检查

作者头像
瓜农老梁
发布2021-07-14 11:11:01
2.2K0
发布2021-07-14 11:11:01
举报
文章被收录于专栏:瓜农老梁瓜农老梁

引言

当新的节点加入集群或者集群中有节点下线了,集群之间可以通过健康检查发现。健康检查的频率是怎么样的?节点的状态又是如何变动的?状态的变动又会触发什么动作。带着这些问题本文捋一捋。

一、内容提要

内容提要

健康检查

  • Nacos节点会向集群其他节点发送健康检查心跳,每一轮频率为2秒
  • 当健康检查异常时设置为不信任「SUSPICIOUS」状态,超过失败最大次数3次设置为下线「DOWN」状态
  • 健康检查成功设置该节点为科通信「UP」状态
  • 无论成功还是失败当节点状态变更时均发布MembersChangeEvent事件

成员变更事件

  • 当集群节点成员变更时,MemberChangeListener会收到该事件
  • 例如回调ClusterRpcClientProxy#onEvent触发refresh
  • 刷新本节点与集群中其他节点的RPC状态,关闭无效的或者增加新的RPC连接

二、健康检查

代码翻到ServerMemberManager#onApplicationEvent,在Nacos启动的时候会启动一个定时任务,第一次延迟5秒执行,该定时任务即负责节点之间的心跳。

代码语言:javascript
复制
@Override
public void onApplicationEvent(WebServerInitializedEvent event) {
    getSelf().setState(NodeState.UP);
    if (!EnvUtil.getStandaloneMode()) { // 注解@1
        GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
    }
    EnvUtil.setPort(event.getWebServer().getPort());
    EnvUtil.setLocalAddress(this.localAddress);
    Loggers.CLUSTER.info("This node is ready to provide external services");
}

注解@1 非单机模式延迟5秒执行,执行的infoReportTask为MemberInfoReportTask。

代码语言:javascript
复制
public abstract class Task implements Runnable {
    
    protected volatile boolean shutdown = false;
    
    @Override
    public void run() { // 注解@2
        if (shutdown) {
            return;
        }
        try {
            executeBody();
        } catch (Throwable t) {
            Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t));
        } finally {
            if (!shutdown) {
                after();
            }
        }
    }
  
}

注解@2 看下这个Task执行逻辑,先执行 executeBody(),执行结束后执行after()。

代码语言:javascript
复制
class MemberInfoReportTask extends Task {

    private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() {
    };

    private int cursor = 0;

    @Override
    protected void executeBody() {
        // ----------注解@1 start---------------
       // 获取集群中除了自身以外的其他节点列表
        List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
        if (members.isEmpty()) {
            return;
        }
        // 定义一个游标
        this.cursor = (this.cursor + 1) % members.size();
        // 获取每个节信息
        Member target = members.get(cursor);
    //-----------注解@1 end-----------------
        Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());
        // 注解@2
        final String url = HttpUtils
                .buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,
                        "/cluster/report");
        try {
           // 注解@3
            asyncRestTemplate
                    .post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),
                            Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() { 
                                @Override
                                public void onReceive(RestResult<String> result) { // 注解@4
                                    // 注解@5 返回版本不一致
                                    if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
                                            || result.getCode() == HttpStatus.NOT_FOUND.value()) {
                                        // ...
                                        Member memberNew = target.copy();
                                        if (memberNew.getAbilities() != null
                                                && memberNew.getAbilities().getRemoteAbility() != null && memberNew
                                                .getAbilities().getRemoteAbility().isSupportRemoteConnection()) {
                                            memberNew.getAbilities().getRemoteAbility()
                                                    .setSupportRemoteConnection(false);
                                            update(memberNew); // 更新节点属性
                                        }
                                        return;
                                    }
                                    // 注解@6
                                    if (result.ok()) {
                                        MemberUtil.onSuccess(ServerMemberManager.this, target);
                                    } else {
                                     // 注解@7 处理失败上报
                                        MemberUtil.onFail(ServerMemberManager.this, target);
                                    }
                                }

                                @Override
                                public void onError(Throwable throwable) {
                                   // 注解@8 处理失败上报
                                     MemberUtil.onFail(ServerMemberManager.this, target, throwable);
                                }

                                @Override
                                public void onCancel() {

                                }
                            });
        } catch (Throwable ex) {
            // ...
        }
    }

    @Override
    protected void after() {
        GlobalExecutor.scheduleByCommon(this, 2_000L); // 注解@9
    }
}

注解@1 获取集群中除了自身以外的其他节点列表,通过游标循环每个节点。

注解@2 构造每个节点的上报url请求路径为「/cluster/report」

注解@3 发起Post健康检查请求,请求内容为自身信息Member

注解@4 处理健康检查返回结果,有以下三种类型

注解@5 版本过低错误,这个可能在集群中版本不一致出现

注解@6 处理成功上报,更新该节点member的状态为UP表示科通信,设置失败次数为0,并发布成员变更事件

代码语言:javascript
复制
public static void onSuccess(final ServerMemberManager manager, final Member member) {
    final NodeState old = member.getState();
    manager.getMemberAddressInfos().add(member.getAddress());
    member.setState(NodeState.UP); // 状态为UP可通信状态
    member.setFailAccessCnt(0); // 失败次数为0
    if (!Objects.equals(old, member.getState())) {
        manager.notifyMemberChange(); // 发布成员变更事件
    }
}

注解@7&注解@8 均为处理失败的上报,例如:集群中一个节点被kill -9 杀掉后。在nacos-cluster.log日志文件中会打印如下日志,并发布成员变更事件

代码语言:javascript
复制
2021-07-0x 16:30:24,994 ERROR failed to report new info to target node : x.x.x.x:8848, error : caused: Connection refused;

:2021-07-0x 16:30:30,995 ERROR failed to report new info to target node : x.x.x.x:8848, error : caused: Connection refused;
代码语言:javascript
复制
public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
    manager.getMemberAddressInfos().remove(member.getAddress());
    final NodeState old = member.getState();

    // 设置该节点为「不信任」
    member.setState(NodeState.SUSPICIOUS);
    // 失败次数递增+1
    member.setFailAccessCnt(member.getFailAccessCnt() + 1);
    // 默认最大失败重试次数为3
    int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);

    // If the number of consecutive failures to access the target node reaches
    // a maximum, or the link request is rejected, the state is directly down
    // 超过重试次数设置节点状态为「下线」
    if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils
            .containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
        member.setState(NodeState.DOWN);
    }

    if (!Objects.equals(old, member.getState())) {
        manager.notifyMemberChange(); // 发布成员变更事件
    }
}

被kill -9 杀掉的节点显示状态为下线DOWN

注解@9 执行完executeBody后延迟2秒继续执行executeBody,也就是检查健康检查的心跳频率为2秒,一轮全部节点检查结束后延迟2秒接着下一轮

无论检查成功还是失败,当节点状态变更时,发布成员变更事件。

代码语言:javascript
复制
if (!Objects.equals(old, member.getState())) {
    manager.notifyMemberChange();
}

void notifyMemberChange() {
    NotifyCenter.publishEvent(MembersChangeEvent.builder().members(allMembers()).build());
}

小结: Nacos节点会向集群其他节点发送健康检查心跳,每一轮频率为2秒;当健康检查异常时设置为不信任「SUSPICIOUS」状态,超过失败最大次数3次设置为下线「DOWN」状态;健康检查成功设置该节点为科通信「UP」状态;无论成功还是失败当节点状态变更时均发布MembersChangeEvent事件。

三、成员变更事件

当集群中有节点下线或者新节点上线都会通过心跳健康检查探测对节点状态进行改变。而状态的变更均会触发成员变更事件MembersChangeEvent。那订阅到这个事件干啥呢?

ClusterRpcClientProxy继承了MemberChangeListener,当有MembersChangeEvent事件时会回调其onEvent方法。

代码语言:javascript
复制
@Override
public void onEvent(MembersChangeEvent event) {
    try {
        List<Member> members = serverMemberManager.allMembersWithoutSelf();
        refresh(members);
    } catch (NacosException e) {
        // ...
    }
}

那接着看refresh方法。

代码语言:javascript
复制
private void refresh(List<Member> members) throws NacosException {

    for (Member member : members) {
        if (MemberUtil.isSupportedLongCon(member)) {
            // 注解@10
            createRpcClientAndStart(member, ConnectionType.GRPC);
        }
    }
    Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();
    Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();
    List<String> newMemberKeys = members.stream().filter(a -> MemberUtil.isSupportedLongCon(a))
            .map(a -> memberClientKey(a)).collect(Collectors.toList());
    // 关闭旧的grpc连接
    while (iterator.hasNext()) {
        Map.Entry<String, RpcClient> next1 = iterator.next();
        if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {
            Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());
            RpcClientFactory.getClient(next1.getKey()).shutdown();
            iterator.remove();
        }
    }

}

注解@10 为集群中每个节点member创建rcp client,在client启动时会先目标节点发送HealthCheckRequest,如果非健康节点将会被移除。见RpcClient类部分代码。

代码语言:javascript
复制
boolean isHealthy = healthCheck();
// 非健康节点
if (!isHealthy) {
    if (currentConnection == null) {
        continue;
    }
    LoggerUtils.printIfInfoEnabled(LOGGER,
            "[{}]Server healthy check fail,currentConnection={}", name,
            currentConnection.getConnectionId());
    // 标记客户端状态为unhealthy
    rpcClientStatus.set(RpcClientStatus.UNHEALTHY);
    // 重置ReconnectContext移除serverInfo
    reconnectContext = new ReconnectContext(null, false);

这个意味着如果集群中有节点下线,与下线节点的rpc将会失效;同样如果集群中有新节点加入将会建立新的rpc通道。

小结: 当集群节点成员变更时,MemberChangeListener会收到该事件。例如回调ClusterRpcClientProxy#onEvent触发refresh。刷新本节点与集群中其他节点的RPC状态,关闭无效的或者增加新的RPC连接。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-07-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 健康检查
  • 成员变更事件
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档