前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo服务暴露过程源码分析

dubbo服务暴露过程源码分析

作者头像
技术蓝海
发布2018-04-26 13:42:51
2K0
发布2018-04-26 13:42:51
举报
文章被收录于专栏:wannshan(javaer,RPC)wannshan(javaer,RPC)

本例以一个简单典型的服务发布为例,spring配置如下

代码语言:javascript
复制
  //dubbo协议
 <dubbo:protocol name="dubbo" port="20880" id="dubbo1"/>
   //zk注册中心
 <dubbo:registry id="hangzhouRegistry" address="zookeeper://192.168.64.128:2181"/>
 <dubbo:service  interface="demo.dubbo.api.DemoService" ref="demoService" protocol="dubbo1" />
代码语言:javascript
复制
//服务接口
public interface DemoService {

 public String sayHello(String name);

}
//实现
public class DemoServiceImpl implements DemoService {

public String sayHello(String name) {

Random random=new Random();
try {
    Thread.sleep(800* random.nextInt(6));
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
return "Hello " + name + ", response form provider: " + RpcContext.getContext().getLocalAddress();
}
}

有博文 dubbo基于spring的构建分析,可以看到dubbo服务发布注册的启动方法有两个入口,

第一在ServiceBean类的onApplicationEvent()方法,在容器初始化完成后,执行暴露逻辑

第二在ServiceBean类的afterPropertiesSet()方法,当前servicebean属性构造完成后,执行暴露逻辑

具体暴露方法在其父类ServiceConfig的ServiceConfig方法里

代码语言:javascript
复制
  //这是个同步的方法
    public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }

        //延迟暴露 通过delay参数配置的,延迟暴露,放入单独线程中。
        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            //暴露方法
            doExport();
        }
    }
代码语言:javascript
复制
/***
     * 也是个同步的方法,暴露过程具体过程
     */
    protected synchronized void doExport() {
        //....属性检查和赋值,代码略

        //暴露过程
        doExportUrls();
    }

     private void doExportUrls() {
        //获取注册中心信息
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            //多个协议,暴露多次
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

     //暴露过程
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        //属性的解析和赋值.....
	//..... 代码略.....

        //获取服务暴露范围,本地或者远程
        String scope = url.getParameter(Constants.SCOPE_KEY);
        //配置为none不暴露
        //不配默认,服务暴露远程同时在本地暴露
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                //本地暴露 (***看这里***)关键1
                exportLocal(url);
            }
            //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    //有多个注册中心,暴露到多个注册中心
                    for (URL registryURL : registryURLs) {
		        //url 添加dynamic 属性值
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
			//(***看这里***)关键2
                        //默认走到JavassistProxyFactory.getInvoker方法
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        //这里的invoker的url的协议是register类型
                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } else {
                    //没有注册中心 ,只在本机ip打开服务端口,生成服务代理,并不注册到注册中心。
                    //(***看这里***)关键3 此处的url协议为dubbo
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

上面方法中有三个地方涉及到具体的服务暴露先看,关键1

本地暴露的逻辑

代码语言:javascript
复制
    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)//设置为injvm 协议
                    .setHost(NetUtils.LOCALHOST)
                    .setPort(0);
            //这里的protocol是Protocol$Adpative的实例(spi机制)
        //proxyFactory是ProxyFactory$Adpative实例(spi机制)
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            //放到暴露列表
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
        }
    }

ProxyFactory$Adpative的getInvoker方法

代码语言:javascript
复制
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) 
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        //这里的extension默认是JavassistProxyFactory实例,
	return extension.getInvoker(arg0, arg1, arg2);
    }

JavassistProxyFactory的getInvoker方法

代码语言:javascript
复制
 //proxy 是服务实现类,type是服务接口
 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
	//利用Wrapper类通过服务接口生成对应的代理类。
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        //实现抽象类AbstractProxyInvoker抽象方法doInvoke,并调用(proxy, type, url)构造函数实例化匿名类
	return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                //这里调用代理类的invokeMethod方法
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

AbstractProxyInvoker的构造方法

代码语言:javascript
复制
    public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
        if (proxy == null) {
            throw new IllegalArgumentException("proxy == null");
        }
        if (type == null) {
            throw new IllegalArgumentException("interface == null");
        }
        if (!type.isInstance(proxy)) {
            throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
        }
        this.proxy = proxy;
        this.type = type;
        this.url = url;//赋值ur到自身(invoker),这个后面用到
    }

AbstractProxyInvoker的invoke方法

