
Nacos Config Center Source Code Analysis

Author: 没有故事的陈师傅
Published 2021-08-13 15:46:01
Included in the column: 运维开发故事

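This article walks through the Nacos config center source code to show how the client fetches configuration and how configuration changes are synchronized dynamically.

Environment and usage

Environment: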

  • JDK 1.8
  • nacos-server-1.4.2
  • spring-boot-2.3.5.RELEASE
  • spring-cloud-Hoxton.SR8
  • spring-cloud-alibaba-2.2.5.RELEASE

To use Nacos as a config center, first add the Nacos Config dependency:

<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

Then configure the Nacos server address in bootstrap.yml:

spring:
  cloud:
    nacos:
      config:
        server-addr: 127.0.0.1:8848

Client initialization

Initialization is driven by the NacosConfigBootstrapConfiguration class, which creates the NacosConfigManager and NacosPropertySourceLocator beans.

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {

 @Bean
 @ConditionalOnMissingBean
 public NacosConfigManager nacosConfigManager(
   NacosConfigProperties nacosConfigProperties) {
  return new NacosConfigManager(nacosConfigProperties);
 }
    
 @Bean
 public NacosPropertySourceLocator nacosPropertySourceLocator(
   NacosConfigManager nacosConfigManager) {
  return new NacosPropertySourceLocator(nacosConfigManager);
 }
    // ...
}

The NacosConfigManager constructor calls createConfigService to create the ConfigService instance. Internally this calls the factory method ConfigFactory#createConfigService, which uses reflection to instantiate com.alibaba.nacos.client.config.NacosConfigService.

public static ConfigService createConfigService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
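
Looking the implementation up by class name means the calling code only needs the ConfigService interface at compile time. The following is a minimal, self-contained sketch of the same pattern, not the Nacos code itself: GreetingService, DefaultGreetingService, and GreetingFactory are hypothetical names invented for illustration.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical API-side interface (plays the role of ConfigService).
interface GreetingService {
    String greet();
}

// Hypothetical implementation (plays the role of NacosConfigService).
class DefaultGreetingService implements GreetingService {
    private final String name;

    public DefaultGreetingService(Properties properties) {
        this.name = properties.getProperty("name", "world");
    }

    @Override
    public String greet() {
        return "hello " + name;
    }
}

final class GreetingFactory {
    // Load the implementation by name and invoke its (Properties) constructor.
    static GreetingService create(String implClassName, Properties properties) {
        try {
            Class<?> implClass = Class.forName(implClassName);
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return (GreetingService) constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            // Wrap every reflection failure in one unchecked exception.
            throw new IllegalStateException("cannot create " + implClassName, e);
        }
    }
}
```

Calling GreetingFactory.create(DefaultGreetingService.class.getName(), props) returns an instance typed as the interface; a wrong class name or a missing (Properties) constructor surfaces as a single wrapped exception, much like the NacosException in the excerpt above.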

NacosPropertySourceLocator implements PropertySourceLocator (an interface that lets custom configuration be loaded into the Spring Environment) and loads the configuration in its locate method.

 @Override
 public PropertySource<?> locate(Environment env) {
  nacosConfigProperties.setEnvironment(env);
  ConfigService configService = nacosConfigManager.getConfigService();

  if (null == configService) {
   log.warn("no instance of config service found, can't load config from nacos");
   return null;
  }
  long timeout = nacosConfigProperties.getTimeout();
  nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
    timeout);
  String name = nacosConfigProperties.getName();

  String dataIdPrefix = nacosConfigProperties.getPrefix();
  if (StringUtils.isEmpty(dataIdPrefix)) {
   dataIdPrefix = name;
  }

  if (StringUtils.isEmpty(dataIdPrefix)) {
   dataIdPrefix = env.getProperty("spring.application.name");
  }

  CompositePropertySource composite = new CompositePropertySource(
    NACOS_PROPERTY_SOURCE_NAME);

  // Shared configuration
  loadSharedConfiguration(composite);
  // Extension configuration
  loadExtConfiguration(composite);
  // Application configuration
  loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
  return composite;
 }

Config loading process

Config loading involves three methods: loadSharedConfiguration, loadExtConfiguration, and loadApplicationConfiguration. We follow loadApplicationConfiguration here.

private void loadApplicationConfiguration(
    CompositePropertySource compositePropertySource, String dataIdPrefix,
    NacosConfigProperties properties, Environment environment) {
    String fileExtension = properties.getFileExtension();
    String nacosGroup = properties.getGroup();
    // load directly once by default
    loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup,
                           fileExtension, true);
    // load with suffix, which have a higher priority than the default
    loadNacosDataIfPresent(compositePropertySource,
                           dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);
    // Loaded with profile, which have a higher priority than the suffix
    for (String profile : environment.getActiveProfiles()) {
        String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;
        loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,
                               fileExtension, true);
    }

}
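
To make the naming rule concrete, here is a small sketch that lists the dataIds this method would try, in load order. It assumes SEP1 is "-" and DOT is "." (the constants are not shown in the excerpt), and DataIdCandidates is a hypothetical helper, not part of Spring Cloud Alibaba.

```java
import java.util.ArrayList;
import java.util.List;

final class DataIdCandidates {
    // Returns the dataIds in load order; entries loaded later take precedence.
    static List<String> of(String prefix, String fileExtension, String... activeProfiles) {
        List<String> dataIds = new ArrayList<>();
        dataIds.add(prefix);                               // 1. bare prefix
        dataIds.add(prefix + "." + fileExtension);         // 2. prefix + extension
        for (String profile : activeProfiles) {            // 3. one per active profile
            dataIds.add(prefix + "-" + profile + "." + fileExtension);
        }
        return dataIds;
    }
}
```

For a prefix of order-service, the extension yaml, and the active profile dev, the candidates are order-service, order-service.yaml, and order-service-dev.yaml.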

The config is read mainly through loadNacosDataIfPresent. As its parameters show, a config is identified by three parts: dataId, group, and fileExtension.

private void loadNacosDataIfPresent(final CompositePropertySource composite,
                                    final String dataId, final String group, String fileExtension,
                                    boolean isRefreshable) {
    if (null == dataId || dataId.trim().length() < 1) {
        return;
    }
    if (null == group || group.trim().length() < 1) {
        return;
    }
    NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group,
                                                                      fileExtension, isRefreshable);
    this.addFirstPropertySource(composite, propertySource, false);
}
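
Because each loaded source is added to the front of the composite, a source loaded later shadows the ones loaded before it. The sketch below illustrates that "first match wins" lookup with plain maps. FirstWinsSources is a hypothetical stand-in written for this example, not Spring's CompositePropertySource.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

final class FirstWinsSources {
    private final Deque<Map<String, String>> sources = new ArrayDeque<>();

    // Newly loaded sources go to the front, so they shadow earlier ones.
    void addFirst(Map<String, String> source) {
        sources.addFirst(source);
    }

    // Walk from front to back; the first source containing the key wins.
    String get(String key) {
        for (Map<String, String> source : sources) {
            if (source.containsKey(key)) {
                return source.get(key);
            }
        }
        return null;
    }
}
```

If the shared config defines timeout=1000 and the application config, loaded afterwards, defines timeout=3000, the lookup returns 3000.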

loadNacosPropertySource is called next, and the call chain eventually reaches NacosConfigService#getConfigInner.

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
        group = null2defaultGroup(group);
        ParamUtils.checkKeyParam(dataId, group);
        ConfigResponse cr = new ConfigResponse();
        
        cr.setDataId(dataId);
        cr.setTenant(tenant);
        cr.setGroup(group);
        
        // Prefer the local failover config
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        if (content != null) {
            LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
                    dataId, group, tenant, ContentUtils.truncateContent(content));
            cr.setContent(content);
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
            return content;
        }
        
        try {
            // Fetch the remote config
            String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
            cr.setContent(ct[0]);
            
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
            
            return content;
        } catch (NacosException ioe) {
            if (NacosException.NO_RIGHT == ioe.getErrCode()) {
                throw ioe;
            }
            LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
                    agent.getName(), dataId, group, tenant, ioe.toString());
        }
        
        LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
                dataId, group, tenant, ContentUtils.truncateContent(content));
        content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
        cr.setContent(content);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }
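
The lookup order in getConfigInner (local failover file, then server, then local snapshot) can be reduced to the sketch below. It is a simplification under stated assumptions: the three sources are modeled as hypothetical Supplier callbacks, and the filter chain and the NO_RIGHT rethrow from the real method are left out.

```java
import java.util.function.Supplier;

final class ConfigLookupSketch {
    // Order: failover file first, then the server, then the last saved snapshot.
    static String lookup(Supplier<String> failover,
                         Supplier<String> server,
                         Supplier<String> snapshot) {
        String content = failover.get();
        if (content != null) {
            return content;            // a failover file overrides everything
        }
        try {
            return server.get();       // normal path: ask the server
        } catch (RuntimeException e) {
            // Server unreachable: fall back to the locally saved snapshot.
            return snapshot.get();
        }
    }
}
```

The practical consequence is that a client can still start with its last known config while the server is down, provided a snapshot was saved earlier.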

Loading remote config

worker.getServerConfig fetches the remote config. ClientWorker's getServerConfig is defined as follows:

public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout)
    throws NacosException {
    String[] ct = new String[2];
    if (StringUtils.isBlank(group)) {
        group = Constants.DEFAULT_GROUP;
    }

    HttpRestResult<String> result = null;
    try {
        Map<String, String> params = new HashMap<String, String>(3);
        if (StringUtils.isBlank(tenant)) {
            params.put("dataId", dataId);
            params.put("group", group);
        } else {
            params.put("dataId", dataId);
            params.put("group", group);
            params.put("tenant", tenant);
        }
        result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
    } catch (Exception ex) {
        String message = String
            .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s",
                    agent.getName(), dataId, group, tenant);
        LOGGER.error(message, ex);
        throw new NacosException(NacosException.SERVER_ERROR, ex);
    }

    switch (result.getCode()) {
        case HttpURLConnection.HTTP_OK:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData());
            ct[0] = result.getData();
            if (result.getHeader().getValue(CONFIG_TYPE) != null) {
                ct[1] = result.getHeader().getValue(CONFIG_TYPE);
            } else {
                ct[1] = ConfigType.TEXT.getType();
            }
            return ct;
        case HttpURLConnection.HTTP_NOT_FOUND:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
            return ct;
        case HttpURLConnection.HTTP_CONFLICT: {
            LOGGER.error(
                "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
                + "tenant={}", agent.getName(), dataId, group, tenant);
            throw new NacosException(NacosException.CONFLICT,
                                     "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }
        case HttpURLConnection.HTTP_FORBIDDEN: {
            LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(),
                         dataId, group, tenant);
            throw new NacosException(result.getCode(), result.getMessage());
        }
        default: {
            LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", agent.getName(),
                         dataId, group, tenant, result.getCode());
            throw new NacosException(result.getCode(),
                                     "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant="
                                     + tenant);
        }
    }
}
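
The status handling above boils down to: 200 returns the content and saves a snapshot, 404 returns nothing and overwrites the snapshot with null, and anything else is an error. The sketch below models only that mapping with an in-memory map. SnapshotOnResponseSketch is hypothetical, and treating "save null" as "drop the snapshot" is an assumption of this example.

```java
import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.Map;

final class SnapshotOnResponseSketch {
    private final Map<String, String> snapshots = new HashMap<>();

    // Returns the content for 200, null for 404, and throws for any other status.
    String handle(String dataId, int status, String body) {
        switch (status) {
            case HttpURLConnection.HTTP_OK:
                snapshots.put(dataId, body);    // keep a local copy for later fallback
                return body;
            case HttpURLConnection.HTTP_NOT_FOUND:
                snapshots.remove(dataId);       // config is gone on the server: drop the copy
                return null;
            default:
                throw new IllegalStateException("http error, code=" + status);
        }
    }

    String snapshot(String dataId) {
        return snapshots.get(dataId);
    }
}
```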

By default, agent is an instance of the MetricsHttpAgent implementation class.

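Config synchronization process

Figure: the Nacos config synchronization process.

Client request

After the initial config request completes, the client uses ClientWorker to long-poll for config changes. Its constructor is as follows: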

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
            final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;
        
        // Initialize the timeout parameter
        
        init(properties);

        // Thread pool for the periodic check
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
                
        // Thread pool for long polling
        this.executorService = Executors
                .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                        t.setDaemon(true);
                        return t;
                    }
                });
        
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

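Two thread pools are initialized here:

  • The first (executor) is a single-thread scheduler for the check. It runs checkConfigInfo repeatedly with a fixed delay of 10 milliseconds between runs (scheduleWithFixedDelay with TimeUnit.MILLISECONDS);
  • The second (executorService) runs the long-polling tasks.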

checkConfigInfo

This method mainly dispatches tasks: each task is given a taskId, and the task later checks the local and remote config. The work itself happens in the run method of LongPollingRunnable.

public void checkConfigInfo() {
    // Dispatch taskes.
    int listenerSize = cacheMap.size();
    // Round up the longingTaskCount.
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
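
The task count is a rounded-up division, so a new LongPollingRunnable is only started when the number of cached configs crosses a batch boundary. The sketch below works through the arithmetic. The batch size of 3000 is an assumed value for illustration, and LongPollingTaskCount is a hypothetical helper.

```java
final class LongPollingTaskCount {
    // Assumed batch size per long-polling task (illustrative value).
    static final double PER_TASK_CONFIG_SIZE = 3000;

    static int tasksFor(int listenerSize) {
        // Dividing by a double keeps the fraction, so Math.ceil can round up.
        return (int) Math.ceil(listenerSize / PER_TASK_CONFIG_SIZE);
    }
}
```

With this batch size, 1 to 3000 configs need one task and 3001 need two. Note that the division must not be an integer division: 3001 / 3000 on two ints is 1 before Math.ceil ever sees it.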

LongPollingRunnable

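This is the long-polling task. It first checks the local config, then asks the server which dataIds have changed; for each changed config it fetches the new content and refreshes the cached copy.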

public void run() {

        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // check failover config
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            // Trigger the callback
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }

            // check server config
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
            if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
            }

            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                                agent.getName(), dataId, group, tenant, cache.getMd5(),
                                ContentUtils.truncateContent(ct[0]), ct[1]);
                } catch (NacosException ioe) {
                    String message = String
                        .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                agent.getName(), dataId, group, tenant);
                    LOGGER.error(message, ioe);
                }
            }
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                    .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();

            executorService.execute(this);

        } catch (Throwable e) {

            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}
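
The end of run shows a self-rescheduling pattern: a successful round resubmits the task immediately, while a failed round resubmits it after a penalty delay. Below is a minimal sketch of that pattern alone. SelfReschedulingTask, its pollOnce callback, and the stop flag are hypothetical additions for this example and are not part of ClientWorker.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService executor;
    private final Runnable pollOnce;   // one polling round (hypothetical callback)
    private final long penaltyMs;      // delay applied after a failed round
    private volatile boolean stopped;
    final AtomicInteger failures = new AtomicInteger();

    SelfReschedulingTask(ScheduledExecutorService executor, Runnable pollOnce, long penaltyMs) {
        this.executor = executor;
        this.pollOnce = pollOnce;
        this.penaltyMs = penaltyMs;
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        if (stopped) {
            return;
        }
        try {
            pollOnce.run();
        } catch (RuntimeException e) {
            // Failed round: count it and retry only after the penalty delay.
            failures.incrementAndGet();
            executor.schedule(this, penaltyMs, TimeUnit.MILLISECONDS);
            return;
        }
        // Successful round: queue the next one right away.
        executor.execute(this);
    }
}
```

Resubmitting instead of looping inside run keeps the worker thread available between rounds, and the penalty delay avoids hammering a server that is already failing.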

addTenantListeners

Adds listeners. It uses dataId and group to look up the locally cached config (CacheData), then registers the listeners on that cache entry so they are managed in one place.

public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
        throws NacosException {
    group = null2defaultGroup(group);
    String tenant = agent.getTenant();
    CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
    for (Listener listener : listeners) {
        cache.addListener(listener);
    }
}

Callback triggering

If the md5 value has changed, safeNotifyListener is called to deliver the new config to the corresponding listener.

void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, type, md5, wrap);
        }
    }
}
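
The md5 comparison is what keeps a listener from being notified twice for the same content. The sketch below shows the idea with a single listener. It assumes that the listener's last-seen md5 is updated after each notification (that step is not visible in the excerpt), and Md5ListenerSketch is a hypothetical class.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

final class Md5ListenerSketch {
    private String content = "";
    private String md5 = md5Of(content);
    private String lastCallMd5 = md5;                 // md5 the listener saw last
    final List<String> received = new ArrayList<>();  // what the listener was sent

    void setContent(String newContent) {
        this.content = newContent;
        this.md5 = md5Of(newContent);
    }

    // Notify only when the current md5 differs from the one last delivered.
    void checkListenerMd5() {
        if (!md5.equals(lastCallMd5)) {
            received.add(content);
            lastCallMd5 = md5;
        }
    }

    static String md5Of(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling checkListenerMd5 repeatedly without a content change is therefore cheap and has no side effects, which is why the long-polling loop can call it on every round.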

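Server response

When the server receives the request, it holds it open. If a config changes during that time, the server responds immediately; otherwise it waits and returns "no change" shortly before the timeout expires.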

/**
 * The client listens for configuration changes.
 */
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }

    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);

    Map<String, String> clientMd5Map;
    try {
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }

    // do long-polling
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
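
The "hold, then answer on change or on timeout" behavior can be sketched without a servlet container. The example below is only an in-memory model of the idea: HeldRequestSketch is hypothetical, it uses CompletableFuture in place of the servlet AsyncContext, and an empty string stands for "no change".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class HeldRequestSketch {
    private final Map<String, List<CompletableFuture<String>>> held = new HashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "hold-timeout");
        t.setDaemon(true);
        return t;
    });

    // Hold the request: it completes with the changed key, or "" when the hold time runs out.
    synchronized CompletableFuture<String> poll(String groupKey, long holdMs) {
        CompletableFuture<String> response = new CompletableFuture<>();
        held.computeIfAbsent(groupKey, k -> new ArrayList<>()).add(response);
        timer.schedule(() -> timeout(groupKey, response), holdMs, TimeUnit.MILLISECONDS);
        return response;
    }

    // A config changed: answer every request currently held for that key.
    synchronized void publishChange(String groupKey) {
        List<CompletableFuture<String>> waiting = held.remove(groupKey);
        if (waiting != null) {
            for (CompletableFuture<String> response : waiting) {
                response.complete(groupKey);
            }
        }
    }

    private synchronized void timeout(String groupKey, CompletableFuture<String> response) {
        List<CompletableFuture<String>> waiting = held.get(groupKey);
        if (waiting != null) {
            waiting.remove(response);
            if (waiting.isEmpty()) {
                held.remove(groupKey);
            }
        }
        response.complete("");   // no effect if a change already answered the request
    }
}
```

Compared with short-interval polling, the client learns about a change almost as soon as it is published, while an idle config costs only one request per hold period.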

LongPollingService

The core handler class is LongPollingService.

  /**
     * Add LongPollingClient.
     *
     * @param req              HttpServletRequest.
     * @param rsp              HttpServletResponse.
     * @param clientMd5Map     clientMd5Map.
     * @param probeRequestSize probeRequestSize.
     */
    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
            int probeRequestSize) {
        
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        
        // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // Do nothing but set fix polling timeout.
        } else {
            long start = System.currentTimeMillis();
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        
        // Must be called by http thread, or send response.
        final AsyncContext asyncContext = req.startAsync();
        
        // AsyncContext.setTimeout() is incorrect, Control by oneself
        asyncContext.setTimeout(0L);
        
        ConfigExecutor.executeLongPolling(
                new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }
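
The hold time is derived from the timeout the client sends in its request header, minus a safety margin, with a floor of 10 seconds. The sketch below reproduces only that arithmetic. HoldTimeoutSketch is a hypothetical helper, and the header value 30000 used in the note is just an example input.

```java
final class HoldTimeoutSketch {
    // Same formula as in addLongPollingClient: respond a bit early, but never hold for under 10 s.
    static long holdTimeoutMs(String longPollingTimeoutHeader, int delayTimeMs) {
        return Math.max(10000, Long.parseLong(longPollingTimeoutHeader) - delayTimeMs);
    }
}
```

With a client timeout of 30000 ms and the 500 ms margin, the server holds the request for at most 29500 ms, so its "no change" response arrives before the client gives up. A client timeout of 5000 ms would be raised to the 10000 ms floor.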

References

  • https://blog.csdn.net/jason_jiahongfei/article/details/108373442
  • https://www.cnblogs.com/lockedsher/articles/14447700.html


Originally published on 2021-08-09 on the 运维开发故事 WeChat official account, and shared through the Tencent Cloud self-media sharing program.
