前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >微服务专题|Naocs 源码设计的精髓就在这了,给你一个手撕面试官的机会

微服务专题|Naocs 源码设计的精髓就在这了,给你一个手撕面试官的机会

原创
作者头像
AI码师
修改2021-01-18 14:22:52
6970
修改2021-01-18 14:22:52
举报

微服务专题|Naocs 源码设计的精髓就在这了,给你一个手撕面试官的机会

Nacos 如何扛住高并发读写?

最近经常阅读源码,发现大部分框架在解决并发读写的时候,都会使用使用COW的思想来解决; nacos也不例外。

解决方案

假设我们创建一个map来存储并发数据,我们先看下在并发场景下,从这个map中进行读写会出现什么问题:

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

针对超大的map进行写操作会很耗时,导致其他线程对这个map的读写操作会等待很久;

那么在nacos中,是如何进行解决的呢?

其实nacos处理的思路很简单,我简要概括下,然后跟踪下源码,带大家看看大佬是如何写代码的:

  1. 首先naocs 将内存中的注册列表map 复制一份当到map1
  2. 然后将客户端同步过来的注册key添加到map1中
  3. 处理完所有的key之后,将map1重新复制给内存中的注册列表map中
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

源码跟踪

通过阅读源码,我找到了nacos进行更新注册列表的方法: com.alibaba.nacos.naming.core.Cluster.updateIPs()

代码语言:javascript
复制
  public void updateIPs(List<Instance> ips, boolean ephemeral) {
// 首先判断是需要更新临时注册列表还是持久化的注册列表(这个会在后面讲解ap/cp提到)
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
// 创建一个map,来保存内存中的注册列表
        HashMap<String, Instance> oldIPMap = new HashMap<>(toUpdateInstances.size());
// 遍历注册列表,依次添加到副本中
        for (Instance ip : toUpdateInstances) {
            oldIPMap.put(ip.getDatumKey(), ip);
        }

// 省略处理key的过程
        toUpdateInstances = new HashSet<>(ips);
// 将更新后的注册列表 重新复制到内存注册列表中
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }

作为注册中心的Eureka是怎么实现高并发读写?

在eureka中,使用多级缓存结构来解决高并发读写的问题。 eureka会创建一个只读注册列表和一个读写注册列表: 如果客户端发起注册或退出的时候,eureka会先把最新的注册列表内容更新到读写注册列表中,同时在eureka启动时会创建一个定时任务,定时把读写的注册列表的内容同步到只读注册列表中,客户端在进行服务发现的时候,是从只读注册列表中获取可用的服务列表。

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

Nacos的ap和cp又是怎么回事

在学习分布式相关框架的时候,我们都离不开CAP理论,这里就不过多介绍CAP理论了; 令开发者疑惑的是为什么nacos既能支持ap又能支持cp呢,这在面试过程中经常会被问到。相信大家在看完这篇文章后,应该就可以手撕面试官了。

前言

在nacos中,ap和cp主要体现在集群中如何同步注册信息到其它集群节点的实现方式上; nacos通过ephemeral 字段值来决定是使用ap方式同步还是cp方式同步,默认使用的的ap方式同步注册信息。 通过阅读源码,我们可以找到这段代码,关于如何找到这段代码,后面会在nacos源码解读的文章中讲解: com.alibaba.nacos.naming.core.ServiceManager.addInstance()

代码语言:javascript
复制
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        // 生成服务的key
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // 获取服务
        Service service = getService(namespaceId, serviceName);
        // 使用同步锁处理
        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // 调用consistencyService.put 处理同步过来的服务
            consistencyService.put(key, instances);
        }
    }

我们在进入到consistencyService.put方法中

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

点击put方法时,会看到有三个实现类,根据上下文(或者debug方式),可以推断出这里引用的是DelegateConsistencyServiceImpl实现类

代码语言:javascript
复制
    @Override
    public void put(String key, Record value) throws NacosException {
        // 进入到这个put方法后,就可以知道应该使用ap方式同步还是cp方式同步
        mapConsistencyService(key).put(key, value);
    }

从下面的方法中 可以判断通过key来判断使用ap还是cp来同步注册信息,其中key是由ephemeral字段组成;

代码语言:javascript
复制
   private ConsistencyService mapConsistencyService(String key) {
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }

AP 方式同步的流程(ephemeralConsistencyService)

本地服务器处理注册信息&将注册信息同步到其它节点
代码语言:javascript
复制
    @Override
    public void put(String key, Record value) throws NacosException {
        // 处理本地注册列表
        onPut(key, value);
        // 添加阻塞任务,同步信息到其他集群节点
        taskDispatcher.addTask(key);
    }
