A Look at selectOneHealthyInstance in NacosNamingService

This article looks at how NacosNamingService implements selectOneHealthyInstance.

NacosNamingService

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java

public class NacosNamingService implements NamingService {
    private static final String DEFAULT_PORT = "8080";
    private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
​
    /**
     * Each Naming instance should have different namespace.
     */
    private String namespace;
​
    private String endpoint;
​
    private String serverList;
​
    private String cacheDir;
​
    private String logName;
​
    private HostReactor hostReactor;
​
    private BeatReactor beatReactor;
​
    private EventDispatcher eventDispatcher;
​
    private NamingProxy serverProxy;
​
    //......
​
    @Override
    public Instance selectOneHealthyInstance(String serviceName) throws NacosException {
        return selectOneHealthyInstance(serviceName, new ArrayList<String>());
    }
​
    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName) throws NacosException {
        return selectOneHealthyInstance(serviceName, groupName, true);
    }
​
    @Override
    public Instance selectOneHealthyInstance(String serviceName, boolean subscribe) throws NacosException {
        return selectOneHealthyInstance(serviceName, new ArrayList<String>(), subscribe);
    }
​
    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName, boolean subscribe) throws NacosException {
        return selectOneHealthyInstance(serviceName, groupName, new ArrayList<String>(), subscribe);
    }
​
    @Override
    public Instance selectOneHealthyInstance(String serviceName, List<String> clusters) throws NacosException {
        return selectOneHealthyInstance(serviceName, clusters, true);
    }
​
    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters) throws NacosException {
        return selectOneHealthyInstance(serviceName, groupName, clusters, true);
    }
​
    @Override
    public Instance selectOneHealthyInstance(String serviceName, List<String> clusters, boolean subscribe)
        throws NacosException {
        return selectOneHealthyInstance(serviceName, Constants.DEFAULT_GROUP, clusters, subscribe);
    }
​
    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
​
        if (subscribe) {
            return Balancer.RandomByWeight.selectHost(
                hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
        } else {
            return Balancer.RandomByWeight.selectHost(
                hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
        }
    }
​
    //......
}
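  • selectOneHealthyInstance is similar to selectInstances, except that it returns a single instance. The shorter overloads fill in Constants.DEFAULT_GROUP, an empty cluster list, and subscribe = true before delegating to the four-argument version, which first obtains the serviceInfo from hostReactor
  • If subscribe is true, it calls hostReactor.getServiceInfo to obtain the serviceInfo; otherwise it calls hostReactor.getServiceInfoDirectlyFromServer
  • Once it has the serviceInfo, selectOneHealthyInstance uses Balancer.RandomByWeight.selectHost to pick a single healthy instance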
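
The delegation chain described above can be sketched without a Nacos server. This is a minimal illustration, not the Nacos code: FakeHostReactor is a hypothetical stand-in that returns a String instead of a ServiceInfo so the chosen lookup path is visible, and the "DEFAULT_GROUP" literal and the "@@" grouped-name separator are assumptions about what Constants.DEFAULT_GROUP and NamingUtils.getGroupedName produce.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch only: FakeHostReactor is a hypothetical stand-in for Nacos' HostReactor.
class SelectDispatchSketch {

    // Assumed value of Constants.DEFAULT_GROUP.
    static final String DEFAULT_GROUP = "DEFAULT_GROUP";

    static class FakeHostReactor {
        String getServiceInfo(String groupedName, String clusters) {
            return "cache[" + groupedName + "|" + clusters + "]";
        }

        String getServiceInfoDirectlyFromServer(String groupedName, String clusters) {
            return "server[" + groupedName + "|" + clusters + "]";
        }
    }

    static final FakeHostReactor HOST_REACTOR = new FakeHostReactor();

    // Shortest overload: default group, no cluster filter, subscribe = true.
    static String lookup(String serviceName) {
        return lookup(serviceName, DEFAULT_GROUP, Collections.<String>emptyList(), true);
    }

    // Full overload: the subscribe flag decides which lookup path is used.
    static String lookup(String serviceName, String groupName,
                         List<String> clusters, boolean subscribe) {
        // Assumed grouped-name format; the real one comes from NamingUtils.getGroupedName.
        String groupedName = groupName + "@@" + serviceName;
        String joinedClusters = String.join(",", clusters);
        return subscribe
            ? HOST_REACTOR.getServiceInfo(groupedName, joinedClusters)
            : HOST_REACTOR.getServiceInfoDirectlyFromServer(groupedName, joinedClusters);
    }

    public static void main(String[] args) {
        System.out.println(lookup("order-service"));
        System.out.println(lookup("order-service", "G1", Arrays.asList("c1", "c2"), false));
    }
}
```

In this sketch the first call goes through the subscribing path with an empty cluster string, while the second bypasses it and passes "c1,c2" as the cluster filter.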

Balancer

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/Balancer.java

public class Balancer {
​
    /**
     * report status to server
     */
    public final static List<String> UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER = new CopyOnWriteArrayList<String>();
​
    public static class RandomByWeight {
​
        public static List<Instance> selectAll(ServiceInfo serviceInfo) {
            List<Instance> hosts = serviceInfo.getHosts();
​
            if (CollectionUtils.isEmpty(hosts)) {
                throw new IllegalStateException("no host to srv for serviceInfo: " + serviceInfo.getName());
            }
​
            return hosts;
        }
​
        public static Instance selectHost(ServiceInfo dom) {
​
            List<Instance> hosts = selectAll(dom);
​
            if (CollectionUtils.isEmpty(hosts)) {
                throw new IllegalStateException("no host to srv for service: " + dom.getName());
            }
​
            return getHostByRandomWeight(hosts);
        }
    }
​
    /**
     * Return one host from the host list by random-weight.
     *
     * @param hosts The list of the host.
     * @return The random-weight result of the host
     */
    protected static Instance getHostByRandomWeight(List<Instance> hosts) {
        NAMING_LOGGER.debug("entry randomWithWeight");
        if (hosts == null || hosts.size() == 0) {
            NAMING_LOGGER.debug("hosts == null || hosts.size() == 0");
            return null;
        }
​
        Chooser<String, Instance> vipChooser = new Chooser<String, Instance>("www.taobao.com");
​
        NAMING_LOGGER.debug("new Chooser");
​
        List<Pair<Instance>> hostsWithWeight = new ArrayList<Pair<Instance>>();
        for (Instance host : hosts) {
            if (host.isHealthy()) {
                hostsWithWeight.add(new Pair<Instance>(host, host.getWeight()));
            }
        }
        NAMING_LOGGER.debug("for (Host host : hosts)");
        vipChooser.refresh(hostsWithWeight);
        NAMING_LOGGER.debug("vipChooser.refresh");
        return vipChooser.randomWithWeight();
    }
}
  • Balancer's RandomByWeight provides the selectAll and selectHost methods; selectAll checks whether serviceInfo.getHosts() is empty and throws IllegalStateException if it is
  • selectHost calls selectAll internally and then uses getHostByRandomWeight to pick a single healthy instance
  • getHostByRandomWeight first creates a Chooser, then builds hostsWithWeight from the healthy instances only, refreshes the Chooser via vipChooser.refresh(hostsWithWeight), and finally picks a single healthy instance via vipChooser.randomWithWeight()
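
The filter-then-pick flow above can be illustrated with a small self-contained sketch. Host is a hypothetical stand-in for Nacos' Instance, the random value is passed in so the result is reproducible, and a plain linear scan replaces Chooser's binary search. Weights are assumed to be positive, and returning null when no healthy host remains is a choice made for this sketch, not a statement about what Nacos does in that case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: Host is a hypothetical stand-in for Nacos' Instance.
class HealthyPickSketch {

    static class Host {
        final String ip;
        final boolean healthy;
        final double weight;

        Host(String ip, boolean healthy, double weight) {
            this.ip = ip;
            this.healthy = healthy;
            this.weight = weight;
        }
    }

    // Mirrors the filter in getHostByRandomWeight: unhealthy hosts never become candidates.
    static List<Host> healthyOnly(List<Host> hosts) {
        List<Host> result = new ArrayList<Host>();
        for (Host host : hosts) {
            if (host.healthy) {
                result.add(host);
            }
        }
        return result;
    }

    // Picks a host for a given random value in [0, 1); weights are assumed positive.
    static Host pick(List<Host> hosts, double random) {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalStateException("no host to srv");
        }
        List<Host> candidates = healthyOnly(hosts);
        double total = 0;
        for (Host host : candidates) {
            total += host.weight;
        }
        double cumulative = 0;
        for (Host host : candidates) {
            cumulative += host.weight / total;
            if (random < cumulative) {
                return host;
            }
        }
        // Reached when there are no candidates, or on rounding at the top of the range.
        return candidates.isEmpty() ? null : candidates.get(candidates.size() - 1);
    }

    public static void main(String[] args) {
        List<Host> hosts = Arrays.asList(
            new Host("10.0.0.1", true, 1),
            new Host("10.0.0.2", false, 100), // unhealthy: ignored despite its weight
            new Host("10.0.0.3", true, 3));
        System.out.println(pick(hosts, 0.1).ip);
        System.out.println(pick(hosts, 0.9).ip);
    }
}
```

With these three hosts only the two healthy ones share the probability mass, in a 1:3 ratio, so the heavily weighted unhealthy host is never returned.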

Chooser

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/utils/Chooser.java

public class Chooser<K, T> {
​
    private K uniqueKey;
​
    private volatile Ref<T> ref;
​
    public T random() {
        List<T> items = ref.items;
        if (items.size() == 0) {
            return null;
        }
        if (items.size() == 1) {
            return items.get(0);
        }
        return items.get(ThreadLocalRandom.current().nextInt(items.size()));
    }
​
    public T randomWithWeight() {
        Ref<T> ref = this.ref;
        double random = ThreadLocalRandom.current().nextDouble(0, 1);
        int index = Arrays.binarySearch(ref.weights, random);
        if (index < 0) {
            index = -index - 1;
        } else {
            return ref.items.get(index);
        }
​
        if (index >= 0 && index < ref.weights.length) {
            if (random < ref.weights[index]) {
                return ref.items.get(index);
            }
        }
​
        /* This should never happen, but it ensures we will return a correct
         * object in case there is some floating point inequality problem
         * wrt the cumulative probabilities. */
        return ref.items.get(ref.items.size() - 1);
    }
​
    public Chooser(K uniqueKey) {
        this(uniqueKey, new ArrayList<Pair<T>>());
    }
​
    public Chooser(K uniqueKey, List<Pair<T>> pairs) {
        Ref<T> ref = new Ref<T>(pairs);
        ref.refresh();
        this.uniqueKey = uniqueKey;
        this.ref = ref;
    }
​
    public K getUniqueKey() {
        return uniqueKey;
    }
​
    public Ref<T> getRef() {
        return ref;
    }
​
    public void refresh(List<Pair<T>> itemsWithWeight) {
        Ref<T> newRef = new Ref<T>(itemsWithWeight);
        newRef.refresh();
        newRef.poller = this.ref.poller.refresh(newRef.items);
        this.ref = newRef;
    }
​
    //......
}
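  • Chooser's refresh method creates a new Ref from itemsWithWeight and then calls that Ref's refresh method; randomWithWeight uses Arrays.binarySearch(ref.weights, random) to compute the initial index. An exact match is used directly; otherwise binarySearch returns -(insertion point) - 1, which is converted back to the insertion point, and the element at that index is taken from ref.items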
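
To make the index handling concrete, here is a minimal sketch of the same lookup over a hand-written cumulative weight array. The class and method names are illustrative only, the random value is passed in rather than drawn from ThreadLocalRandom, and the redundant `random < weights[index]` check from the original is folded into the bounds check.

```java
import java.util.Arrays;

// Sketch only: reproduces the index arithmetic of randomWithWeight on a plain array.
class CumulativeSearchSketch {

    // cumulativeWeights must be sorted ascending, as a cumulative distribution is.
    static int indexFor(double[] cumulativeWeights, double random) {
        int index = Arrays.binarySearch(cumulativeWeights, random);
        if (index >= 0) {
            return index; // random landed exactly on a boundary value
        }
        index = -index - 1; // insertion point: first entry greater than random
        if (index < cumulativeWeights.length) {
            return index;
        }
        // Fallback for rounding issues: use the last bucket.
        return cumulativeWeights.length - 1;
    }

    public static void main(String[] args) {
        double[] weights = {0.2, 0.5, 1.0}; // buckets of width 0.2, 0.3 and 0.5
        System.out.println(indexFor(weights, 0.1));  // falls in the first bucket
        System.out.println(indexFor(weights, 0.35)); // falls in the second bucket
        System.out.println(indexFor(weights, 0.99)); // falls in the third bucket
    }
}
```

Because each bucket's width equals the item's normalized weight, a uniformly distributed random value selects each item with probability proportional to its weight.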

Ref

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/utils/Chooser.java

    public class Ref<T> {
        private List<Pair<T>> itemsWithWeight = new ArrayList<Pair<T>>();
        private List<T> items = new ArrayList<T>();
        private Poller<T> poller = new GenericPoller<T>(items);
        private double[] weights;
​
        @SuppressWarnings("unchecked")
        public Ref(List<Pair<T>> itemsWithWeight) {
            this.itemsWithWeight = itemsWithWeight;
        }
​
        public void refresh() {
            Double originWeightSum = (double) 0;
​
            for (Pair<T> item : itemsWithWeight) {
​
                double weight = item.weight();
                //ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest
                if (weight <= 0) {
                    continue;
                }
​
                items.add(item.item());
                if (Double.isInfinite(weight)) {
                    weight = 10000.0D;
                }
                if (Double.isNaN(weight)) {
                    weight = 1.0D;
                }
                originWeightSum += weight;
            }
​
            double[] exactWeights = new double[items.size()];
            int index = 0;
            for (Pair<T> item : itemsWithWeight) {
                double singleWeight = item.weight();
                //ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest
                if (singleWeight <= 0) {
                    continue;
                }
                exactWeights[index++] = singleWeight / originWeightSum;
            }
​
            weights = new double[items.size()];
            double randomRange = 0D;
            for (int i = 0; i < index; i++) {
                weights[i] = randomRange + exactWeights[i];
                randomRange += exactWeights[i];
            }
​
            double doublePrecisionDelta = 0.0001;
​
            if (index == 0 || (Math.abs(weights[index - 1] - 1) < doublePrecisionDelta)) {
                return;
            }
            throw new IllegalStateException("Cumulative Weight caculate wrong , the sum of probabilities does not equals 1.");
        }
​
        //......
    }
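  • Ref's refresh method initializes items and weights: it skips entries whose weight is less than or equal to zero, divides each remaining weight by the sum of the weights, and stores the running totals in weights as a cumulative distribution; if the last cumulative value differs from 1 by 0.0001 or more, it throws IllegalStateException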
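
The cumulative array that refresh builds can be reproduced in a few lines. This is a simplified sketch with illustrative names: it works on a bare double[] instead of Pair objects and leaves out the special handling of infinite and NaN weights as well as the final sanity check.

```java
import java.util.Arrays;

// Sketch only: builds the cumulative distribution the way Ref.refresh is described above.
class CumulativeWeightsSketch {

    static double[] cumulative(double[] rawWeights) {
        double sum = 0;
        int kept = 0;
        for (double weight : rawWeights) {
            if (weight <= 0) {
                continue; // entries with non-positive weight are never selectable
            }
            sum += weight;
            kept++;
        }

        double[] result = new double[kept];
        double running = 0;
        int i = 0;
        for (double weight : rawWeights) {
            if (weight <= 0) {
                continue;
            }
            running += weight / sum; // normalized share of this entry
            result[i++] = running;   // running total up to and including this entry
        }
        return result;
    }

    public static void main(String[] args) {
        // The zero-weight entry is dropped; the others keep their 1:3 ratio.
        System.out.println(Arrays.toString(cumulative(new double[] {1, 0, 3})));
    }
}
```

For the input {1, 0, 3} the result has two entries, 0.25 and 1.0, so the first kept item owns the range [0, 0.25) and the second owns the rest.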

Summary

  • selectOneHealthyInstance is similar to selectInstances, except that it returns a single instance; it also starts by obtaining the serviceInfo from hostReactor
  • If subscribe is true, it calls hostReactor.getServiceInfo to obtain the serviceInfo; otherwise it calls hostReactor.getServiceInfoDirectlyFromServer
  • Once it has the serviceInfo, selectOneHealthyInstance uses Balancer.RandomByWeight.selectHost to pick a single healthy instance

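Original-content notice: this article is published on the Cloud+ Community (云+社区) with the author's authorization and may not be reproduced without permission.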