代码语言:javascript
复制
    public Result invoke(Invocation invocation) throws RpcException {
        try {
	   //回调用doInvoke方法,会传入执行实例proxy,方法名和方法参数类型和值
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

这里以文章开头DemoService接口为例通过Wrapper.getWrapper返回的类代码,这里需要代码hack

代码语言:javascript
复制
package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.DemoService;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;

public class Wrapper0 extends Wrapper
  implements ClassGenerator.DC
{
  public static String[] pns;
  public static Map pts;
  public static String[] mns;
  public static String[] dmns;
  public static Class[] mts0;

  public String[] getPropertyNames()
  {
    return pns;
  }

  public boolean hasProperty(String paramString)
  {
    return pts.containsKey(paramString);
  }

  public Class getPropertyType(String paramString)
  {
    return (Class)pts.get(paramString);
  }

  public String[] getMethodNames()
  {
    return mns;
  }

  public String[] getDeclaredMethodNames()
  {
    return dmns;
  }

  public void setPropertyValue(Object paramObject1, String paramString, Object paramObject2)
  {
    try
    {
      DemoService localDemoService = (DemoService)paramObject1;
    }
    catch (Throwable localThrowable)
    {
      throw new IllegalArgumentException(localThrowable);
    }
    throw new NoSuchPropertyException("Not found property \"" + paramString + "\" filed or setter method in class com.alibaba.dubbo.common.DemoService.");
  }

  public Object getPropertyValue(Object paramObject, String paramString)
  {
    try
    {
      DemoService localDemoService = (DemoService)paramObject;
    }
    catch (Throwable localThrowable)
    {
      throw new IllegalArgumentException(localThrowable);
    }
    throw new NoSuchPropertyException("Not found property \"" + paramString + "\" filed or setter method in class com.alibaba.dubbo.common.DemoService.");
  }
   
  //(**看这里,关键方法实现****)
  public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)
    throws InvocationTargetException
  {
    DemoService localDemoService;
    try
    {
    //赋值执行实例,这里是接口实现类,DemoServiceImpl对象
      localDemoService = (DemoService)paramObject;
    }
    catch (Throwable localThrowable1)
    {
      throw new IllegalArgumentException(localThrowable1);
    }
    try
    {
    //根据传入的要调用的方法名paramString,方法参数值,调用执行实例方法
      if (("sayHello".equals(paramString)) || (paramArrayOfClass.length == 1))
        return localDemoService.sayHello((String)paramArrayOfObject[0]);
    }
    catch (Throwable localThrowable2)
    {
      throw new InvocationTargetException(localThrowable2);
    }
    throw new NoSuchMethodException("Not found method \"" + paramString + "\" in class com.alibaba.dubbo.common.DemoService.");
  }
}

到这比较清楚了解,具体的代理过程了。

第一步上面invoker对象生成好后,接下来就要通过Protocol$Adpative的export方法暴露服务

代码语言:javascript
复制
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
	    //根据上文本地调用这里的protocal协议别设置为injvm
	    //所以这里会走到InjvmProtocol的export方法
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

看下InjvmProtocol的export方法

代码语言:javascript
复制
 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //返回InjvmExporter对象
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

InjvmExporter构造函数

代码语言:javascript
复制
     InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);//invoker赋给自身
        this.key = key;
        this.exporterMap = exporterMap;
	//存的形式,serviceKey:自身(exporter) put到map关联起来,这样可以通过servciekey找到exporterMap然后找到invoker
        exporterMap.put(key, this);
    }

这里的exporterMap是由InjvmProtocol实例拥有,而InjvmProtocol有时单例的,因为InjvmProtocol类有以下实例和方法:

代码语言:javascript
复制
    //静态自身成员变量
    private static InjvmProtocol INSTANCE;
    //构造方法,把自身赋值给INSTANCE对象
    public InjvmProtocol() {
        INSTANCE = this;
    }

所以exporterMap对象也是单例的。同时这里顺便看下InjvmProtocol的refer方法,本地服务的引用查找也是通过自身exporterMap对象。

代码语言:javascript
复制
  public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        //把exporterMap对象赋值给InjvmInvoker
        return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
    }
    //具体查找过程
    public Result doInvoke(Invocation invocation) throws Throwable {
        //通过exporterMap获取exporter
        Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
        if (exporter == null) {
            throw new RpcException("Service [" + key + "] not found.");
        }
        RpcContext.getContext().setRemoteAddress(NetUtils.LOCALHOST, 0);
        return exporter.getInvoker().invoke(invocation);
    }

以上是本地服务的发布和引用过程

远程服务发布暴露过程

接下来看下本例中会用到的远程服务发布暴露过程,即暴露服务端口并发布服务信息到注册中心。也即是关键2代码处

通过上面本地服务暴露过程分析可以知道,远程服务的态代理生成过程和本地服务代理生成是一样的,唯一区别点是构造invoker是传入的url是registryURL

传入url的不同,会造成下面这句的执行过程的不同

Exporter<?> exporter = protocol.export(invoker);

这里protocol通过spi走的是Protocol$Adpative的export方法:

代码语言:javascript
复制
 public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        //由于传入的url是registryURL所以会走RegistryProtocol的export方法
	com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

RegistryProtocol的export方法如下

