对于dubbo的provider和consumer,通过不同的配置项会进行不同的操作,provider会对应创建ExchangeServer,而consumer端对应创建ExchangeClient。而根据实现的不同server和client也用对应的方式工作,比如dubbo默认使用的实现是nettyServer和nettyClient。这一切加载流程是如何工作的呢?这个问题就是本文要解决的重点。
一般配置项:
provider:
<dubbo:application name="appA" />
<!--使用zookeeper注册中心暴露服务地址 --><dubbo:registry address="${zookeeper.url}" ></dubbo:registry><dubbo:protocol name="dubbo" port="-1" />
<dubbo:provider filter="MDCFilter,DubboExceptionFilter,-exception" delay="-1" timeout="7000" retries="0" /><!-- 让监控中心进行统计 --><dubbo:monitor protocol="registry" />
<!-- 使用注解方式暴露接口 --> <dubbo:annotation package="com.user"/>
consumer: <dubbo:application name="appB"></dubbo:application><!-- 注意这里的traceFilterc必须放在HystrixFilter前--><dubbo:consumer filter="HystrixFilter,MDCFilter,DubboExceptionFilter,-exception" timeout="10000" check="false"/>
<!-- 使用注解方式暴露接口 缺损package时,默认扫描全包 --><dubbo:annotation/>
<dubbo:registry address="${zookeeper.url}"></dubbo:registry>
provider对应的是com.alibaba.dubbo.config.ProviderConfig中的属性值; consumer对应的是com.alibaba.dubbo.config.ConsumerConfig中的属性值。
解析部分com.alibaba.dubbo.config.spring.schema.DubboBeanDefinitionParser#parse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, java.lang.Class, boolean):
从上面代码来看,整个流程就相当清晰了,DubboNamespaceHandler中加载的顺序是(这里只关注provider、consumer和service)先ProviderConfig再ConsumerConfig,再ProtocolConfig,之后才是ServiceBean。在解析完provider,consumer和protocol之后还会把这些节点当成serviceBean继续进行相应的解析,也就是说真正的解析过程都是在serviceBean中。包括provider和consumer中配置的filter属性也是会当做ServiceBean进行解析,解析时对应的protocol是:
在provider、consumer、registry、filter等作为ServiceBean来解析和进行相应的操作时,对应的ServiceBean的protocol属性是不一样的。这里需要提一点的是filter对应的是ProtocolFilterWrapper,而Consumer和Provider对应的是DubboProtocol
需要注意的是com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper中包裹的属性private final Protocol protocol;在这里对应的是DubboProtocol。
private void doExportUrls() { List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) {//使用的协议,这里以dubbo协议为例 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { ... String scope = url.getParameter(Constants.SCOPE_KEY); //配置为none不暴露 if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { //本地 exportLocal(url); } //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务) if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){ if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } //暴露时先获取invoker Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } else { //暴露时先获取invoker Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } } this.urls.add(url);
}
exportLocal方法:private void exportLocal(URL url) { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(NetUtils.LOCALHOST) .setPort(0);
// modified by lishen ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); //暴露时先获取invoker Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry"); } }
在进行服务暴露时,先获取invoker,然后获取exporter,并将exporter添加到ServiceConfig的List<exporter</exporter
关于暴露url成Service部分这里不去深究了,这里dubbo会把每个url暴露成服务(Service),在dubbo控制台上看时就可以看到每个服务对应一大串的url。
这里我们主要看下JdkProxyFactory:
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker)); }
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { Method method = proxy.getClass().getMethod(methodName, parameterTypes); return method.invoke(proxy, arguments); } }; }
主要用于生成接口实现类的代理对象,并生成一个携带doInvoke方法的AbstractProxyInvoker。
可见它包括创建Client和创建Server的相关方法,其中它的export方法如下:
图中标红的部分为openServer方法,也就是provider创建server的部分:
private void openServer(URL url) { // find server. String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } }
private ExchangeServer createServer(URL url) { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); //public static final String DEFAULT_REMOTING_SERVER = "netty"; //这里默认使用的是netty String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); //这里默认使用的是NettyTransporter if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
try { server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}
com.alibaba.dubbo.remoting.exchange.Exchangers#bind(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.remoting.exchange.ExchangeHandler):
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).bind(url, handler);}
public static Exchanger getExchanger(URL url) { //com.alibaba.dubbo.common.Constants#DEFAULT_EXCHANGER // public static final String DEFAULT_EXCHANGER = "header"; // 默认用的echanger是header String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type);}
public static Exchanger getExchanger(String type) { //这里是加载Exchanger的spi拓展点信息 return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);}
加载Exchanger并通过type进行匹配,默认是使用的是HeaderExchanger
最终getExchanger(url).bind(url, handler)对应的方法是:
最终是将传入的ExchangeHandler装饰成HeaderExchangeHandler,然后将HeaderExchangeHandler装饰成DecodeHandler。
这里可以看到,server也是装饰器模式,最终工作的是NettyServer。到这里Server的初始化流程也就结束了。
文章开头部分我们知道com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper中包裹的属性private final Protocol protocol;在这里对应的是DubboProtocol。com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper#refer方法:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); }
可以看到它调用了protocol.refer方法,那么com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#refer方法如下:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// modified by lishen optimizeSerialization(url);
// create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker;}
在这里面会调用com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#getClients方法:
private ExchangeClient[] getClients(URL url){ //是否共享连接 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); //如果connections不配置,则共享连接,否则每服务每连接 if (connections == 0){ service_share_connect = true; connections = 1; }
ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect){ clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients;}
继续看initClient:
接下来看client = Exchangers.connect(url ,requestHandler);的实现:
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).connect(url, handler);}
其中getExchanger方法部分与server部分相同,我们看看它的connect方法的实现:
这里最后返回的是一个HeaderExchangeClient,它的内部装饰的是NettyClient。到这里client端的初始化也全部结束了。
本文以DubboNamespaceHandler为入口对Server和Client的初始化以及初始化过程中涉及到的Proxy、Transporter和Exchanger进行了一个流程梳理。