专栏首页码匠的流水账聊聊nacos的ServiceReporter
原创

聊聊nacos的ServiceReporter

本文主要研究一下nacos的ServiceReporter

ServiceManager.init

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {
​
    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
​
    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
​
    private Synchronizer synchronizer = new ServiceStatusSynchronizer();
​
    private final Lock lock = new ReentrantLock();
​
    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;
​
    @Autowired
    private SwitchDomain switchDomain;
​
    @Autowired
    private DistroMapper distroMapper;
​
    @Autowired
    private ServerListManager serverListManager;
​
    @Autowired
    private PushService pushService;
​
    private final Object putServiceLock = new Object();
​
    @PostConstruct
    public void init() {
​
        UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
​
        UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());
​
        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }
​
    //......
}
  • ServiceManager的init方法往UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR注册了ServiceReporter

ServiceReporter

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class ServiceReporter implements Runnable {
​
        @Override
        public void run() {
            try {
​
                Map<String, Set<String>> allServiceNames = getAllServiceNames();
​
                if (allServiceNames.size() <= 0) {
                    //ignore
                    return;
                }
​
                for (String namespaceId : allServiceNames.keySet()) {
​
                    ServiceChecksum checksum = new ServiceChecksum(namespaceId);
​
                    for (String serviceName : allServiceNames.get(namespaceId)) {
                        if (!distroMapper.responsible(serviceName)) {
                            continue;
                        }
​
                        Service service = getService(namespaceId, serviceName);
​
                        if (service == null) {
                            continue;
                        }
​
                        service.recalculateChecksum();
​
                        checksum.addItem(serviceName, service.getChecksum());
                    }
​
                    Message msg = new Message();
​
                    msg.setData(JSON.toJSONString(checksum));
​
                    List<Server> sameSiteServers = serverListManager.getServers();
​
                    if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                        return;
                    }
​
                    for (Server server : sameSiteServers) {
                        if (server.getKey().equals(NetUtils.localServer())) {
                            continue;
                        }
                        synchronizer.send(server.getKey(), msg);
                    }
                }
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
            } finally {
                UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }
  • ServiceReporter实现Runnable接口,其run方法会遍历allServiceNames,取出distroMapper.responsible的serviceName,重新计算recalculateChecksum,然后添加到ServiceChecksum中,构造Message,遍历sameSiteServers使用synchronizer.send发送该消息;最后往UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR重新注册ServiceReporter

ServiceStatusSynchronizer

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/misc/ServiceStatusSynchronizer.java

public class ServiceStatusSynchronizer implements Synchronizer {
    @Override
    public void send(final String serverIP, Message msg) {
        if(serverIP == null) {
            return;
        }
​
        Map<String,String> params = new HashMap<String, String>(10);
​
        params.put("statuses", msg.getData());
        params.put("clientIP", NetUtils.localServer());
​
​
        String url = "http://" + serverIP + ":" + RunningConfig.getServerPort() + RunningConfig.getContextPath() +
                UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
​
        if (serverIP.contains(UtilsAndCommons.IP_PORT_SPLITER)) {
            url = "http://" + serverIP + RunningConfig.getContextPath() +
                    UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
        }
​
        try {
            HttpClient.asyncHttpPostLarge(url, null, JSON.toJSONString(params), new AsyncCompletionHandler() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}", serverIP);
​
                        return 1;
                    }
                    return 0;
                }
            });
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
        }
​
    }
​
    @Override
    public Message get(String serverIP, String key) {
        if(serverIP == null) {
            return null;
        }
​
        Map<String,String> params = new HashMap<>(10);
​
        params.put("key", key);
​
        String result;
        try {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("[STATUS-SYNCHRONIZE] sync service status from: {}, service: {}", serverIP, key);
            }
            result = NamingProxy.reqAPI(RunningConfig.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/" + "statuses", params, serverIP);
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] Failed to get service status from " + serverIP, e);
            return null;
        }
​
        if(result == null || result.equals(StringUtils.EMPTY)) {
            return null;
        }
​
        Message msg = new Message();
        msg.setData(result);
​
        return msg;
    }
}
  • ServiceStatusSynchronizer实现了Synchronizer接口,其send方法会异步执行post请求,将statuses通知到目标server

小结

ServiceReporter实现Runnable接口,其run方法会遍历allServiceNames,取出distroMapper.responsible的serviceName,重新计算recalculateChecksum,然后添加到ServiceChecksum中,构造Message,遍历sameSiteServers使用synchronizer.send发送该消息;最后往UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR重新注册ServiceReporter

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊nacos ServiceManager的UpdatedServiceProcessor

    本文主要研究一下nacos ServiceManager的UpdatedServiceProcessor

    codecraft
  • 聊聊nacos ServiceManager的UpdatedServiceProcessor

    本文主要研究一下nacos ServiceManager的UpdatedServiceProcessor

    codecraft
  • 聊聊spring对kafka的集成方式

    除了官方的java api类库外,spring生态中又额外包装了很多,这里一一简单介绍下。

    codecraft
  • 聊聊nacos ServiceManager的UpdatedServiceProcessor

    本文主要研究一下nacos ServiceManager的UpdatedServiceProcessor

    codecraft
  • 聊聊nacos ServiceManager的UpdatedServiceProcessor

    本文主要研究一下nacos ServiceManager的UpdatedServiceProcessor

    codecraft
  • 力扣LeetCode,唯一摩尔斯密码词

    1、国际摩尔斯密码定义一种标准编码方式,将每个字母对应于一个由一系列点和短线组成的字符串, 比如: "a" 对应 ".-", "b" 对应 "-...", "c...

    别先生
  • 如何优雅地过滤敏感词

    敏感词过滤功能在很多地方都会用到,理论上在Web应用中,只要涉及用户输入的地方,都需要进行文本校验,如:XSS校验、SQL注入检验、敏感词过滤等。今天着重讲讲如...

    黄泽杰
  • 设计师编程指南之Sketch插件开发 9 之 Shape中的oval

    往期文章索引: 1 / 入门基本概念、page的相关操作 2 / artboard 、NSFileManager 和 NSString 关于文件及文件夹的相关操...

    mixlab
  • 物联网是如何驱动网络变革的?——上

    ---导读--- 软件定义网络( SDN)已成为管理和维护企业网络安全的新途径。这是自互联网引入以来,企业网络最重大的变化。它不是一种单一的技术,而是一种涵盖各...

    企鹅号小编
  • 最长单调子序列 复杂度nlog(n)

    owent

扫码关注云+社区

领取腾讯云代金券