前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >微服务架构之服务框架Dubbo-服务暴露

微服务架构之服务框架Dubbo-服务暴露

作者头像
公众号_松华说
发布2019-07-16 11:39:00
1.7K0
发布2019-07-16 11:39:00
举报
文章被收录于专栏:松华说松华说

注:文章代码挺多,电脑打开www.liangsonghua.me观看效果更佳

上篇文章说到ServiceBean监听了ContextRefreshedEvent然后export服务,我们接着谈这个话题

代码语言:javascript
复制
private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));

 public synchronized void export() {    
          checkAndUpdateSubConfigs();
        if (shouldDelay()) {
            delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

Dubbo提供了延迟暴露服务的特性,这个功能对初始化服务非常友好,建议开启预热。

先看下ServiceConfig#checkAndUpdateSubConfigs,第一眼看到是check开头以为只是校验功能的

代码语言:javascript
复制
public void checkAndUpdateSubConfigs() {
    // Use default configs defined explicitly on global configs
    completeCompoundConfigs();
    // Config Center should always being started first.
    startConfigCenter();
    checkDefault();
    checkApplication();
    checkRegistry();
    checkProtocol();
    this.refresh();
    checkMetadataReport();
}

AbstractInterfaceConfig#startConfigCenter

代码语言:javascript
复制
void startConfigCenter() {
        if (configCenter == null) {
            ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc);
        }

        if (this.configCenter != null) {
            // TODO there may have duplicate refresh
            this.configCenter.refresh();
            prepareEnvironment();
        }
        //刷新所有的配置,通过set方法注入相关属性
        //getApplication().ifPresent(ApplicationConfig::refresh);
        //java8的Optional#ifPresent()如果实例非空则调用Comsumer Lambda 表达式
        ConfigManager.getInstance().refreshAll();
    }

AbstractInterfaceConfig#prepareEnvironment

代码语言:javascript
复制
private void prepareEnvironment() {
        if (configCenter.isValid()) {
            if (!configCenter.checkOrUpdateInited()) {
                return;
            }
            DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl());
            String configContent = dynamicConfiguration.getConfig(configCenter.getConfigFile(), 
            try {
                Environment.getInstance().setConfigCenterFirst(configCenter.isHighestPriority());
                Environment.getInstance().updateExternalConfigurationMap(parseProperties(configContent));
                Environment.getInstance().updateAppExternalConfigurationMap(parseProperties(appConfigContent));
            } catch (IOException e) {
                throw new IllegalStateException("Failed to parse configurations from Config Center.", e);
            }
        }
    }

AbstractInterfaceConfig#getDynamicConfiguration,Dubbo默认可以支持apollo、consul、etcd、zookeeper作为配置中心

代码语言:javascript
复制
private DynamicConfiguration getDynamicConfiguration(URL url) {
        DynamicConfigurationFactory factories = ExtensionLoader
                .getExtensionLoader(DynamicConfigurationFactory.class)
                .getExtension(url.getProtocol());
        DynamicConfiguration configuration = factories.getDynamicConfiguration(url);
        Environment.getInstance().setDynamicConfiguration(configuration);
        return configuration;
    } 
代码语言:javascript
复制

ZookeeperDynamicConfigurationFactory#createDynamicConfiguration

代码语言:javascript
复制
@Override
    protected DynamicConfiguration createDynamicConfiguration(URL url) {
        return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);
    }

ZookeeperDynamicConfiguration#ZookeeperDynamicConfiguration,CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行,这里需要等待的线程数只有一个,另外一个常常被相提并论的CyclicBarrier,它让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活

代码语言:javascript
复制
ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
         //zookeeper://127.0.0.1:2181/ConfigCenterConfig?address=zookeeper://127.0.0.1:2181&check=true&configFile=dubbo.properties&group=dubbo&highestPriority=false&namespace=dubbo&prefix=dubbo.config-center&timeout=3000&valid=true
        this.url = url;
        rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
        initializedLatch = new CountDownLatch(1);
        this.cacheListener = new CacheListener(rootPath, initializedLatch);
        this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addDataListener(rootPath, cacheListener, executor);
        try {
            // Wait for connection
            this.initializedLatch.await();
        } catch (InterruptedException e) {
            logger.warn("Failed to build local cache for config center (zookeeper)." + url);
        }
    }

Dubbo的Zookeeper客户端使用的是Curator,Curator是Netflix公司开源的一套Zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等

