注:文章代码挺多,电脑打开www.liangsonghua.me观看效果更佳
上篇文章说到ServiceBean监听了ContextRefreshedEvent然后export服务,我们接着谈这个话题
private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));
public synchronized void export() {
checkAndUpdateSubConfigs();
if (shouldDelay()) {
delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
Dubbo提供了延迟暴露服务的特性,这个功能对初始化服务非常友好,建议开启预热。
先看下ServiceConfig#checkAndUpdateSubConfigs,第一眼看到是check开头以为只是校验功能的
public void checkAndUpdateSubConfigs() {
// Use default configs defined explicitly on global configs
completeCompoundConfigs();
// Config Center should always being started first.
startConfigCenter();
checkDefault();
checkApplication();
checkRegistry();
checkProtocol();
this.refresh();
checkMetadataReport();
}
AbstractInterfaceConfig#startConfigCenter
void startConfigCenter() {
if (configCenter == null) {
ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc);
}
if (this.configCenter != null) {
// TODO there may have duplicate refresh
this.configCenter.refresh();
prepareEnvironment();
}
//刷新所有的配置,通过set方法注入相关属性
//getApplication().ifPresent(ApplicationConfig::refresh);
//java8的Optional#ifPresent()如果实例非空则调用Comsumer Lambda 表达式
ConfigManager.getInstance().refreshAll();
}
AbstractInterfaceConfig#prepareEnvironment
private void prepareEnvironment() {
if (configCenter.isValid()) {
if (!configCenter.checkOrUpdateInited()) {
return;
}
DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl());
String configContent = dynamicConfiguration.getConfig(configCenter.getConfigFile(),
try {
Environment.getInstance().setConfigCenterFirst(configCenter.isHighestPriority());
Environment.getInstance().updateExternalConfigurationMap(parseProperties(configContent));
Environment.getInstance().updateAppExternalConfigurationMap(parseProperties(appConfigContent));
} catch (IOException e) {
throw new IllegalStateException("Failed to parse configurations from Config Center.", e);
}
}
}
AbstractInterfaceConfig#getDynamicConfiguration,Dubbo默认可以支持apollo、consul、etcd、zookeeper作为配置中心
private DynamicConfiguration getDynamicConfiguration(URL url) {
DynamicConfigurationFactory factories = ExtensionLoader
.getExtensionLoader(DynamicConfigurationFactory.class)
.getExtension(url.getProtocol());
DynamicConfiguration configuration = factories.getDynamicConfiguration(url);
Environment.getInstance().setDynamicConfiguration(configuration);
return configuration;
}
ZookeeperDynamicConfigurationFactory#createDynamicConfiguration
@Override
protected DynamicConfiguration createDynamicConfiguration(URL url) {
return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);
}
ZookeeperDynamicConfiguration#ZookeeperDynamicConfiguration,CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行,这里需要等待的线程数只有一个,另外一个常常被相提并论的CyclicBarrier,它让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活
ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
//zookeeper://127.0.0.1:2181/ConfigCenterConfig?address=zookeeper://127.0.0.1:2181&check=true&configFile=dubbo.properties&group=dubbo&highestPriority=false&namespace=dubbo&prefix=dubbo.config-center&timeout=3000&valid=true
this.url = url;
rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
initializedLatch = new CountDownLatch(1);
this.cacheListener = new CacheListener(rootPath, initializedLatch);
this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
zkClient = zookeeperTransporter.connect(url);
zkClient.addDataListener(rootPath, cacheListener, executor);
try {
// Wait for connection
this.initializedLatch.await();
} catch (InterruptedException e) {
logger.warn("Failed to build local cache for config center (zookeeper)." + url);
}
}
Dubbo的Zookeeper客户端使用的是Curator,Curator是Netflix公司开源的一套Zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等
CuratorZookeeperClient#CuratorZookeeperClient
public CuratorZookeeperClient(URL url) {
super(url);
try {
int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))//重试策略
.connectionTimeoutMs(timeout);
//授权认证
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
client = builder.build();
//连接状态变更通知
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
回到服务暴露入口
private void doExportUrls() {
//加载注册中心列表
List<URL> registryURLs = loadRegistries(true);
//逐一向注册中心分组暴露服务
for (ProtocolConfig protocolConfig : protocols) {
//org.apache.dubbo.demo.DemoService
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
ServiceConfig#doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//本地scope
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
//远程scope
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
本地暴露,使用proxyFactory#getInvoker创建服务代理切入点也就是invoker对象,然后protocol#export暴露服务
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
//injvm://127.0.0.1/....
URL local = URLBuilder.from(url)
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
}
}
ProxyFactory通过ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()获得
如果扩展加载器getExtension找不到就创建(注:Dubbo使用了大量的双重检测机制),这里的injectExtension是通过set方法注入扩展加载器的依赖。注:原始JDK SPI不支持IOC注入,Dubbo SPI支持Set方法和构造方法注入,wrapperClass.getConstructor(type).newInstance(instance)
@SuppressWarnings("unchecked")
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
如果自适应扩展找不到就创建。注:原始JDK SPI要用循环判断对象,Dubbo自己设计的可以通过getAdaptiveExtension灵活获取对象
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
getAdaptiveExtensionClass方法是生成code然后编译
private Class<?> createAdaptiveExtensionClass() {
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
//取得该Class对象的类装载器
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
code长下面这个样子,重点关注extension。ProtocolFactory$Adaptive的code就不显示了
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0, arg1);
}
public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
默认类装载器是org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory,另外一种是org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
客户端调用(消费时)会用到getProxy方法获取代理对象,其中InvokerInvocationHandler#invoker中会过滤方法,toString、hashCode、equals方法不走rpc调用,rpc调用走Invoker.invoke(createInvocation(method, args)).recreate()
由于Invoker对象实际和Service实现对象是无法直接调用的,需要先将ref实现类包装成了一个Wrapper,利用字节码增强技术Javassist为Wrapper创建一个实现类,然后Invoker被调用的时候会触发doInvoke()方法,然后调用Wrapper的invokeMethod()方法
跳回到protocol#export的分析,它必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别,Invoker传入后,根据Invoker.url自动获得对应Protocol拓展实现。export()方法的调用顺序是:Protocol$Adaptive => QosProtocolWrapper => ProtocolListenerWrapper => ProtocolFilterWrapper => InjvmProtocol
ProtocolFilterWrapper#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注册中心export
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
//构建执行链后export
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
ProtocolFilterWrapper#buildInvokerChain,默认的filters包括EchoFilter、ClassFilter、GenericFilter、ContextFilter、TraceFilter、TimeOutFilter、MonitorFilter、ExceptionFilter
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = filter.invoke(next, invocation);
if (result instanceof AsyncRpcResult) {
AsyncRpcResult asyncResult = (AsyncRpcResult) result;
asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
return asyncResult;
} else {
return filter.onResponse(result, invoker, invocation);
}
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
我们选取限流Filter看下,其他如最小活跃连接数、缓存和异常Filter推荐自行阅读
@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
"Failed to invoke service " +
invoker.getInterface().getName() +
"." +
invocation.getMethodName() +
" because exceed max service tps.");
}
return invoker.invoke(invocation);
}
}
StatItem#isAllowable,Dubbo使用的是AtomicInteger。首先根据时间偏差重置窗口,然后每次来一个请求就减少一个许可。业内常见的限流方式有计数器、滑动容器、漏斗、令牌
public boolean isAllowable() {
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {
token.set(rate);
lastResetTime = now;
}
int value = token.get();
boolean flag = false;
while (value > 0 && !flag) {
flag = token.compareAndSet(value, value - 1);
value = token.get();
}
return flag;
}
InjvmProtocol#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
当scope为空时,会同时进行本地暴露和远程暴露,远程export流程中我们先看下ProtocolListenerWrapper#startQosServer。在Dubbo中,QoS这个概念被用于动态的对服务进行查询和控制,就是后台控制器角色,例如对获取当前提供和消费的所有服务,以及对服务进行动态的上下线,即从注册中心上进行注册和反注册操作
private void startQosServer(URL url) {
try {
//又看到了AtomicBoolean。在进行完比较和赋值操作之前,不会发生任何其他的操作,CAS锁
if (!hasStarted.compareAndSet(false, true)) {
return;
}
int port = url.getParameter(QOS_PORT, QosConstants.DEFAULT_PORT);
boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP, "false"));
Server server = Server.getInstance();
server.setPort(port);
server.setAcceptForeignIp(acceptForeignIp);
server.start();
} catch (Throwable throwable) {
logger.warn("Fail to start qos server: ", throwable);
}
}
Server#start
public void start() throws Throwable {
if (!started.compareAndSet(false, true)) {
return;
}
boss = new NioEventLoopGroup(1, new DefaultThreadFactory("qos-boss", true));
worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
}
});
try {
serverBootstrap.bind(port).sync();
logger.info("qos-server bind localhost:" + port);
} catch (Throwable throwable) {
logger.error("qos-server can not bind localhost:" + port, throwable);
throw throwable;
}
}
回到远程暴露export本身,RegistryProtocol#export
public void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
RegistryProtocol#doLocalExport
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
});
}
DubboProtocol#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
openServer(url);
optimizeSerialization(url);
return exporter;
}
private ExchangeServer createServer(URL url) {
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
}
上述Exchangers.bind(url, requestHandler)流程是 HeaderExchanger => Transporters$Adaptive => NettyTransporter => NettyServer。注:Exchanger、Transporter也是通过SPI扩展加载的
NettyServer#doOpen
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
接下来看下服务注册,ZookeeperRegistry#doRegister
public class ZookeeperRegistry extends FailbackRegistry {
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
ZookeeperTransporter
@SPI("curator")
public interface ZookeeperTransporter {
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
服务提供者启动后,向"/根/服务名/类型"目录写入自己的URL,比如/dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo://172.23.205.2:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=providers:dubbo:org.apache.dubbo.demo.DemoService&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=5316®ister=true&release=&side=provider×tamp=1557727085669,生成一个EPHEMERAL临时节点,这样即使服务提供者异常关闭,等待Zookeeper会话超时后,该临时节点也会自动删除,当然Dubbo通过封装JDK提供的ShutdownHook实现优雅停止,大概流程是标记不再新请求,已有请求处理完成后释放资源
注册完并不代表服务暴露的流程结束,还有一个很重要的过程需要处理,也就是监听配置变更,在ZookeeperRegistry#doSubscribe中创建持久化节点“dubbo/org.apache.dubbo.demo.DemoService/configurators”,然后设置监听回调地址,即回调给FailbackRegistry中的notify。如果有变更则更新本地Zookeeper信息缓存文件,然后判断是否需要重新export