前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >源码分析Dubbo服务调用-服务提供者如何处理请求命令与再谈Invoker

源码分析Dubbo服务调用-服务提供者如何处理请求命令与再谈Invoker

作者头像
丁威
发布2019-06-10 17:21:43
5420
发布2019-06-10 17:21:43
举报
文章被收录于专栏:中间件兴趣圈中间件兴趣圈

源码分析Dubbo NettyServer与HeaderExchangeServer微信公众号:[中间件兴趣圈] 作者简介:《RocketMQ技术内幕》作者

本文将重点剖析服务调用流程,也就是消费端通过网络发起RPC服务调用时,服务提供者是如何进行服务请求响应的。

Dubbo服务调用流程

了解过Netty网络编程的童鞋们应该知道,netty的命令处理基于ChannelHandler(事件处理链),编码、网络传输、解码、处理等,下面是关于NettyServer的初始化过程,该部分在源码分析Dubbo NettyServer与HeaderExchangeServer时已经讲过,本节先做一个简单的回顾:

根据服务暴露的协议,本文以dubbo协议为例:首先通过DubboProtocol通过export暴露服务时,会尝试创建关于该协议的Server服务器,服务器按ip:port进行缓存,本文重点关注ChannelHandler的包装链条,首先看一下NettyServer创建时关于Handler的绑定,代码如下:

NettyServer#doOpen
代码语言:javascript
复制
 1final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
 2channels = nettyHandler.getChannels();
 3// https://issues.jboss.org/browse/NETTY-365
 4// https://issues.jboss.org/browse/NETTY-379
 5// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
 6bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
 7    @Override
 8    public ChannelPipeline getPipeline() {
 9         NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
10         ChannelPipeline pipeline = Channels.pipeline();
11         pipeline.addLast("decoder", adapter.getDecoder());    // @start
12         pipeline.addLast("encoder", adapter.getEncoder());    
13         pipeline.addLast("handler", nettyHandler);                  // @end
14         return pipeline;
15    }
16});

可以看出,传入Netty框架的事件处理Handler主要是3个:1、解码器;2、编码器;3、业务类NettyHandler。也就是说当服务端(Server)的读事件就绪后,进行网络读写后,会将二进制流传入解码器(Decoder),解码出一个一个的RPC请求,然后针对每一个RPC请求,交给NettyHandler相关事件处理方法去处理,在这里传入NettyHandler的ChannelHandler为NettyServer,以网络读命令为例,最终将调用NettyServer的父类AbstractPeer的received方法:

代码语言:javascript
复制
1@Override
2 public void received(Channel ch, Object msg) throws RemotingException {
3     if (closed) {
4         return;
5     }
6     handler.received(ch, msg);
7 }

那AbstractPeer中的ChannelHandler又是“何许人也”,是通过调用NettyServer(URL url, ChannelHandler handler)中传入的,结合上图中NettyServer的构建流程,可以追溯其流程如下:

DubboProtocol#createServer

server = Exchangers.bind(url, requestHandler); // @1

requestHandler,为最原始的ChannelHandler,接下来整个过程都是对该handler的包装。

HeaderExchanger#bind
代码语言:javascript
复制
1return new HeaderExchangeServer(Transporters.bind(url, new 
2DecodeHandler(new HeaderExchangeHandler(handler))));

其包装顺序为 DecodeHandler 》HeaderExchangeHandler 》(DubboProtocol#requestHandler)

NettyTransporter#bind
NettyServer构造函数
代码语言:javascript
复制
1super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));

这里主要包装的是事件的派发Handler,例如AllChannelHandler、ExecutionChannelHandler【Dispatch】业务Handler最终的包装顺序为:事件派发模型handler[AllChannelHandler] 》DecodeHandler 》HeaderExchangeHandler 》 DubboProtocol#requestHandler(最终的业务Handler)。结合网络Netty的处理Handler,服务端事件Handler的处理为:DubboCodec2(解码器) 》 事件派发模型handler[AllChannelHandler] 》DecodeHandler 》 HeaderExchangeHandler》 DubboProtocol#requestHandler(最终的业务Handler)。

上述Handler都在前面的章节中详细介绍了,接下来重点分析服务调用流程,自然需要从DubboProtocol的内部类requestHandler入手。

requestHandler#recive
代码语言:javascript
复制
1@Override
2public void received(Channel channel, Object message) throws RemotingException {
3     if (message instanceof Invocation) {
4          reply((ExchangeChannel) channel, message);
5     } else {
6          super.received(channel, message);
7     }
8 }

如果是服务调用,就进入到reply方法中,否则调用父类进行请求响应。

