前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Nacos配置中心原理

Nacos配置中心原理

作者头像
spilledyear
发布2019-12-30 17:29:34
3.8K0
发布2019-12-30 17:29:34
举报
文章被收录于专栏:小白鼠小白鼠

使用示例

代码语言:javascript
复制
@Before
public void init() throws NacosException {
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:" + 8848);
    properties.put(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT, "20000");
    properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, "3000");
    properties.put(PropertyKeyConst.MAX_RETRY, "5");
    configService = NacosFactory.createConfigService(properties);
}

@Test
public void test() throws InterruptedException, NacosException {

    configService.addListener("test", "DEFAULT_GROUP", new Listener() {
        @Override
        public Executor getExecutor() {
            return null;
        }

        @Override
        public void receiveConfigInfo(String configInfo) {
            System.out.println(configInfo);
        }
    });

    TimeUnit.SECONDS.sleep(100);
}

NacosConfigService

这是Nacos给客户端提供的API,可以通过该API:增、删、盖、查配置信息,还可以通过该API给配置添加Listener

创建ConfigService

代码语言:javascript
复制
// NacosFactory#createConfigService
public static ConfigService createConfigService(Properties properties) throws NacosException {
    return ConfigFactory.createConfigService(properties);
}

// ConfigFactory#createConfigService
public static ConfigService createConfigService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        // 反射创建对象
        ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

通过反射的机制创建了一个NacosConfigService实例,它的构造函数如下

代码语言:javascript
复制
public NacosConfigService(Properties properties) throws NacosException {
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        encode = Constants.ENCODE;
    } else {
        encode = encodeTmp.trim();
    }
    initNamespace(properties);
    agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    agent.start();
    worker = new ClientWorker(agent, configFilterChainManager, properties);
}
  1. MetricsHttpAgent主要是用来记录一些Metric信息,内部持有一个ServerHttpAgent对象,所以主要逻辑还是关注ServerHttpAgent
  2. ClientWorker是一个非常核心的类,里面封装了客户端从服务端主动PULL配置信息的关键逻辑,这个在下面重点介绍

ServerHttpAgent

代码语言:javascript
复制
public ServerHttpAgent(Properties properties) throws NacosException {
    serverListMgr = new ServerListManager(properties);
    init(properties);
}

ServerHttpAgent内部主要就是封装了一个HTTP交互的通用方法,比如GETPUTDELETE等方法,在ServerHttpAgent构造函数中,主要做了两件事情:

  1. 创建ServerListManager对象,这个对象的主要作用就是根据Properties解析出服务端的地址,然后维护在一个List<String>
  2. init方法中主要做了三件事情:初始化编码格式,如果没有就默认UTF-8;初始化accessKeysecretKey,这个应该是用来验证客户端的身份的;初始化重试次数,如果没传默认为3

ServerListManager对象中,还有一个比较重要的属性isFixed,这个属性的主要作用就是标识服务器的地址信息是不是固定的,比如,如果服务器的地址是通过Properties传入,那isFixed的值为true

为什么要介绍这个属性呢,因为在NacosConfigService的构造函数中,调用了MetricsHttpAgent#start方法,而这个方法的内部调用链如下

代码语言:javascript
复制
MetricsHttpAgent#start =>
ServerHttpAgent#start =>
ServerListManager#start

ServerListManager#start方法主要做了什么事情呢?如果isFixed的值为true,就直接返回。否则先执行initServerlistRetryTimesGetServerListTask#run方法获取,该方法用于更新服务器地址信息,如果执行initServerlistRetryTimes次 之后还是没有获取到服务器地址列表信息,则直接抛出异常,否则开启一个定时任务,每30s更新一次服务器地址列表信息。

至此,在NacosConfigService构造函数中,只剩下ClientWorker相关的逻辑了

ClientWorker

前面已经提到过,ClientWorker是一个非常核心的类,里面封装了客户端从服务端主动pull配置信息的关键逻辑。注意这里很明确指出了ClientWorker是通过pull模式从服务端获取配置信息的,而我们在使用的时候通常会给它添加Listener,这 会让我们以为它是push模式,这是一点需要注意的地方。

