专栏首页并发笔记手写dubbo框架4-服务治理(服务发现-zookeeper)

手写dubbo框架4-服务治理(服务发现-zookeeper)

博客中代码地址:https://github.com/farliu/farpc.git

本文实现的是服务的发现,也就是图片中的第2,3步,经过上一章的服务注册,对于服务发现我们只需要从zookeeper中取得对应的provider就行了。

项目结构介绍

本节涉及博客中代码的两个module,farpc-registry(服务治理)、farpc-cluster(集群管理)。

在上一章的基础上,我们扩展IRegistrar接口,增加discover方法。

public interface IRegistrar {
    /**     * 初始化     * @param registerAddress zookeeper地址,例如127.0.0.1:2181     */    void init(String registerAddress);
    /**     * 注册服务     * @param providerAddress 服务提供者地址     * @param service 服务     */    void register(String providerAddress, String service);
    String discover(String service);}

farpc-cluster同样提供类似的接口ILoadbalance。

public interface ILoadbalance {
    String select(List<String> providers);}

服务发现

基于上一章的代码,zookeeper的连接已经在init方法中初始化了。那么本节要实现的服务发现就是,从zookeeper取得某一个服务下的所有节点,也就是provider。一起来看看代码怎么写。

private static final String SEPARATOR = "/";private static final String FOLDER = "/faregistrys";private Map<String, List<String>> serviceProviderMap = new HashMap<String, List<String>>();
public String discover(String service) {    String path = FOLDER + SEPARATOR + service;    try {        // ---- 1        List<String> provider = curatorFramework.getChildren().forPath(path);
        // ---- 2        serviceProviderMap.put(service, provider);
        watchProvider(path);
       return serviceProviderMap.get(service).get(0);    } catch (Exception e) {        logger.error(String.format("call ZookeeperRegistrarImpl.discover, occur exception, service:[%s], e.getMessage:[%s]", service, e.getMessage()), e);        return "";    }
     private void watchProvider(final String path) {        PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, path, true);        PathChildrenCacheListener listener = new PathChildrenCacheListener() {            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {                //提供者有更新,则及时更新到内存中                serviceProviderMap.put(path, curatorFramework.getChildren().forPath(path));            }        };        childrenCache.getListenable().addListener(listener);        try {            childrenCache.start();        } catch (Exception e) {            throw new RuntimeException(e.getMessage(), e);        }    }}

服务发现的代码也很简单,就是从zookeeper中根据服务取到provider,这里没有做负载均衡,所以始终返回第一个。继续解析一下要注意的地方。

第一点,返回的是一个list,是因为某一个节点路径下肯定是存在多个节点的,也就意味着某一个服务应该有多个provider。

第二点,使用一个map保存,是为了监听zookeeper中provider的变化,也就是watchProvider()方法,给指定的路径添加监听器,当有更新时,更新map中的信息。而这里就是zookeeper提供的监听机制。达到服务动态发现的效果。

同样的我们测试一下代码,记得要先注册服务

public class ConsumerTest {    @Test    public void test() throws IOException {        IRegistrar registrar = new ZookeeperRegistrarImpl();        registrar.init("127.0.0.1:2181");        System.out.println(registrar.discover("com.ofcoder.farpc.demo.api.IWelcome"));    }}
------------------------------------------127.0.0.1:20880

正确输出zookeeper中管理服务提供者地址。那么基于zookeeper的服务发现也就实现了。

负载均衡

在文中一直有提到,从注册中心拿到的是一个list的provider,那么我们需要做一个类似负载均衡的东西,也就是从众多的provider中,取其中一个来真正调用。这里提供两种方式。

// 随机取其中一个public class RandomLoadbalanceImpl implements ILoadbalance {    public String select(List<String> providers) {        int len = providers.size();        Random random = new Random();        int lucky = random.nextInt(len);        return providers.get(lucky);    }}
// 一个个的遍历取public class RoundLoadBalanceImpl implements ILoadbalance {
    private AtomicInteger previous = new AtomicInteger(0);
    public String select(List<String> providers) {        int size = providers.size();        if (previous.get() >= size) {            previous.set(0);        }        String provider = providers.get(previous.get());        previous.set(previous.get() + 1);        return provider;    }}

那么使用的话,我新建了一个抽象类实现IRegistrar,来完成负载均衡.

public abstract class AbstractRegistrar implements IRegistrar {    public String discover(String service) {        List<String> providers = lookup(service);        ILoadbalance loadbalance = new RoundLoadBalanceImpl();        String select = loadbalance.select(providers);        return select;    }
    public abstract List<String> lookup(String service);}

将之前ZookeeperRegistrarImpl实现AbstractRegistrar,在lookup中完成之前实现的服务发现。

public class ZookeeperRegistrarImpl extends AbstractRegistrar {    private static final Logger logger = LoggerFactory.getLogger(ZookeeperRegistrarImpl.class);    private static final int SESSION_TIMEOUT_MS = 5000;    private static final int SLEEP_TIME_MS = 1000;    private static final int MAX_RETRIES = 2;    private static final String SEPARATOR = "/";    private static final String FOLDER = "/faregistrys";
    private Map<String, List<String>> serviceProviderMap = new HashMap<String, List<String>>();    private CuratorFramework curatorFramework;
    @Override    public List<String> lookup(String service) {        String path = FOLDER + SEPARATOR + service;        try {            List<String> provider = curatorFramework.getChildren().forPath(path);            serviceProviderMap.put(service, provider);
            watchProvider(path);
            return serviceProviderMap.get(service);        } catch (Exception e) {            logger.error(String.format("call ZookeeperRegistrarImpl.discover, occur exception, service:[%s], e.getMessage:[%s]", service, e.getMessage()), e);            return null;        }    }
    private void watchProvider(final String path) {        PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, path, true);        PathChildrenCacheListener listener = new PathChildrenCacheListener() {            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {                //提供者有更新,则及时更新到内存中                serviceProviderMap.put(path, curatorFramework.getChildren().forPath(path));            }        };        childrenCache.getListenable().addListener(listener);        try {            childrenCache.start();        } catch (Exception e) {            throw new RuntimeException(e.getMessage(), e);        }    }}

以上则为服务发现的所有代码,意在模拟dubbo,而不是照抄dubbo,希望可以帮助大家对dubbo服务治理有一定的了解。

dubbo源码

ZookeeperRegistry.doSubscribe()public void doSubscribe(final URL url, final NotifyListener listener) {    try {        if (ANY_VALUE.equals(url.getServiceInterface())) {            String root = toRootPath();            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);            if (listeners == null) {                zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());                listeners = zkListeners.get(url);            }            // ------------ 1            ChildListener zkListener = listeners.get(listener);            if (zkListener == null) {                listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {                    for (String child : currentChilds) {                        child = URL.decode(child);                        if (!anyServices.contains(child)) {                            anyServices.add(child);                            subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,                                    Constants.CHECK_KEY, String.valueOf(false)), listener);                        }                    }                });                zkListener = listeners.get(listener);            }           ....}
ZookeeperRegistry.lookup()public List<URL> lookup(URL url) {    if (url == null) {        throw new IllegalArgumentException("lookup url == null");    }    try {        List<String> providers = new ArrayList<>();        for (String path : toCategoriesPath(url)) {            // ------------ 2            List<String> children = zkClient.getChildren(path);            if (children != null) {                providers.addAll(children);            }        }        return toUrlsWithoutEmpty(url, providers);    } catch (Throwable e) {        throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);    }}

源码中标注的1,2点,是不是也看到了我们的代码影子,第一点使用zookeeper的监听器,第二点根据url获得一个list的provider。只是dubbo获取的方式不一样而已。

本文分享自微信公众号 - 并发笔记(ofcoder),作者:far

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-07-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 手写dubbo 5-服务治理(redis番外篇)

    博客中代码地址:https://github.com/farliu/farpc.git

    far
  • swagger增加接口版本管理

    怎么使用swagger,这里就不说了,本站已经跟各大搜索引擎达成合作,只要你在各大搜索引擎中输入关键词springboot swagger,就会在第一页返回给你...

    far
  • 自定义注解2-动态修改注解的属性值

    经过上一节的,我们可以自己解析spel表达式。那么我现在的想法是,在注解的第一层aop中解析spel,然后将解析后的值设置到属性中,那么在之后的aop中就不用解...

    far
  • Java基础知识之Scanner类和String类学习,讲明白了,适合初学者

    1、Scanner 的概述和方法介绍 A:Scanner 的概述 B:Scanner 的构造方法原理 Scanner(InputStream source) S...

    用户1289394
  • Map排序

    Map排序的方式有很多种,这里记录下自己总结的两种比较常用的方式:按键排序(sort by key), 按值排序(sort by value)。 按键排序(...

    xiangzhihong
  • 这样规范写代码,同事直呼“666”

    zhisheng
  • 实时数仓 | Flink实时维表join方法总结(附项目源码)

    在我们实时数仓日常工作中,经常会有一些实时的需求,这些需求往往都是一些拉宽的需求。为了给实时数仓来进行OLAP对来进行Ad-hoc查询,但是我们工作中一些维度表...

    zhisheng
  • 这样规范写代码,同事直呼“666”

    当遇到多个查询条件,使用where 1=1 可以很方便的解决我们的问题,但是这样很可能会造成非常大的性能损失,因为添加了 “where 1=1 ”的过滤条件之后...

    xcbeyond
  • java-小程序微信支付

    哈喽 我是你们的KingYiFan,一直说把微信支付给分享出来一直没有机会。终于闲下来了。听着音乐给你们分享一下。不懂可以随时联系我。。

    猿码优创
  • java中两个map比较

    ydymz

扫码关注云+社区

领取腾讯云代金券