前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >微服务架构之服务冶理Dubbo-服务引用

微服务架构之服务冶理Dubbo-服务引用

作者头像
公众号_松华说
发布2019-07-16 11:44:59
8530
发布2019-07-16 11:44:59
举报
文章被收录于专栏:松华说

注:公众号关于dubbo解读文章均基于apache-dubbo-incubating-2.7.1版本,发版于5月26号,此版本注册中心(多数是zookeeper)在某些特殊场景下会出现重复URL地址数据无法删除,导致消费方拿到的是失效地址,从而导致调用失败的问题。如果你也在使用此版本进行源码学习,在网络漂移(下班回家再调试源码)的情况下需要手动删除zookeeper的dubbo节点路径

服务引用示例

代码语言:javascript
复制
public class Application {
    /**
     * In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
     * launch the application
     */
    public static void main(String[] args) {
        ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
        reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
        reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        reference.setInterface(DemoService.class);
        DemoService service = reference.get();
        String message = service.sayHello("dubbo");
        System.out.println(message);
    }
}

这里得到的DemoService是什么?将下来一层一层地掀开迷底

ReferenceConfig#get=>ReferenceConfig#init,在init方法中会执行ref = createProxy(map);

代码语言:javascript
复制
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        //TODO 本地引用inJvm
        if (shouldJvmRefer(map)) {
                invoker = refprotocol.refer(interfaceClass, url);
        } else {
            //TODO urls为服务引用的接口信息,URL是dubbo中的统一数据模型
            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                }
                if (registryURL != null) { 
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { 
                    //TODO 直连
                    //TODO 适用于测试环境或者注册中心不可用时需要发布的情况
                   invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
        //TODO 创建服务代理
        return (T) proxyFactory.getProxy(invoker);
    }

直连中的StaticDirectory的作用是本文第一个重点关注对象,实际上集群目录服务实现父抽象类AbstractDirectory的doList模板方法,会返回经过路由过滤后的Invoker列表,路由过滤也就是服务路由,常用于设置分组调用、同机房调用优先、灰度分布、流量切换、读写分离等。另外还有RegistryDirectory、MockDirectory目录服务,可以不加思索地猜想RegistryDirectory会动态维护Invoker列表,StaticDirector则是直接返回

Protocol中的refer方法是被@Adaptive注解修饰的,说明它是一个自适应扩展点,自适应扩展点加在方法层面上,表示会动态生成一个自适应的适配器,比如这里的DubboProtocol$Adaptive,并且默认实现是"dubbo",最终实现可以通过在URL指定

代码语言:javascript
复制
/**
 * Protocol. (API/SPI, Singleton, ThreadSafe)
 */
@SPI("dubbo")
public interface Protocol {

    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    void destroy();

}

refprotocol#refer先后经过filter包装类ProtocolFilterWrapper、ProtocolListenerWrapper最后执行RegistryProtocol。这些包装类是在创建扩展器时,通过查找构造方法的参数类型获取Wrapper类,然后将自身注入,前者ProtocolFilterWrapper负责过滤器,Dubbo允许我们在provider端设置权限校验、缓存、限流等等一些Filter过滤器,在consumer端也可以设置一些Filter,这是一种责任链模式;后者ProtocolListenerWrapper负责监听器,Dubbo允许consumer端在调用之前、调用之后或出现异常时,触发oninvoke、onreturn、onthrow三个事件

代码语言:javascript
复制
private T createExtension(String name) {

        try {  
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }       
    } 

RegistryProtocol#refer=>doRefer

代码语言:javascript
复制
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //TODO 对多个invoker进行封装
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            //TODO 注册
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        //TODO 订阅,监听变化
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
        //TODO 根据容错模式和负载均衡算法获取invoker
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

简单回顾下之前提到的服务注册,执行链大概是这样的,Registry根据URL通过registryFactory获取自适应适配类,最后会执行FailbackRegistry中的模块方法doRegister,真正的实现类是ZookeeperRegistry,创建路径节点,将url信息写入zookeeper中

/***************分隔线***********************/

接下来说下订阅监听zookeeper变化

RegistryDirectory#subscribe

代码语言:javascript
复制
public void subscribe(URL url) {
        setConsumerUrl(url);
        consumerConfigurationListener.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        registry.subscribe(url, this);
    }

FailbackRegistry#subscribe

代码语言:javascript
复制
@Override
    public void subscribe(URL url, NotifyListener listener) {
        try {
            doSubscribe(url, listener);
        } catch (Exception e) {
            List<URL> urls = getCacheUrls(url);
            if (CollectionUtils.isNotEmpty(urls)) {
                notify(url, listener, urls);
            }
            addFailedSubscribed(url, listener);
        }
    }

    //TODO 模板方法
    public abstract void doSubscribe(URL url, NotifyListener listener);

ZookeeperRegistry#doSubscribe

代码语言:javascript
复制
@Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    //TODO 服务提供者dubbo/org.apache.dubbo.demo.DemoService/providers
                    //TODO 服务配置dubbo/org.apache.dubbo.demo.DemoService/configurators
                    /TODO 服务路由/dubbo/org.apache.dubbo.demo.DemoService/routers
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                    //TODO 只有服务提供者才有children,也就是具体某个实例
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }       