requestHandler#reply
代码语言:javascript
复制
 1@Override
 2public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
 3     if (message instanceof Invocation) {
 4          Invocation inv = (Invocation) message;                                                // @1
 5          Invoker<?> invoker = getInvoker(channel, inv);                                   // @2
 6          // need to consider backward-compatibility if it's a callback
 7          if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
 8                String methodsStr = invoker.getUrl().getParameters().get("methods");
 9                boolean hasMethod = false;
10                if (methodsStr == null || methodsStr.indexOf(",") == -1) {
11                      hasMethod = inv.getMethodName().equals(methodsStr);
12                } else {
13                     String[] methods = methodsStr.split(",");
14                     for (String method : methods) {
15                          if (inv.getMethodName().equals(method)) {
16                             hasMethod = true;
17                             break;
18                           }
19                      }
20                }
21               if (!hasMethod) {
22                   logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
23                                + " not found in callback service interface ,invoke will be ignored."
24                                + " please update the api interface. url is:"
25                                + invoker.getUrl()) + " ,invocation is :" + inv);
26                   return null;
27               }
28          }
29         RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
30         return invoker.invoke(inv);                    // @3
31   }
32  throw new RemotingException(channel, "Unsupported request: "
33                    + (message == null ? null : (message.getClass().getName() + ": " + message))
34                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
35}

代码@1:获取服务调用信息,例如调用服务类名(interface)、服务方法名、参数类型,参数值。

代码@2:获取调用者Invoker。

代码@3:调用Invoker,执行具体的方法调用。

上述过程非常简单,但其关键点在于Invoker,那这个Invoker到底是什么呢? 回顾一下我们在讲解:Dubbo服务提供者启动流程中已提到Invoker,在本篇中我们再次对该文进行补充说明。

再谈Dubbo Invoker

服务提供者视角看Invoker

我们应该记得,服务提供者在暴露服务时(export)会创建Invoker,其代码片段如下:

ServiceConfig#doExportUrlsFor1Protocol

代码语言:javascript
复制
 1if (registryURLs != null && !registryURLs.isEmpty()) {
 2      for (URL registryURL : registryURLs) {
 3          url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
 4          URL monitorUrl = loadMonitor(registryURL);
 5          if (monitorUrl != null) {
 6              url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
 7          }
 8          if (logger.isInfoEnabled()) {
 9               logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
10          }
11          Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));  // @1
12          DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);     // @2
13          Exporter<?> exporter = protocol.export(wrapperInvoker);    // @3
14          exporters.add(exporter);
15       }
16} else {
17      Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
18      DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
19      Exporter<?> exporter = protocol.export(wrapperInvoker);
20      exporters.add(exporter);
21}

代码@1:根据 首先获取ref的代理对象,真正的服务实现类proxy,然后通过proxyFactory【JavassistProxyFactory、JdkProxyFactory】创建最原始的Invoker,即AbstractProxyInvoker,使用的是匿名实现类,即提供反射方式进行方法的调用。

从abstract Object doInvoker(T proxy, String methodName, Class< ? >[] parameterTypes, Object[] arguments) 可以最终是通过对象发射方式进行方法调用。

代码@2:首先使用DelegateProviderMetaDataInvoker对AbstractProxyInvoker进行包装,主要是将ServerConfig对象与Invoker一起保存。

代码@3:根据具体协议对服务端Invoker进行导出(继续包装)。

代码语言:javascript
复制
1registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D14360%26qos.port%3D22222%26side%3Dprovider%26stub%3Dcom.alibaba.dubbo.demo.provider.DemoServiceStub%26timestamp%3D1533944510702&pid=14360&qos.port=22222&registry=zookeeper&timestamp=1533944510687

协议前缀:registry,故根据SPI机制,具体的协议为RegistryProtocol。 registry=zookeeper :代表注册中心使用zookeeper,在连接注册中心时根据该值进行策略选择。

export= dubbo://… : 根据export,在服务端按照协议启动对应的服务端程序,该协议注意指定请求包的二进制协议,例如协议头和协议体。

按照registry协议,将调用RegistryProtocol#export,但我们忽略了Dubbo的另一机制,该部分也是在服务提供者启动流程中被遗漏。Dubbo为了对服务调用进行包装,采用了过滤器Filter 链模式,在AbstractProxyInvoker调用之前,先执行一系列的过滤器Filter,Dubbo协议默认的协议层面的过滤器代理实现为:com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper,SPI定义文件见:

dubbo-rpc-api/src/main/resources/METAINF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol

代码语言:javascript
复制
1filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
2listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
3mock=com.alibaba.dubbo.rpc.support.MockProtocol
ProtocolFilterWrapper#export
代码语言:javascript
复制
1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2     if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {     // @1
3        return protocol.export(invoker);
4     }
5     return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));    // @2
6}

