专栏首页码匠的流水账聊聊nacos ServiceManager的UpdatedServiceProcessor
原创

聊聊nacos ServiceManager的UpdatedServiceProcessor

本文主要研究一下nacos ServiceManager的UpdatedServiceProcessor

ServiceManager.init

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {
​
    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
​
    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
​
    private Synchronizer synchronizer = new ServiceStatusSynchronizer();
​
    private final Lock lock = new ReentrantLock();
​
    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;
​
    @Autowired
    private SwitchDomain switchDomain;
​
    @Autowired
    private DistroMapper distroMapper;
​
    @Autowired
    private ServerListManager serverListManager;
​
    @Autowired
    private PushService pushService;
​
    private final Object putServiceLock = new Object();
​
    @PostConstruct
    public void init() {
​
        UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
​
        UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());
​
        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }
​
    //......
}
  • ServiceManager的init方法往UtilsAndCommons.SERVICE_UPDATE_EXECUTOR提交了UpdatedServiceProcessor任务

UpdatedServiceProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class UpdatedServiceProcessor implements Runnable {
        //get changed service from other server asynchronously
        @Override
        public void run() {
            ServiceKey serviceKey = null;
​
            try {
                while (true) {
                    try {
                        serviceKey = toBeUpdatedServicesQueue.take();
                    } catch (Exception e) {
                        Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
                    }
​
                    if (serviceKey == null) {
                        continue;
                    }
                    GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
                }
            } catch (Exception e) {
                Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
            }
        }
    }
  • UpdatedServiceProcessor实现了Runnable方法,其run方法会不断循环从toBeUpdatedServicesQueue获取元素,然后使用GlobalExecutor.submitServiceUpdate提交ServiceUpdater

ServiceUpdater

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class ServiceUpdater implements Runnable {
​
        String namespaceId;
        String serviceName;
        String serverIP;
​
        public ServiceUpdater(ServiceKey serviceKey) {
            this.namespaceId = serviceKey.getNamespaceId();
            this.serviceName = serviceKey.getServiceName();
            this.serverIP = serviceKey.getServerIP();
        }
​
        @Override
        public void run() {
            try {
                updatedHealthStatus(namespaceId, serviceName, serverIP);
            } catch (Exception e) {
                Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}",
                    serviceName, serverIP, e);
            }
        }
    }
  • ServiceUpdater实现了Runnable接口,其run方法执行的是updatedHealthStatus

ServiceManager.updatedHealthStatus

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {
​
    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
​
    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
​
    private Synchronizer synchronizer = new ServiceStatusSynchronizer();
​
    private final Lock lock = new ReentrantLock();
​
    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;
​
    @Autowired
    private SwitchDomain switchDomain;
​
    @Autowired
    private DistroMapper distroMapper;
​
    @Autowired
    private ServerListManager serverListManager;
​
    @Autowired
    private PushService pushService;
​
    private final Object putServiceLock = new Object();
​
    //......
​
    public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
        Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        JSONObject serviceJson = JSON.parseObject(msg.getData());
​
        JSONArray ipList = serviceJson.getJSONArray("ips");
        Map<String, String> ipsMap = new HashMap<>(ipList.size());
        for (int i = 0; i < ipList.size(); i++) {
​
            String ip = ipList.getString(i);
            String[] strings = ip.split("_");
            ipsMap.put(strings[0], strings[1]);
        }
​
        Service service = getService(namespaceId, serviceName);
​
        if (service == null) {
            return;
        }
​
        boolean changed = false;
​
        List<Instance> instances = service.allIPs();
        for (Instance instance : instances) {
​
            boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIPAddr()));
            if (valid != instance.isHealthy()) {
                changed = true;
                instance.setHealthy(valid);
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}@{}{}",
                    serviceName, (instance.isHealthy() ? "ENABLED" : "DISABLED"),
                    instance.getIp(), instance.getPort(), instance.getClusterName());
            }
        }
​
        if (changed) {
            pushService.serviceChanged(service);
        }
​
        StringBuilder stringBuilder = new StringBuilder();
        List<Instance> allIps = service.allIPs();
        for (Instance instance : allIps) {
            stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
        }
​
        if (changed && Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}",
                service.getNamespaceId(), service.getName(), stringBuilder.toString());
        }
​
    }
​
    //......
}
  • updatedHealthStatus方法会从synchronizer获取msg,组装ipsMap,之后通过service.allIPs()获取instances信息,然后遍历instances从ipsMap获取实例的valid状态,如果与instance的isHealthy()对不上则标记为changed,更新instance的healthy;对于changed的则通过pushService.serviceChanged(service)发布事件,最后打印日志

小结

  • ServiceManager的init方法往UtilsAndCommons.SERVICE_UPDATE_EXECUTOR提交了UpdatedServiceProcessor任务
  • UpdatedServiceProcessor实现了Runnable方法,其run方法会不断循环从toBeUpdatedServicesQueue获取元素,然后使用GlobalExecutor.submitServiceUpdate提交ServiceUpdater
  • ServiceUpdater实现了Runnable接口,其run方法执行的是updatedHealthStatus

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊nacos ServiceManager的UpdatedServiceProcessor

    本文主要研究一下nacos ServiceManager的UpdatedServiceProcessor

    codecraft
  • 聊聊nacos的ServiceReporter

    nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.ja...

    codecraft
  • 聊聊dubbo的AccessLogFilter

    dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/Ac...

    codecraft
  • 聊聊nacos ServiceManager的UpdatedServiceProcessor

    本文主要研究一下nacos ServiceManager的UpdatedServiceProcessor

    codecraft
  • 聊聊nacos的ServiceReporter

    nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.ja...

    codecraft
  • 理解 Python 的 LEGB

    Python 的名字空间是 Python 一个非常核心的内容。 其他语言中如 C 中,变量名是内存地址的别名,而在 Python 中,名字是一个字符串对象,它...

    py3study
  • RxSwift介绍(二)——Observable

    Observable<T>类是RxSwift框架的基础。其作用就像是一条流水线,让观察者可以实时获取对所有可观察对象所触发的事件,也就是说以此来实现对UI的实时...

    我只不过是出来写写代码
  • SpringBoot使用Nacos进行服务注册发现与配置管理

    最近由于业务发展,需要调研一套完善和主流的基础架构,进行中台化(微服务)的实施,考虑到技术栈切换到SOFAStack。既然整个体系都切换到蚂蚁金服的技术栈,那么...

    Throwable
  • AI在抗击疫情中的作用

    这几十年以来,肆虐全球的新冠病毒可以说是最具感染性了。正如我们目前所见,新冠病毒已经给全世界造成了极大的破坏,因此,借助科技的力量抵抗疫情也变得十分必要。AI就...

    人工智能小咖
  • 【AI 引擎】席宁:机器人进军新药领域 | AI将设计制作第一辆车 | AI在星际争霸中与一流人类选手比赛|最大的年度机器人大赛

    1.悉尼机器人杯将会成为最大的年度机器人大赛 ? 2019年6月,来自50个国家2000位研究人员和工作者将会聚集在悉尼,竞争澳大利亚世界级年度机器人大赛——R...

    新智元

扫码关注云+社区

领取腾讯云代金券