可以看到当有注册中心有服务列表更新的时候会执行通知,此接口接受三种类别的url,包括服务提供方provider、服务配置configurator、服务路由router。通知方法执行链大概是

FailbackRegistry#notify=>AbstractRegistry#notify=>RegistryDirectory#notify

AbstractRegistry#notify

代码语言:javascript
复制
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            //TODO 通知
            listener.notify(categoryList);
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            //TODO 更新缓存
            saveProperties(url);
        }  

AbstractRegistry#saveProperties

代码语言:javascript
复制
File lockfile = new File(file.getAbsolutePath() + ".lock");
    if (!lockfile.exists()) {
        lockfile.createNewFile();
    }
    try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
         FileChannel channel = raf.getChannel()) {
        FileLock lock = channel.tryLock();
        if (lock == null) {
            throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
        }
        // Save
        try {
            if (!file.exists()) {
                file.createNewFile();
            }
            try (FileOutputStream outputFile = new FileOutputStream(file)) {
                properties.store(outputFile, "Dubbo Registry Cache");
            }
        } finally {
            lock.release();
        }
    }

平时我们更习惯使用输入流和输出流操作文件,将输入流的数据写入到输出流中,但是利用FileChannel会更加高效,它能直连输入和输出流的文件通道

/***************分隔线*************/

回到notify方法中,执行链

RegistryDirectory#notify=>RegistryDirectory#refreshOverrideAndInvoker=>RegistryDirectory#refreshInvoker

代码语言:javascript
复制
@Override
public URL getUrl() {
    return this.overrideDirectoryUrl;
}

 private void refreshOverrideAndInvoker(List<URL> urls) {
        // mock zookeeper://xxx?mock=return null
        overrideDirectoryUrl();
        refreshInvoker(urls);
  }

 private void refreshInvoker(List<URL> invokerUrls) {
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
      List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        routerChain.setInvokers(newInvokers);
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;
  }


    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
        String key = url.toFullString(); // The parameter urls are sorted
        invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
        newUrlInvokerMap.put(key, invoker);
        return newUrlInvokerMap;
    } 

在refreshOverrideAndInvoker中会更新routerChain,也会更新overrideDirectoryUrl等,invoker#getUrl实际上是取的overrideDirectoryUrl,而扩展点适配器选择具体实现是根据URL来的。注:URL在Dubbo中是统一的数据模式

protocol#refer又用到了Dubbo SPI机制,执行链是Protocol$Adaptive#refer=>ProtocolListenerWrapper#refer=>ProtocolFilterWrapper#refer=>DubboProtocol#refer,其中在 DubboProtocol#refer方法中会构建DubboInvoker对象

DubboProtocol#refer

代码语言:javascript
复制
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

refer方法参数中还有一个很重要的DubboProtocol#getClient方法,猜想是Netty客户端的建立,下篇文章再分析

/**************分隔线*************/

回到doRefer中的Cluster.join(directory),Cluster是一个集群容错接口,同时也是一个@Adaptive自适应扩展点,默认实现类是FailoverCluster.NAME。Dubbo主要内置了如下几种策略:失败自动切换Failover、安全失败Failsafe、快速失败Failfast、失败自动恢复Failback、并行调用Forking、广播Broadcast

代码语言:javascript
复制
@SPI(FailoverCluster.NAME)
public interface Cluster {