代码@1:如果协议为registry,则直接调用RegistryProtocol#expoert完成协议导出,协议为registry其含义是通过注册中心暴露,最终会根据expoert,调用具体的协议进行服务暴露,最终会再次进入该方法。

代码@2:如果为具体协议,例如dubbo等,则通过buildInvokerChain构建Invoker链。

ProtocolFilterWrapper#buildInvokerChain

代码语言:javascript
复制
 1private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
 2        Invoker<T> last = invoker;
 3        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);    // @1
 4        if (!filters.isEmpty()) {
 5            for (int i = filters.size() - 1; i >= 0; i--) {
 6                final Filter filter = filters.get(i);
 7                final Invoker<T> next = last;
 8                last = new Invoker<T>() {       // @2
 9
10                    @Override
11                    public Class<T> getInterface() {
12                        return invoker.getInterface();
13                    }
14
15                    @Override
16                    public URL getUrl() {
17                        return invoker.getUrl();
18                    }
19
20                    @Override
21                    public boolean isAvailable() {
22                        return invoker.isAvailable();
23                    }
24
25                    @Override
26                    public Result invoke(Invocation invocation) throws RpcException {
27                        return filter.invoke(next, invocation);
28                    }
29
30                    @Override
31                    public void destroy() {
32                        invoker.destroy();
33                    }
34
35                    @Override
36                    public String toString() {
37                        return invoker.toString();
38                    }
39                };
40            }
41        }
42        return last;
43    }

代码@1:加载系统配置的所有Filer,并根据作用对象(服务提供者、服务消费者),返回合适的Filter链表。

代码@2:根据Filter构建Invoker链。 Dubbo默认提供的Filter在dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter中定义,其内容为:

代码语言:javascript
复制
 1echo=com.alibaba.dubbo.rpc.filter.EchoFilter
 2generic=com.alibaba.dubbo.rpc.filter.GenericFilter
 3genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
 4token=com.alibaba.dubbo.rpc.filter.TokenFilter
 5accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
 6activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
 7classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
 8context=com.alibaba.dubbo.rpc.filter.ContextFilter
 9consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
10exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
11executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
12deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
13compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
14timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter

当然这些过滤器并不全是服务提供者端的,每个过滤器通过@Activate注解来定义属于服务端还是消费端。

Dubbo还为服务暴露(export)和服务引用(reference)提供了事件回调通知,其实现与Filter类似,其实现类为:com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper,其事件只是在服务expoert、或服务reference时执行,与调用执行的Filter不一样,因为是针对服务暴露与服务引用的事件。

DubboCodec2(解码器) 》 事件派发模型handler[AllChannelHandler] 》 DecodeHandler 》 HeaderExchangeHandler 》 DubboProtocol#requestHandler(最终的业务Handler) 》Filter Chain 》AbstractProxyInvoker(JavassistProxyFactory#getInvoker),通过反射进行服务端方法调用。

从消费端视角看Invoker

从消费者的视角看Invoker,即我们常说的服务调用器,结合集群容错功能的Invoker,服务调用的门面(统一封装),例如DubboInvoker,根据dubbo协议,封装服务调用请求并通过网络向服务器发送请求包。其主要子类为AbstractInvoker、AbstractClusterInvoker(集群模式)。消费端服务发起请求时,同样会被ProtocolFilterWrapper拦截,引入调用链(获取消费端的Filter Chain)。关于消费端的Invoker,请大家参考如下两篇博文:   Dubbo集群容错模式   Dubbo Invoker概述   Dubbo消费者启动流程

综上所述,服务端的Invoker职责通过反射机制,根据服务名、方法名、参数调用方法完成服务端的响应,其类主要为com.alibaba.dubbo.rpc.proxy.AbstractProxyInvoker,客户端的Invoker职责主要是按照协议组织请求包,通过网络发送服务调用请求,其代表为AbstractInvoker、AbstractClusterInvoker。

服务调用的整体流程就介绍到这里了,从下一篇开始,将重点分析Dubbo Filter机制。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-03-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 中间件兴趣圈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Dubbo服务调用流程
    • NettyServer#doOpen
      • DubboProtocol#createServer
        • HeaderExchanger#bind
          • NettyTransporter#bind
            • NettyServer构造函数
              • requestHandler#recive
                • requestHandler#reply
                • 再谈Dubbo Invoker
                  • 服务提供者视角看Invoker
                    • ProtocolFilterWrapper#export
                    • 从消费端视角看Invoker
                    相关产品与服务
                    微服务引擎 TSE
                    微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档