而它实现的pull模式的关键点在于两个线程池,这两个线程池在ClientWorker的构造函数中初始化。其中有一个线程池executor里面只有一个线程,它的作用就是让另一个线程池启动。

代码语言:javascript
复制
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
    init(properties);

    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}
  1. init(properties)方法主要就是初始化一些timeout的参数
  2. executor线程池里面只有一个线程,它的唯一作用就是让另一个线程池开始执行,每10s中执行一次
  3. executorService线程池用于更新配置信息,核心任务LongPollingRunnable

ClientWorker#checkConfigInfo方法主要作用是更新配置信息,目前已经获取到的配置信息会缓存到一个Map<String, CacheData>中,然后对map中的数据分批次,一个批次默认是3000条数据,每个批次的数据对应一个线程负责更新,如下:

代码语言:javascript
复制
public void checkConfigInfo() {
    // 分任务
    int listenerSize = cacheMap.get().size();
    // 向上取整为批数
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

LongPollingRunnable的名字可以看出,客户端主要是通过长轮询的方式去更新配置信息

LongPollingRunnable

代码语言:javascript
复制
public void run() {
    // 本批次号的数据
    List<CacheData> cacheDatas = new ArrayList<CacheData>();
    // 本批次号中 isInitializing=true 的数据,CacheData首次出现在Map中并且是首次check更新时,isInitializing的值才为true
    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // check failover config
        for (CacheData cacheData : cacheMap.get().values()) {
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    // 涉及到FailoverFile
                    checkLocalConfig(cacheData);
                    // 如果有更新,需要更新Listener的MD5值,并执行Listener逻辑
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }

        // check server config 
        // 找到isUseLocalConfig=false的数据,然后将每个符合条件的CacheData的 `group + dataId + tenant` 拼成一个字符串传给服务端校验,然后服务端会返回一个需要更新的`List<String>`,该列表里面的每个元素代表一个CacheData的key
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                // 根据需要更新的key列表,从服务端获取配置信息
                String content = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                cache.setContent(content);
            } catch (NacosException ioe) {
            }
        }
        
        // 这部分代码是在没看懂
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                cacheData.checkListenerMd5();
                cacheData.setInitializing(false);
            }
        }
        inInitializingCacheList.clear();

        // 重新提交自己
        executorService.execute(this);

    } catch (Throwable e) {
        // If the rotation training task is abnormal, the next execution time of the task will be punished
        LOGGER.error("longPolling error : ", e);
        executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}

LongPollingRunnable#run方法比较长:

  1. 从map中取出批次号符与该线程对应批次号相同的数据,因为每条配置信息CacheData都维护了一个批次号信息,然后每个LongPollingRunnable也对应一个批次号信息,只需要负责更新批次号相同的数据,不同的由别的LongPollingRunnable去更新,这部分据放在变量cacheDatas
  2. 如果客户端和服务端部署在同一个节点,此时客户端可以直接从本地文件中获取配置信息,避免远程交互,设置这部分数据的isUseLocalConfig=true,并更新CacheData的值,同时更新CacheData的MD5值
  3. 如果步骤2可以直接根据本地文件更新值isUseLocalConfig==true,此时执行cacheData#checkListenerMd5方法,该方法会拿CacheData的MD5值和Listener的MD5值对比,如果不一样就更新Listener的MD5值并回调Listener的相关方法
  4. 排除isUseLocalConfig=true的配置信息,然后将每个符合条件的配置信息的 group + dataId +tenant 拼成一个字符串传给服务端校验,然后服务端会返回一个需要更新的List<String>,该列表里面的每个元素代表一个CacheData的key
  5. 根据上一步骤返回的key列表,从服务端拉取配置信息,然后更新CacheData的值
  6. cacheData.isInitializing()代表该条配置信息首次出现在map中并且是首次更新,inInitializingCacheList代表本批次中isUseLocalConfig==false && isInitializing == true
  7. 最后那个遍历不太清楚啥意思,isInitializing == false || isInitializing == true

配置监听器