    /**
     * Merge the directory invokers to a virtual invoker.
     *
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;

}

前面我们说过ExtensionLoader在实例化对象时,会将自己(这里是FailOverCluster)注入到包装类中,所以这里实际上是调用MockClusterWrapper#join,所以在ReferenceConfig#createProxy中的invoker = refprotocol.refer(interfaceClass, urls.get(0))中得到的invoker是一个MockClusterWrapper包装类。使用这种机制可以把一些公共处理放在Wrapper包装类中

/************************题外话-开始*************************/

这里插一句题外话,Dubbo是如何实现Forking调用?

代码语言:javascript
复制
// 创建线程池
ExecutorService executor =
  Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs =
  new ExecutorCompletionService<>(executor);
// 用于保存 Future 对象
List<Future<Integer>> futures =
  new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures 
futures.add(
  cs.submit(()->sayHello1()));
futures.add(
  cs.submit(()->sayHello2()));
futures.add(
  cs.submit(()->sayHeelo2()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
  // 只要有一个成功返回,则 break
  for (int i = 0; i < 3; ++i) {
    r = cs.take().get();
    // 简单地通过判空来检查是否成功返回
    if (r != null) {
      break;
    }
  }
} finally {
  // 取消所有任务
  for(Future<Integer> f : futures)
    f.cancel(true);
}
return r;

当需要批量提交异步任务的时候使用CompletionService。CompletionService将线程池 Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待。嗯,作者就是这么照顾读者,看源码一定要学到东西,学到东西也会分享出来,所有快快关注“松花皮蛋的黑板报”一起涨见识吧!

/***********题外话-结束******************/

说回到消费端入口ReferenceConfig

代码语言:javascript
复制
private T createProxy(Map<String, String> map) {
        invoker = refprotocol.refer(interfaceClass, urls.get(0));  
        return (T) proxyFactory.getProxy(invoker);
    }

前面我们讲清楚了invoker是一个引用了FailOverClusterInvoker的MockClusterInvoker,接下来看下getProxy,执行链是

ProxyFactory$Adaptive#getProxy=>StubProxyFactoryWrapper#getProxy=>AbstractProxyFactory#getProxy=>JavassistProxyFactory#getProxy

先看下包装类的方法,StubProxyFactoryWrapper#getProxy

代码语言:javascript
复制
@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        //TODO 这里的invoker是MockClusterInvoker
        //TODO proxyFactory是JavassistProxyFactory
        T proxy = proxyFactory.getProxy(invoker);
       //TODO 泛化调用入口
        if (GenericService.class != invoker.getInterface()) {
                Class<?> stubClass = ReflectUtils.forName(stub);
                Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
                proxy = (T) constructor.newInstance(new Object[]{proxy});
            }
        }
        return proxy;
    }

泛化调用主要用于消费端没有API接口的情况;不需要引入接口JAR包,而是直接通过GenericService接口来发起服务调用,参数及返回值中的所有POJO均用Map表示。泛化调用对于服务端无需关注,按正常服务进行暴露即可。以下几种场景可以考虑使用泛化调用:服务测试平台、API服务网关

再来看下JavassistProxyFactory类

代码语言:javascript
复制
public class JavassistProxyFactory extends AbstractProxyFactory {

    //TODO DUbbo中的URL数据模型中指定有proxyFactory实现,默认是javassist
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

可以看到getProxy得到的是一个InvokerInvocationHandler,所以在服务端调用RPC接口方法会调用到InvokerInvocationHandler#invoker,其中toString\hashCode\equals方法不走RPC调用。此时开头提出的问题终于有了答案,reference#get得到的是InvokerInvocationHandler对象

代码语言:javascript
复制
@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        //TODO 以下方法走本地调用
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        //TODO 调用MockClusterInvoker#invoker
        return invoker.invoke(createInvocation(method, args)).recreate();
    }

MockClusterInvoker#invoker

代码语言:javascript
复制
@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        //TODO URL中的mock值,默认是不开启
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            //TODO 不走mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            //TODO 不管是否失败都直接走mock
            result = doMockInvoke(invocation, null);
        } else {
            //TODO 失败时调用mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }   

有了mock,再也不用担心联调慢和上游服务全部宕机

然后执行链会走到AbstractClusterInvoker#invoke

代码语言:javascript
复制
@Override
public Result invoke(final Invocation invocation) throws RpcException {
       //TODO 从directory获取到invokerList
    List<Invoker<T>> invokers = list(invocation);
    //TODO 通过SPI扩展实例化LoadBalance,默认是随机选取
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    //TODO FailoverClusterInvoker
    return doInvoke(invocation, invokers, loadbalance);
}

而list的执行链是RegistryDirectory#doList=>RouterChain#route,之前在分析完成注册中心后订阅变更事件,然后会获取Directory中的RouterChain,这里也验证了开头描述的RegistryDirectory是动态维护的Invoker目录服务

而doInvoke会将loadbalance实例传到FailoverClusterInvoker#doInvoke

代码语言:javascript
复制
@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

    //TODO 获取invoker
    Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);

     //TODO 执行
     Result result = invoker.invoke(invocation);

 }

FailoverClusterInvoker#doInvoke=>InvokerWrapper#invoke=>ListenerInvokerWrapper#invoke=>ProtocolFilterWrapper#invoke=>AbstractInvoker#invoke=>DubboInvoker

DubboInvoker#doInvoke,异步调用、回调调用,同时这也是与高性能NIO通信框架Netty交互的入口

代码语言:javascript
复制
@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        ExchangeClient currentClient;//TODO 之前创建的连接
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
         if (isOneway) {//TODO 单向
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // TODO 将RpcInvocation放到Netty处理流程中
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {//TODO 异步
               //TODO 将RpcInvocation放到Netty处理流程中
                ResponseFuture future = currentClient.request(inv, timeout);
                FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);
                Result result;
                if (isAsyncFuture) {//TODO 回调

                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {

        } catch (RemotingException e) {

        }
    }
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-06-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 松华说 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档