代码语言:javascript
复制
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker 暴露invoker (***看doLocalExport方法**)
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider 获取对应注册中心操作对象
        final Registry registry = getRegistry(originInvoker);
        //获取要注册到注册中心的地址
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        //注册服务url到注册中心(***把服务信息注册到注册中心**)
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        //实现一个匿名类实现接口Exporter
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            //取消暴露的过程
            public void unexport() {
                try {

                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    //取消注册
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    //取消订阅
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

doLocalExport方法,这里方法里涉及过程比较多

代码语言:javascript
复制
     private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        //通过原始originInvoker构造缓存key
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        //缓存没有,走具体暴露逻辑
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    //InvokerDelegete是RegistryProtocol类的静态内部类,继承自InvokerWrapper,
		    //通过构造器赋值持有代理originInvoker和服务暴露协议url对象,算是包装一层
                    //而url 是通过getProviderUrl(originInvoker)返回的,此时url的协议已是dubbo,即服务暴露的协议
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));

                    //ExporterChangeableWrapper是RegistryProtocol的私有内部类实现了Exporter接口。
                    //通过调用它的构造方法(Exporter<T> exporter, Invoker<T> originInvoker)构造exporterWrapper实例
		    //而这里传入的exporter是通过(Exporter<T>) protocol.export(invokerDelegete)语句创建
		    //由上一步知道,这里的invokerDelegete里url属性的protocol协议已经是dubbo
                    //下面具体看下protocol.export(invokerDelegete)方法。
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

还对Protocol$Adpative类的export逻辑分析

代码语言:javascript
复制
    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        //由于这里invokerDelegete的url属性的protocol是dubbo,所以这里走DubboProtocol的export协议
	com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

 接着看下DubboProtocol的export方法

代码语言:javascript
复制
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // get serviceKey.
        String key = serviceKey(url);//key的组成group/service:version:port
        //构造服务的exporter
        //如同InjvmProtocol一样,DubboProtocol也是单例的 所以这里exporterMap也是单例的
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        //通过key放入exporterMap,把持有invoker的exporter 和serviceKey关联
        //这个在后面服务调用时,可以通过key找到对应的exporter进而找到invoker提供服务
        exporterMap.put(key, exporter);

        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
        //根据url开启一个服务,比如绑定端口,开始接受请求信息(**继续看这里***)
        openServer(url);

        return exporter;
    }
代码语言:javascript
复制
 private void openServer(URL url) {
        //key=host:port 用于定位server
        String key = url.getAddress();
        //client也可以暴露一个只有server可以调用的服务。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            //服务实例放到serverMap,key是host:port
            //这里的serverMap也单例的
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                //通过createServer(url)方法获取server (***看这里***)
                serverMap.put(key, createServer(url));
            } else {
                //server支持reset,配合override功能使用
                server.reset(url);
            }
        }
    }

     /***
     * 开启服务
     * @param url
     * @return
     */
    private ExchangeServer createServer(URL url) {
        //默认开启server关闭时发送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //默认开启heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
        /***
         * 通过server key 检查是否是dubbo目前spi扩展支持的传输框架。默认是netty
         */
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

         //通过codec key 获取编解码方案,兼容dubbo1,默认是dubbo1compatible ,否则默认dubbo 编解码方案
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            //构造具体服务实例,
	    //Exchangers是门面类,里面封装了具体交换层实现,并调用它的bind方法(***看这里***)
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        //这里会验证一下,客户端传输层实现
        //如果没有对应的实现,会抛出异常
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

跟踪方法

代码语言:javascript
复制
//Exchangers.bind方法
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
	//通过spi会走HeaderExchanger的bind逻辑
        return getExchanger(url).bind(url, handler);
    }

    //HeaderExchanger的bind方法

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        //Transporters也是门面类,封装了具体的传输层实现查找和bind过程
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

    //HeaderExchangeServer的构造函数
      public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        //开启心跳
        startHeatbeatTimer();
    }

  继续跟进方法

代码语言:javascript
复制
    //Transporters.bind方法
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
	//根据spi 这里具体走 NettyTransporter.bind
        return getTransporter().bind(url, handler);
    }

    //NettyTransporter的bind方法

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
       //可以看到这里是NettyServer实例
        return new NettyServer(url, listener);
    }
   //NettyServer构造器

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
       //调用父类AbstractServer构造器
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

   父类AbstractServer的构造函数

代码语言:javascript
复制
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false)
                || NetUtils.isInvalidLocalHost(getUrl().getHost())
                ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            //打开端口,启动服务,在其子类实现,这里是NettyServer的doOpen()方法(***看这里**)
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

NettyServer的doOpen()方法:

代码语言:javascript
复制
    protected void doOpen() throws Throwable {
        //通过netty 开启服务监听端口
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());//解码器
                pipeline.addLast("encoder", adapter.getEncoder());//编码器
                pipeline.addLast("handler", nettyHandler);//NettyHandler 扩展netty双向handler基类r 可以接受进站和出站数据流
                return pipeline;
            }
        });
        // bind 地址 开启端口
        channel = bootstrap.bind(getBindAddress());
    }

    以上基本梳理了的dubbo从service配置解析到生成服务代理,并通过netty开启服务端口的服务暴露过程。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 本地暴露的逻辑
  • 远程服务发布暴露过程
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档