上面提到ClientWorker中有一个Map,那里面的数据从哪里来呢?为什么要存这部分数据呢?答案是在为CacheData添加Listener的时候,会初始化一条CacheData数据,并添加到Map中

添加监听

代码语言:javascript
复制
ConfigService configService = NacosFactory.createConfigService(properties);
configService.addListener("test", "DEFAULT_GROUP", new Listener() {
    @Override
    public Executor getExecutor() {
        return null;
    }

    @Override
    public void receiveConfigInfo(String configInfo) {
        System.out.println(configInfo);
    }
});

源码

代码语言:javascript
复制
// NacosConfigService#addListener
public void addListener(String dataId, String group, Listener listener) throws NacosException {
    worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}

// ClientWorker#addTenantListeners
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
    group = null2defaultGroup(group);
    String tenant = agent.getTenant();
    CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
    for (Listener listener : listeners) {
        cache.addListener(listener);
    }
}

内部还是委托给ClientWorker来实现,添加监时主要做了以下几件事情

  1. dataId group tenant封装成key,从ClientWorker的Map缓存中获取CacheData 1.1. 如果从缓存中拿到了数据,将CacheDataisInitializing属性设置为true 1.2. 如果从缓存中没拿到数据,则创建一个CacheData,同时判断是否开启远程同步,即enableRemoteSyncConfig属性的值是否为true。如果开启了,则从服务端获取该配置的值,并设置到CacheData
  2. 将最新的CacheData设置到缓存中
  3. Listener添加到CacheDataCopyOnWriteArrayList<ManagerListenerWrap> listeners列表中

触发监听

其实在上面已经提到过,当配置有更新时,会触发Listener的回调逻辑,这部分逻辑在CacheData#checkListenerMd5方法中

代码语言:javascript
复制
void checkListenerMd5() {
    // `ManagerListenerWrap`只是一个包装类,里面维护了`Listener`和对应`CacheData`的MD5值
    for (ManagerListenerWrap wrap : listeners) {
        // 如果不一样,说明配置有变更,此时更新ManagerListenerWrap的MD5值,并执行Listener的回调
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, md5, wrap);
        }
    }
}

private void safeNotifyListener(final String dataId, final String group, final String content, final String md5, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;

    Runnable job = new Runnable() {
        @Override
        public void run() {
            try {
                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                listener.receiveConfigInfo(contentTmp);
                listenerWrap.lastCallMd5 = md5;
            } finally {
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };

    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            // 如果配置了线程池,交给线程池执行
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
    }
}
  1. ManagerListenerWrap只是一个包装类,里面维护了Listener和对应CacheData的MD5值
  2. 判断ManagerListenerWrap和当前CacheData的MD5值是否相同,如果不一样,说明配置有变更,此时需要更新ManagerListenerWrap的MD5值,并执行Listener的回调
  3. 更新ManagerListenerWrap的MD5值和执行Listener的回调的逻辑都在safeNotifyListener方法中,同时会判断是否为Listener配置了线程池如,没有就直接执行,有就交给线程池执行

创建配置

代码语言:javascript
复制
ConfigService configService = NacosFactory.createConfigService(properties);
configService.publishConfig("one", "LSZ", "大美女");

源码

代码语言:javascript
复制
private boolean publishConfigInner(String tenant, String dataId, String group, String tag, String appName,
                                    String betaIps, String content) throws NacosException {
    ...... 一大坨构造参数的代码 ......

    HttpResult result = null;
    try {
        // 推送到服务端
        result = agent.httpPost(url, headers, params, encode, POST_TIMEOUT);
    } catch (IOException ioe) {
        return false;
    }
    ......
}

获取配置

代码语言:javascript
复制
ConfigService configService = NacosFactory.createConfigService(properties);
configService.getConfig("testlsz", "lszgroup", 1000000);

源码

代码语言:javascript
复制
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
    ......
    // 优先使用本地配置
    String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
    if (content != null) {
        return content;
    }

    try {
        // 本地没有从服务器获取
        content = worker.getServerConfig(dataId, group, tenant, timeoutMs);

        cr.setContent(content);

        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();

        return content;
    } catch (NacosException ioe) {
    }
    ......
}