代码语言:javascript
复制
CuratorZookeeperClient#CuratorZookeeperClient
 public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000))//重试策略
                    .connectionTimeoutMs(timeout);
            //授权认证
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            client = builder.build();
            //连接状态变更通知
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {

                @Override
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                    } else if (state == ConnectionState.CONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                    } else if (state == ConnectionState.RECONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                    }
                }
            });
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

回到服务暴露入口

代码语言:javascript
复制
private void doExportUrls() {
        //加载注册中心列表
        List<URL> registryURLs = loadRegistries(true);
        //逐一向注册中心分组暴露服务
        for (ProtocolConfig protocolConfig : protocols) {
            //org.apache.dubbo.demo.DemoService
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

ServiceConfig#doExportUrlsFor1Protocol

代码语言:javascript
复制
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {

         //本地scope
          if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {

                exportLocal(url);
            }
           //远程scope
          if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
          }
    }

本地暴露,使用proxyFactory#getInvoker创建服务代理切入点也就是invoker对象,然后protocol#export暴露服务

代码语言:javascript
复制
private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                //injvm://127.0.0.1/....
            URL local = URLBuilder.from(url)
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(LOCALHOST_VALUE)
                    .setPort(0)
                    .build();
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
        }
    }

ProxyFactory通过ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()获得

如果扩展加载器getExtension找不到就创建(注:Dubbo使用了大量的双重检测机制),这里的injectExtension是通过set方法注入扩展加载器的依赖。注:原始JDK SPI不支持IOC注入,Dubbo SPI支持Set方法和构造方法注入,wrapperClass.getConstructor(type).newInstance(instance)

代码语言:javascript
复制
@SuppressWarnings("unchecked")
    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                    type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
    }

如果自适应扩展找不到就创建。注:原始JDK SPI要用循环判断对象,Dubbo自己设计的可以通过getAdaptiveExtension灵活获取对象

代码语言:javascript
复制
private T createAdaptiveExtension() {
        try {
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }

getAdaptiveExtensionClass方法是生成code然后编译

代码语言:javascript
复制
private Class<?> createAdaptiveExtensionClass() {
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        //取得该Class对象的类装载器
        ClassLoader classLoader = findClassLoader();
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }

code长下面这个样子,重点关注extension。ProtocolFactory$Adaptive的code就不显示了

代码语言:javascript
复制
代码语言:javascript
复制
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements  org.apache.dubbo.rpc.ProxyFactory {

    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0, arg1);
    }
    public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}

默认类装载器是org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory,另外一种是org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory

代码语言:javascript
复制
public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

客户端调用(消费时)会用到getProxy方法获取代理对象,其中InvokerInvocationHandler#invoker中会过滤方法,toString、hashCode、equals方法不走rpc调用,rpc调用走Invoker.invoke(createInvocation(method, args)).recreate()

由于Invoker对象实际和Service实现对象是无法直接调用的,需要先将ref实现类包装成了一个Wrapper,利用字节码增强技术Javassist为Wrapper创建一个实现类,然后Invoker被调用的时候会触发doInvoke()方法,然后调用Wrapper的invokeMethod()方法

跳回到protocol#export的分析,它必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别,Invoker传入后,根据Invoker.url自动获得对应Protocol拓展实现。export()方法的调用顺序是:Protocol$Adaptive => QosProtocolWrapper => ProtocolListenerWrapper => ProtocolFilterWrapper => InjvmProtocol

ProtocolFilterWrapper#export

代码语言:javascript
复制
@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //注册中心export
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        //构建执行链后export
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

ProtocolFilterWrapper#buildInvokerChain,默认的filters包括EchoFilter、ClassFilter、GenericFilter、ContextFilter、TraceFilter、TimeOutFilter、MonitorFilter、ExceptionFilter

代码语言:javascript
复制
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    Result result = filter.invoke(next, invocation);
                    if (result instanceof AsyncRpcResult) {
                        AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                        asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                        return asyncResult;
                    } else {
                        return filter.onResponse(result, invoker, invocation);
                    }
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

我们选取限流Filter看下,其他如最小活跃连接数、缓存和异常Filter推荐自行阅读

代码语言:javascript
复制
 @Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {

    private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
            throw new RpcException(
                    "Failed to invoke service " +
                            invoker.getInterface().getName() +
                            "." +
                            invocation.getMethodName() +
                            " because exceed max service tps.");
        }

        return invoker.invoke(invocation);
    }

}

