专栏首页码匠的流水账聊聊NacosNamingService的subscribe及unsubscribe
原创

聊聊NacosNamingService的subscribe及unsubscribe

本文主要研究一下NacosNamingService的subscribe及unsubscribe

NacosNamingService

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java

public class NacosNamingService implements NamingService {
    private static final String DEFAULT_PORT = "8080";
    private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
​
    /**
     * Each Naming instance should have different namespace.
     */
    private String namespace;
​
    private String endpoint;
​
    private String serverList;
​
    private String cacheDir;
​
    private String logName;
​
    private HostReactor hostReactor;
​
    private BeatReactor beatReactor;
​
    private EventDispatcher eventDispatcher;
​
    private NamingProxy serverProxy;
​
    //......
​
    @Override
    public void subscribe(String serviceName, EventListener listener) throws NacosException {
        subscribe(serviceName, new ArrayList<String>(), listener);
    }
​
    @Override
    public void subscribe(String serviceName, String groupName, EventListener listener) throws NacosException {
        subscribe(serviceName, groupName, new ArrayList<String>(), listener);
    }
​
    @Override
    public void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {
        subscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
    }
​
    @Override
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        eventDispatcher.addListener(hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
            StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener);
    }
​
    @Override
    public void unsubscribe(String serviceName, EventListener listener) throws NacosException {
        unsubscribe(serviceName, new ArrayList<String>(), listener);
    }
​
    @Override
    public void unsubscribe(String serviceName, String groupName, EventListener listener) throws NacosException {
        unsubscribe(serviceName, groupName, new ArrayList<String>(), listener);
    }
​
    @Override
    public void unsubscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {
        unsubscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
    }
​
    @Override
    public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        eventDispatcher.removeListener(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
    }
​
    //......
}
  • subscribe方法执行eventDispatcher.addListener;unsubscribe方法执行eventDispatcher.removeListener

EventDispatcher

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/EventDispatcher.java

public class EventDispatcher {
​
    private ExecutorService executor = null;
​
    private BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();
​
    private ConcurrentMap<String, List<EventListener>> observerMap
        = new ConcurrentHashMap<String, List<EventListener>>();
​
    public EventDispatcher() {
​
        executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
                thread.setDaemon(true);
​
                return thread;
            }
        });
​
        executor.execute(new Notifier());
    }
​
    public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
​
        NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
        List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
        observers.add(listener);
​
        observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
        if (observers != null) {
            observers.add(listener);
        }
​
        serviceChanged(serviceInfo);
    }
​
    public void removeListener(String serviceName, String clusters, EventListener listener) {
​
        NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");
​
        List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
        if (observers != null) {
            Iterator<EventListener> iter = observers.iterator();
            while (iter.hasNext()) {
                EventListener oldListener = iter.next();
                if (oldListener.equals(listener)) {
                    iter.remove();
                }
            }
            if (observers.isEmpty()) {
                observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
            }
        }
    }
​
    public List<ServiceInfo> getSubscribeServices() {
        List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
        for (String key : observerMap.keySet()) {
            serviceInfos.add(ServiceInfo.fromKey(key));
        }
        return serviceInfos;
    }
​
    public void serviceChanged(ServiceInfo serviceInfo) {
        if (serviceInfo == null) {
            return;
        }
​
        changedServices.add(serviceInfo);
    }
​
    private class Notifier implements Runnable {
        @Override
        public void run() {
            while (true) {
                ServiceInfo serviceInfo = null;
                try {
                    serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
                } catch (Exception ignore) {
                }
​
                if (serviceInfo == null) {
                    continue;
                }
​
                try {
                    List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
​
                    if (!CollectionUtils.isEmpty(listeners)) {
                        for (EventListener listener : listeners) {
                            List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                            listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
                        }
                    }
​
                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] notify error for service: "
                        + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
                }
            }
        }
    }
​
    public void setExecutor(ExecutorService executor) {
        ExecutorService oldExecutor = this.executor;
        this.executor = executor;
​
        oldExecutor.shutdown();
    }
}
  • EventDispatcher的构造器创建了executor,并执行Notifier;Notifier使用一个while true循环不断执行changedServices.poll(5, TimeUnit.MINUTES)拉取serviceInfo,拉取到的话会从observerMap取出对应的EventListener列表,然后挨个回调listener.onEvent方法
  • addListener方法则是往observerMap创建或添加observers,然后执行serviceChanged方法;removeListener则是从observerMap移除指定的listener,如果指定key的listener列表为空则删除该key
  • serviceChanged方法会往changedServices添加serviceInfo;之后Notifier异步线程可用拉取信息执行listener.onEvent回调

小结

NacosNamingService的subscribe方法执行eventDispatcher.addListener;unsubscribe方法执行eventDispatcher.removeListener

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊NacosNamingService的subscribe及unsubscribe

    本文主要研究一下NacosNamingService的subscribe及unsubscribe

    codecraft
  • 聊聊NacosNamingService的selectInstances

    本文主要研究一下NacosNamingService的selectInstances

    codecraft
  • 聊聊NacosNamingService的selectInstances

    本文主要研究一下NacosNamingService的selectInstances

    codecraft
  • 聊聊NacosNamingService的subscribe及unsubscribe

    本文主要研究一下NacosNamingService的subscribe及unsubscribe

    codecraft
  • C++函数指针和std::function对象

    这篇博文中通过实现对String字符串大小写转换为列来说明C++中函数指针和std::function对象的使用。

    卡尔曼和玻尔兹曼谁曼
  • JAVA进阶:String源码分析

    乱敲代码
  • JDK源码阅读(二):String源码分析

    乱敲代码
  • AI一分钟 | 娃哈哈要造智能汽车?世界顶级机器学习科学家黄恒加盟京东

    9 月 6 日下午消息,根据国家企业信用信息公示系统资最新工商资料显示,娃哈哈于 8 月 27 日成立了浙江德清娃哈哈科技创新中心有限公司,注册资本为 5000...

    AI科技大本营
  • 一起来约G7~R语言探索约基奇数据的简单小例子

    那作为一个喜欢篮球的R语言初学者,当然不能只看比赛了,还要把约基奇的常规数据探索学起来!

    用户7010445
  • Java Review(三十四、JDBC)

    JDBC指Java 数据库连接,是一种标准Java应用编程接口( JAVA API),用来连接 Java 编程语言和广泛的数据库。

    三分恶

扫码关注云+社区

领取腾讯云代金券