处理本地注册节点

nacos将key做为一个task,添加到notifer中阻塞队列tasks中,并且使用单线程执行,其中notifer是初始化的时候,作为一个线程被放到线程池中(线程池只设置了一个核心线程);

这里有一个点需要告诉大家:在大多数分布式框架,都会采用单线程的阻塞队列来处理耗时的任务,一方面解决并发问题,另一方面能够解决并发带来的写写冲突问题。

线程中的主要处理逻辑就是,循环读取阻塞队列中的内容,然后处理注册信息,更新到内存注册列表中。

同步注册信息到其他集群节点

nacos同样也是把注册key作为一个task存放到 TaskDispatcher 中的taskShedule阻塞队列中,然后开启线程循环读取阻塞队列:

代码语言:javascript
复制
       @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);
                    // 省略判断代码
                    // 添加同步的key
                    keys.add(key);
                    // 计数
                    dataSize++;
                    // 判断同步的key大小是否等于 批量同步设置的限量 或者 判断据上次同步时间 是否大于 配置的间隔周期,如果满足任意一个,则开始同步
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                        // 遍历所有集群节点,直接调用http进行同步
                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }

                            dataSyncer.submit(syncTask, 0);
                        }
                        // 记录本次同步时间
                        lastDispatchTime = System.currentTimeMillis();
                        // 计数清零
                        dataSize = 0;
                    }
            }
        }
    }

使用ap方式作同步的过程很简单,但是这里面有两种设计思路来解决单个key同步的问题: 如果有新的key推送上来,nacos就发起一次同步,这会造成网络资源浪费,因为每次同步的就只有一个key或者几个key;

同步少量的key解决方案:
  1. 只有积累到指定数量的key,才发起批量同步
  2. 距离上次同步时间超过配置的限制时间,则忽略key数量,直接发起同步

CP 方式同步的流程(RaftConsistencyServiceImpl)

cp模式追求的是数据一致性,为了数据一致性,那么肯定得选出一个leader,由leader首先同步,然后再由leader通知follower前来获取最新的注册节点(或者主动推送给follower)

nacos使用raft协议来进行选举leader,来实现cp模式。

同样进入到 RaftConsistencyServiceImpl的put方法

代码语言:javascript
复制
    @Override
    public void put(String key, Record value) throws NacosException {
        try {
            raftCore.signalPublish(key, value);
        } catch (Exception e) {
            Loggers.RAFT.error("Raft put failed.", e);
            throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
        }
    }

进入到raftCore.signalPublish方法中,我提取几个关键的代码

代码语言:javascript
复制
// 首先判断当前nacos节点是否是leader,如果不是leader,则获取leader节点的ip,然后将请求转发到leader处理,否则往下走
if (!isLeader()) {
            JSONObject params = new JSONObject();
            params.put("key", key);
            params.put("value", value);
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);

            raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
            return;
        }

// 同样采用同样队列的方式,去处理本地注册列表

代码语言:javascript
复制
onPublish(datum, peers.local());

public void onPublish(Datum datum, RaftPeer source) throws Exception {
       
        // 添加同步key任务到阻塞队列中
        notifier.addTask(datum.key, ApplyAction.CHANGE);

        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
    }

遍历所有集群节点,发送http同步请求

代码语言:javascript
复制
 for (final String server : peers.allServersIncludeMyself()) {
                // 如果是leader,则不进行同步
                if (isLeader(server)) {
                    latch.countDown();
                    continue;
                }
                // 组装url 发送同步请求到其它集群节点
                final String url = buildURL(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                datum.key, server, response.getStatusCode());
                            return 1;
                        }
                        latch.countDown();
                        return 0;
                    }

                    @Override
                    public STATE onContentWriteCompleted() {
                        return STATE.CONTINUE;
                    }
                });

            }

各个集群节点处理同步请求这里就不过多介绍了,大家可以自行去看哈

微信搜一搜【乐哉开讲】关注帅气的我,回复【干货领取】,将会有大量面试资料和架构师必看书籍等你挑选,包括java基础、java并发、微服务、中间件等更多资料等你来取哦。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 微服务专题|Naocs 源码设计的精髓就在这了,给你一个手撕面试官的机会
    • Nacos 如何扛住高并发读写?
      • 解决方案
      • 源码跟踪
    • 作为注册中心的Eureka是怎么实现高并发读写?
      • Nacos的ap和cp又是怎么回事
        • 前言
        • AP 方式同步的流程(ephemeralConsistencyService)
        • CP 方式同步的流程(RaftConsistencyServiceImpl)
    相关产品与服务
    消息队列 TDMQ
    消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档