删除配置

代码语言:javascript
复制
ConfigService configService = NacosFactory.createConfigService(properties);
configService.removeConfig("testlsz", "lszgroup");

源码

代码语言:javascript
复制
private boolean removeConfigInner(String tenant, String dataId, String group, String tag) throws NacosException {
    ...... 一大坨构造参数的代码 ......
    HttpResult result = null;
    try {
        // 直接发送http请求
        result = agent.httpDelete(url, null, params, encode, POST_TIMEOUT);
    } catch (IOException ioe) {
        LOGGER.warn("[remove] error, " + dataId + ", " + group + ", " + tenant + ", msg: " + ioe.toString());
        return false;
    }
    ......
}

本地文件

在Naco中涉及到两个文件,FailoverFileSnapshotFile

  1. FailoverFile为容灾文件,当本地和数据库里面数据不一致的时候会去使用,一般不会用;
  2. SnapshotFile为配置的快照,当获取不到服务器上的配置的时候,会读取本地快照; FailoverFile在客户端不会自动生成,它是在服务端生成的,当更新了一条配置之后,就会反应到这个文件中。所以如果想在客户端使用到这个功能,需要手工将文件添加到客户端,然后客户端就不会去读取服务端的配置了,也许某些场景下可以用到

SnapshotFile

SnapshotFile文件位置:\userhome\nacos\config\fixed-127.0.0.1_8848_nacos\snapshot\{group}\{dataId}

当客户端从服务端获取配置之后,会将该信息写入快照文件中,核心代码就在ClientWorker#getServerConfig

代码语言:javascript
复制
public String getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException {
    HttpResult result = null;
    try {
        // 从服务端获取配置信息
        result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
    } catch (IOException e) {
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }

    switch (result.code) {
        case HttpURLConnection.HTTP_OK:
            // 将配置更新到本地文件
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
            return result.content;
        case HttpURLConnection.HTTP_NOT_FOUND:
            // 根据 dataId、group、tenant 将本地的文件删除
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
            return null;
        case HttpURLConnection.HTTP_CONFLICT: {
            throw new NacosException(NacosException.CONFLICT,"data being modified, dataId=" + dataId + ", group=" + group + ",tenant=" + tenant);
        }
        case HttpURLConnection.HTTP_FORBIDDEN: {
            LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant);
            throw new NacosException(result.code, result.content);
        }
        default: {
            LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.code);
            throw new NacosException(result.code, "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }
    }
}

FailoverFile

FailoverFile文件位置:\userhome\nacos\config\fixed-127.0.0.1_8848_nacos\data\config-data\{group}\{dataId}

FailoverFile文件的判断主要是在ClientWorker#checkLocalConfig方法,看了几篇文章,都说这个方法的校验是为了在服务端挂了的时候,可以直接从客户端获取文件,这明显是不对的。刚刚在上面也提到过,FailoverFile的主要作用是容灾,而且这个文件在客户端不会自动生成,想要使用这个功能必须手动添加

代码语言:javascript
复制
private void checkLocalConfig(CacheData cacheData) {
    final String dataId = cacheData.dataId;
    final String group = cacheData.group;
    final String tenant = cacheData.tenant;
    // 注意这个,容灾文件,这个文件在客户端是不会自动生成的
    File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);

    // 没有 -> 有
    if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        String md5 = MD5.getInstance().getMD5String(content);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);

        LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
            agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
        return;
    }

    // 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
    if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
        cacheData.setUseLocalConfigInfo(false);
        LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
            dataId, group, tenant);
        return;
    }

    // 有变更
    /**
     * isUseLocalConfig=true && && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified()
     * 说明配置有更新,所以此时需要更新 CacheData
     */
    if (cacheData.isUseLocalConfigInfo() && path.exists()
        && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        String md5 = MD5.getInstance().getMD5String(content);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
            agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
    }
}

有关于获取配置的优化