StatItem#isAllowable,Dubbo使用的是AtomicInteger。首先根据时间偏差重置窗口,然后每次来一个请求就减少一个许可。业内常见的限流方式有计数器、滑动容器、漏斗、令牌

代码语言:javascript
复制
public boolean isAllowable() {
        long now = System.currentTimeMillis();
        if (now > lastResetTime + interval) {
            token.set(rate);
            lastResetTime = now;
        }

        int value = token.get();
        boolean flag = false;
        while (value > 0 && !flag) {
            flag = token.compareAndSet(value, value - 1);
            value = token.get();
        }

        return flag;
    }

InjvmProtocol#export

代码语言:javascript
复制
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

当scope为空时,会同时进行本地暴露和远程暴露,远程export流程中我们先看下ProtocolListenerWrapper#startQosServer。在Dubbo中,QoS这个概念被用于动态的对服务进行查询和控制,就是后台控制器角色,例如对获取当前提供和消费的所有服务,以及对服务进行动态的上下线,即从注册中心上进行注册和反注册操作

代码语言:javascript
复制
private void startQosServer(URL url) {
        try {           

            //又看到了AtomicBoolean。在进行完比较和赋值操作之前,不会发生任何其他的操作,CAS锁
            if (!hasStarted.compareAndSet(false, true)) {
                return;
            }
            int port = url.getParameter(QOS_PORT, QosConstants.DEFAULT_PORT);
            boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP, "false"));
            Server server = Server.getInstance();
            server.setPort(port);
            server.setAcceptForeignIp(acceptForeignIp);
            server.start();

        } catch (Throwable throwable) {
            logger.warn("Fail to start qos server: ", throwable);
        }
    }

Server#start

代码语言:javascript
复制
public void start() throws Throwable {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        boss = new NioEventLoopGroup(1, new DefaultThreadFactory("qos-boss", true));
        worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        serverBootstrap.childHandler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
            }
        });
        try {
            serverBootstrap.bind(port).sync();
            logger.info("qos-server bind localhost:" + port);
        } catch (Throwable throwable) {
            logger.error("qos-server can not bind localhost:" + port, throwable);
            throw throwable;
        }
    }

回到远程暴露export本身,RegistryProtocol#export

代码语言:javascript
复制
public void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {

        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
         ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }       
          // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);

    }

RegistryProtocol#doLocalExport

代码语言:javascript
复制
 private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
    });
}

DubboProtocol#export

代码语言:javascript
复制
代码语言:javascript
复制
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    openServer(url);
    optimizeSerialization(url);

    return exporter;
}
代码语言:javascript
复制
private ExchangeServer createServer(URL url) {

      ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
}

上述Exchangers.bind(url, requestHandler)流程是 HeaderExchanger => Transporters$Adaptive => NettyTransporter => NettyServer。注:Exchanger、Transporter也是通过SPI扩展加载的

NettyServer#doOpen

代码语言:javascript
复制
@Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

接下来看下服务注册,ZookeeperRegistry#doRegister

代码语言:javascript
复制
代码语言:javascript
复制
public class ZookeeperRegistry extends FailbackRegistry {

        public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }
         @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

ZookeeperTransporter

代码语言:javascript
复制
@SPI("curator")
public interface ZookeeperTransporter {
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);
}

服务提供者启动后,向"/根/服务名/类型"目录写入自己的URL,比如/dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo://172.23.205.2:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=providers:dubbo:org.apache.dubbo.demo.DemoService&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=5316&register=true&release=&side=provider&timestamp=1557727085669,生成一个EPHEMERAL临时节点,这样即使服务提供者异常关闭,等待Zookeeper会话超时后,该临时节点也会自动删除,当然Dubbo通过封装JDK提供的ShutdownHook实现优雅停止,大概流程是标记不再新请求,已有请求处理完成后释放资源

注册完并不代表服务暴露的流程结束,还有一个很重要的过程需要处理,也就是监听配置变更,在ZookeeperRegistry#doSubscribe中创建持久化节点“dubbo/org.apache.dubbo.demo.DemoService/configurators”,然后设置监听回调地址,即回调给FailbackRegistry中的notify。如果有变更则更新本地Zookeeper信息缓存文件,然后判断是否需要重新export

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-05-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 松华说 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档