首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >聊聊NacosNamingService的selectInstances

聊聊NacosNamingService的selectInstances

原创
作者头像
code4it
修改2019-10-09 10:44:13
修改2019-10-09 10:44:13
9180
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下NacosNamingService的selectInstances

NacosNamingService

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java

代码语言:javascript
复制
public class NacosNamingService implements NamingService {
    private static final String DEFAULT_PORT = "8080";
    private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
​
    /**
     * Each Naming instance should have different namespace.
     */
    private String namespace;
​
    private String endpoint;
​
    private String serverList;
​
    private String cacheDir;
​
    private String logName;
​
    private HostReactor hostReactor;
​
    private BeatReactor beatReactor;
​
    private EventDispatcher eventDispatcher;
​
    private NamingProxy serverProxy;
​
    //......
​
    @Override
    public List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException {
        return selectInstances(serviceName, new ArrayList<String>(), healthy);
    }
​
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
        return selectInstances(serviceName, groupName, healthy, true);
    }
​
    @Override
    public List<Instance> selectInstances(String serviceName, boolean healthy, boolean subscribe)
        throws NacosException {
        return selectInstances(serviceName, new ArrayList<String>(), healthy, subscribe);
    }
​
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
        return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
    }
​
    @Override
    public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy)
        throws NacosException {
        return selectInstances(serviceName, clusters, healthy, true);
    }
​
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy) throws NacosException {
        return selectInstances(serviceName, groupName, clusters, healthy, true);
    }
​
    @Override
    public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy,
                                          boolean subscribe) throws NacosException {
        return selectInstances(serviceName, Constants.DEFAULT_GROUP, clusters, healthy, subscribe);
    }
​
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
​
        ServiceInfo serviceInfo;
        if (subscribe) {
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        return selectInstances(serviceInfo, healthy);
    }
​
    private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
​
        Iterator<Instance> iterator = list.iterator();
        while (iterator.hasNext()) {
            Instance instance = iterator.next();
            if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
                iterator.remove();
            }
        }
​
        return list;
    }
​
    //......
}
  • selectInstances首先从hostReactor获取serviceInfo,然后再从serviceInfo.getHosts()剔除非healty、非enabled、weight小于等于0的instance再返回;如果subscribe为true,则执行hostReactor.getServiceInfo获取serviceInfo,否则执行hostReactor.getServiceInfoDirectlyFromServer获取serviceInfo

HostReactor

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java

代码语言:javascript
复制
public class HostReactor {
​
    private static final long DEFAULT_DELAY = 1000L;
​
    private static final long UPDATE_HOLD_INTERVAL = 5000L;
​
    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
​
    private Map<String, ServiceInfo> serviceInfoMap;
​
    private Map<String, Object> updatingMap;
​
    private PushReceiver pushReceiver;
​
    private EventDispatcher eventDispatcher;
​
    private NamingProxy serverProxy;
​
    private FailoverReactor failoverReactor;
​
    private String cacheDir;
​
    private ScheduledExecutorService executor;
​
    //......
​
    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
​
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
​
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
​
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
​
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
​
            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
​
        } else if (updatingMap.containsKey(serviceName)) {
​
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
​
        scheduleUpdateIfAbsent(serviceName, clusters);
​
        return serviceInfoMap.get(serviceObj.getKey());
    }
​
    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
​
        String key = ServiceInfo.getKey(serviceName, clusters);
​
        return serviceInfoMap.get(key);
    }
​
    public void updateServiceNow(String serviceName, String clusters) {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
​
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
​
            if (StringUtils.isNotEmpty(result)) {
                processServiceJSON(result);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
​
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
​
        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
​
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }
​
    public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters) throws NacosException {
        String result = serverProxy.queryList(serviceName, clusters, 0, false);
        if (StringUtils.isNotEmpty(result)) {
            return JSON.parseObject(result, ServiceInfo.class);
        }
        return null;
    }
​
    //......
}
  • getServiceInfo首先判断failoverReactor.isFailoverSwitch(),如果是则返回failoverReactor.getService(key);接着通过getServiceInfo0从serviceInfoMap查找,如果找不到则创建一个新的然后放入serviceInfoMap,同时放入updatingMap,执行updateServiceNow,再从updatingMap移除;如果从serviceInfoMap找出来的serviceObj在updatingMap中则等待UPDATE_HOLD_INTERVAL;最后执行scheduleUpdateIfAbsent,再从serviceInfoMap取出serviceInfo
  • updateServiceNow则从serverProxy.queryList获取结果,然后通过processServiceJSON解析并根据需要更新serviceInfoMap;scheduleUpdateIfAbsent方法判断futureMap是否有该任务,如果没有则添加一个UpdateTask
  • getServiceInfoDirectlyFromServer方法则直接请求serverProxy.queryList获取ServiceInfo

UpdateTask

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java

代码语言:javascript
复制
    public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private String clusters;
        private String serviceName;
​
        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }
​
        @Override
        public void run() {
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
​
                if (serviceObj == null) {
                    updateServiceNow(serviceName, clusters);
                    executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                    return;
                }
​
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateServiceNow(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }
​
                executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
​
                lastRefTime = serviceObj.getLastRefTime();
            } catch (Throwable e) {
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            }
​
        }
    }
​
    public void refreshOnly(String serviceName, String clusters) {
        try {
            serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }
​
  • UpdateTask实现了Runnable接口,其run方法首先从serviceInfoMap获取serviceObj,获取不到则执行updateServiceNow,然后再次延时调度UpdateTask;可以从serviceInfoMap获取serviceObj的话则判断serviceObj.getLastRefTime()是否小于等于lastRefTime,是的话则执行updateServiceNow,否则执行refreshOnly;最后再次延时调度UpdateTask,并更新lastRefTime

NamingProxy

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy.java

代码语言:javascript
复制
public class NamingProxy {
​
    private static final int DEFAULT_SERVER_PORT = 8848;
​
    private int serverPort = DEFAULT_SERVER_PORT;
​
    private String namespaceId;
​
    private String endpoint;
​
    private String nacosDomain;
​
    private List<String> serverList;
​
    private List<String> serversFromEndpoint = new ArrayList<String>();
​
    private long lastSrvRefTime = 0L;
​
    private long vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30);
​
    private Properties properties;
​
    //......
​
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {
​
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
​
        return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
    }
​
    //......
}
  • queryList方法会往/instance/list接口发送GET请求查询服务实例列表

小结

selectInstances首先从hostReactor获取serviceInfo,然后再从serviceInfo.getHosts()剔除非healty、非enabled、weight小于等于0的instance再返回;如果subscribe为true,则执行hostReactor.getServiceInfo获取serviceInfo,否则执行hostReactor.getServiceInfoDirectlyFromServer获取serviceInfo

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • NacosNamingService
  • HostReactor
  • UpdateTask
  • NamingProxy
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档