从上面已经知道,有一个线程池会不停的执行LongPollingRunnable#run方法,这个方法主要作用就是从服务端拉取最新的配置信息,如果直接按照正常的做法来做,直接根据dataId group tenant去拉取就好了,如果每次都直接去服务器来配置信息,但这样会有一些性能问题:

  1. 配置信息变动的可能性很小,如果每次都需要全量去拉取,拉取的信息基本都是一样的,这很浪费资源;
  2. 如果从服务端拉取数据的频率太高,会太耗性能;如果拉取的频率太低,数据发生变更之后客户端响应不及时;

针对上面几个问题,Nacos做了以下几个优化

  1. 只拉取改动过的配置信息:客户端先通过一个HTTP请求发送一个key列表给服务端,服务端返回发生了变更的Key列表,大部分时候,这可以过滤掉绝大部分没有配置项;
  2. 通过HTTP长轮询减少少客户端和服务端的交互频率,但这必然要面对一个数据响应不实时问问题,怎么解决?

配置实时更新

先推荐一篇文章:Nacos配置实时更新原理分析 这篇文章已经写的非常详细了,不过那篇文章有点长,这里总结一下,为了自己以后看的时候方便。

通过HTTP长轮询较少客户端和服务端的交互频率,但这必然要面对一个数据响应不实时问问题,怎么解决?

在客户端向服务端拉取配置信息之前,需要先向服务端发送一个配置Key列表,然后服务端返回一个发生了变更的配置Key列表

ClientWorker#checkUpdateConfigStr

代码语言:javascript
复制
// timeout默认30s超时
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout);

请求的服务端地址: /v1/cs/configs/listener

ConfigController#listener

代码语言:javascript
复制
@RequestMapping(value = "/listener", method = RequestMethod.POST)
public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
    ....
    // do long-polling
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

// ConfigServletInner#doPollingConfig ,暂且只关注长轮询

代码语言:javascript
复制
// 长轮询
if (LongPollingService.isSupportLongPolling(request)) {
    longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
    return HttpServletResponse.SC_OK + "";
}

LongPollingService#addLongPollingClient

代码语言:javascript
复制
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) {
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
    /**
     * 提前500ms返回响应,为避免客户端超时 add delay time for LoadBalance
     */
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);

    String ip = RequestUtil.getRemoteIp(req);
    // 一定要由HTTP线程调用,否则离开后容器会立即发送响应, 开启Servlet异步支持
    final AsyncContext asyncContext = req.startAsync();
    // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
    asyncContext.setTimeout(0L);

    // 向线程池提交了一个任务,所以核心逻辑在 ClientLongPolling 中,
    scheduler.execute(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

ClientLongPolling#run

代码语言:javascript
复制
public void run() {
    asyncTimeoutFuture = scheduler.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                /**
                 * 删除订阅关系
                 */
                allSubs.remove(ClientLongPolling.this);

                if (isFixedPolling()) {
                    // 根据客户端传过来的key列表,找到该发生了变更的配置的key列表
                    List<String> changedGroups = MD5Util.compareMd5((HttpServletRequest)asyncContext.getRequest(), (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                    if (changedGroups.size() > 0) {
                        // 返回给客户端
                        sendResponse(changedGroups);
                    } else {
                        // 没有就返回空
                        sendResponse(null);
                    }
                } else {
                    sendResponse(null);
                }
            } catch (Throwable t) {
                LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
            }

        }

    }, timeoutTime, TimeUnit.MILLISECONDS);

    allSubs.add(this);
}
  1. 在run方法里面又创建了一个任务,延迟 29.5s 后执行
  2. ClientLongPolling添加到allSubs
  3. 延迟时间到了之后,执行任务,先将ClientLongPollingallSubs中移除,然后通过AsyncContext将结果写回客户端

allSubs数据结构如下,可以把allSubs当作是一个订阅者列表,当配置发生变成的时候,会发布一个事件,然后这些订阅者会得到相应,然后再执行相应的功能

代码语言:javascript
复制
Queue<ClientLongPolling> allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();

修改配置

延迟 29.5s 执行,要是在这段时间内,数据发生了变更怎么办,难道要客户端 29.5s 之后才知道吗?肯定不是的。可以看看当数据发生变更时,会涉及到什么接口

ConfigController#publishConfig

请求的服务端地址: /v1/cs/configs

代码语言:javascript
复制
// 持久化
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
// 触发事件
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));

EventDispatcher#fireEvent

代码语言:javascript
复制
static public void fireEvent(Event event) {
    for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
        try {
            listener.onEvent(event);
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }
}

static public abstract class AbstractEventListener {
    public AbstractEventListener() {
        EventDispatcher.addEventListener(this);
    }
}

// EventDispatcher#addEventListener
static public void addEventListener(AbstractEventListener listener) {
    for (Class<? extends Event> type : listener.interest()) {
        getEntry(type).listeners.addIfAbsent(listener);
    }
}

注意观察AbstractEventListener的构造函数,在其构造函数中涉及到Listener的注册过程,而AbstractEventListener有以下几个子类:

代码语言:javascript
复制
AsyncNotifyService  ->   ConfigDataChangeEvent
LongPollingService ->    LocalDataChangeEvent
MockListener 计数用的

刚刚看上面发布的是一个ConfigDataChangeEvent事件,所以会先执行AsyncNotifyService#onEvent方法。该方法中会先获取服务端所有IP列表,依次通过Http通知对象/v1/cs/communication/dataChange?dataId=xx&group=xx,接收到请求后, 会dump出来所有config信息,同时回调LocalDataChangeEvent事件,然后执行LongPollingService#onEvent方法。

LongPollingService#onEvent

代码语言:javascript
复制
public void onEvent(Event event) {
    if (isFixedPolling()) {
    } else {
        if (event instanceof LocalDataChangeEvent) {
            LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
            scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
        }
    }
}

提交了一个任务,关键逻辑在DataChangeTask#run方法中

代码语言:javascript
复制
public void run() {
    try {
        ConfigService.getContentBetaMd5(groupKey);
        for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
            // ClientLongPolling 作为订阅者,配置类似于Topic,更新配置就是发布了一个事件

            ClientLongPolling clientSub = iter.next();
            
            // 找到订阅了 当前发生了变更配置项 的ClientLongPolling
            if (clientSub.clientMd5Map.containsKey(groupKey)) {
                getRetainIps().put(clientSub.ip, System.currentTimeMillis());

                // 删除订阅关系
                iter.remove(); 

                // 向客户端回写数据
                clientSub.sendResponse(Arrays.asList(groupKey));
            }
        }
    } catch (Throwable t) {
        LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
    }
}
  1. 找到订阅了 当前发生了变更配置项 的ClientLongPolling
  2. 向客户端回写数据

这里会有一个问题,如果DataChangeTask任务完成了向客户端写数据,此时ClientLongPolling中的调度任务又开始执行了怎么办呢?这时任务都被移除了,肯定会报错啊

很简单,只要在进行"推送"操作之前,先将原来等待执行的调度任务取消掉就可以了,如下:

代码语言:javascript
复制
void sendResponse(List<String> changedGroups) {
    /**
     *  取消超时任务
     */
    if (null != asyncTimeoutFuture) {
        asyncTimeoutFuture.cancel(false);
    }
    generateResponse(changedGroups);
}

这样,就达到了一个类似于数据"推送"的效果,如果一直没有更新,客户端等待时间接近 30s,如果在等待期间有数据发生变更,几乎可以实时的返回给客户端

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用示例
  • NacosConfigService
    • 创建ConfigService
      • ServerHttpAgent
        • ClientWorker
          • LongPollingRunnable
          • 配置监听器
            • 添加监听
              • 触发监听
              • 创建配置
              • 获取配置
              • 删除配置
              • 本地文件
                • SnapshotFile
                  • FailoverFile
                    • 有关于获取配置的优化
                    • 配置实时更新
                      • ClientWorker#checkUpdateConfigStr
                        • ConfigController#listener
                          • LongPollingService#addLongPollingClient
                            • ClientLongPolling#run
                              • 修改配置
                                • ConfigController#publishConfig
                                  • EventDispatcher#fireEvent
                                    • LongPollingService#onEvent
                                    相关产品与服务
                                    对象存储
                                    对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
                                    领券
                                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档