注:公众号关于dubbo解读文章均基于apache-dubbo-incubating-2.7.1版本,发版于5月26号,此版本注册中心(多数是zookeeper)在某些特殊场景下会出现重复URL地址数据无法删除,导致消费方拿到的是失效地址,从而导致调用失败的问题。如果你也在使用此版本进行源码学习,在网络漂移(下班回家再调试源码)的情况下需要手动删除zookeeper的dubbo节点路径
服务引用示例
public class Application {
/**
* In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
* launch the application
*/
public static void main(String[] args) {
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
DemoService service = reference.get();
String message = service.sayHello("dubbo");
System.out.println(message);
}
}
这里得到的DemoService是什么?将下来一层一层地掀开迷底
ReferenceConfig#get=>ReferenceConfig#init,在init方法中会执行ref = createProxy(map);
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
//TODO 本地引用inJvm
if (shouldJvmRefer(map)) {
invoker = refprotocol.refer(interfaceClass, url);
} else {
//TODO urls为服务引用的接口信息,URL是dubbo中的统一数据模型
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
}
if (registryURL != null) {
invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
//TODO 直连
//TODO 适用于测试环境或者注册中心不可用时需要发布的情况
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
//TODO 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
直连中的StaticDirectory的作用是本文第一个重点关注对象,实际上集群目录服务实现父抽象类AbstractDirectory的doList模板方法,会返回经过路由过滤后的Invoker列表,路由过滤也就是服务路由,常用于设置分组调用、同机房调用优先、灰度分布、流量切换、读写分离等。另外还有RegistryDirectory、MockDirectory目录服务,可以不加思索地猜想RegistryDirectory会动态维护Invoker列表,StaticDirector则是直接返回
Protocol中的refer方法是被@Adaptive注解修饰的,说明它是一个自适应扩展点,自适应扩展点加在方法层面上,表示会动态生成一个自适应的适配器,比如这里的DubboProtocol$Adaptive,并且默认实现是"dubbo",最终实现可以通过在URL指定
/**
* Protocol. (API/SPI, Singleton, ThreadSafe)
*/
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
refprotocol#refer先后经过filter包装类ProtocolFilterWrapper、ProtocolListenerWrapper最后执行RegistryProtocol。这些包装类是在创建扩展器时,通过查找构造方法的参数类型获取Wrapper类,然后将自身注入,前者ProtocolFilterWrapper负责过滤器,Dubbo允许我们在provider端设置权限校验、缓存、限流等等一些Filter过滤器,在consumer端也可以设置一些Filter,这是一种责任链模式;后者ProtocolListenerWrapper负责监听器,Dubbo允许consumer端在调用之前、调用之后或出现异常时,触发oninvoke、onreturn、onthrow三个事件
private T createExtension(String name) {
try {
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
}
RegistryProtocol#refer=>doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//TODO 对多个invoker进行封装
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
//TODO 注册
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
//TODO 订阅,监听变化
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
//TODO 根据容错模式和负载均衡算法获取invoker
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
简单回顾下之前提到的服务注册,执行链大概是这样的,Registry根据URL通过registryFactory获取自适应适配类,最后会执行FailbackRegistry中的模块方法doRegister,真正的实现类是ZookeeperRegistry,创建路径节点,将url信息写入zookeeper中
/***************分隔线***********************/
接下来说下订阅监听zookeeper变化
RegistryDirectory#subscribe
public void subscribe(URL url) {
setConsumerUrl(url);
consumerConfigurationListener.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}
FailbackRegistry#subscribe
@Override
public void subscribe(URL url, NotifyListener listener) {
try {
doSubscribe(url, listener);
} catch (Exception e) {
List<URL> urls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(urls)) {
notify(url, listener, urls);
}
addFailedSubscribed(url, listener);
}
}
//TODO 模板方法
public abstract void doSubscribe(URL url, NotifyListener listener);
ZookeeperRegistry#doSubscribe
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
//TODO 服务提供者dubbo/org.apache.dubbo.demo.DemoService/providers
//TODO 服务配置dubbo/org.apache.dubbo.demo.DemoService/configurators
/TODO 服务路由/dubbo/org.apache.dubbo.demo.DemoService/routers
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
//TODO 只有服务提供者才有children,也就是具体某个实例
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
可以看到当有注册中心有服务列表更新的时候会执行通知,此接口接受三种类别的url,包括服务提供方provider、服务配置configurator、服务路由router。通知方法执行链大概是
FailbackRegistry#notify=>AbstractRegistry#notify=>RegistryDirectory#notify
AbstractRegistry#notify
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
//TODO 通知
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
//TODO 更新缓存
saveProperties(url);
}
AbstractRegistry#saveProperties
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
FileChannel channel = raf.getChannel()) {
FileLock lock = channel.tryLock();
if (lock == null) {
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
}
// Save
try {
if (!file.exists()) {
file.createNewFile();
}
try (FileOutputStream outputFile = new FileOutputStream(file)) {
properties.store(outputFile, "Dubbo Registry Cache");
}
} finally {
lock.release();
}
}
平时我们更习惯使用输入流和输出流操作文件,将输入流的数据写入到输出流中,但是利用FileChannel会更加高效,它能直连输入和输出流的文件通道
/***************分隔线*************/
回到notify方法中,执行链
RegistryDirectory#notify=>RegistryDirectory#refreshOverrideAndInvoker=>RegistryDirectory#refreshInvoker
@Override
public URL getUrl() {
return this.overrideDirectoryUrl;
}
private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
refreshInvoker(urls);
}
private void refreshInvoker(List<URL> invokerUrls) {
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
}
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
String key = url.toFullString(); // The parameter urls are sorted
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
newUrlInvokerMap.put(key, invoker);
return newUrlInvokerMap;
}
在refreshOverrideAndInvoker中会更新routerChain,也会更新overrideDirectoryUrl等,invoker#getUrl实际上是取的overrideDirectoryUrl,而扩展点适配器选择具体实现是根据URL来的。注:URL在Dubbo中是统一的数据模式
protocol#refer又用到了Dubbo SPI机制,执行链是Protocol$Adaptive#refer=>ProtocolListenerWrapper#refer=>ProtocolFilterWrapper#refer=>DubboProtocol#refer,其中在 DubboProtocol#refer方法中会构建DubboInvoker对象
DubboProtocol#refer
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
refer方法参数中还有一个很重要的DubboProtocol#getClient方法,猜想是Netty客户端的建立,下篇文章再分析
/**************分隔线*************/
回到doRefer中的Cluster.join(directory),Cluster是一个集群容错接口,同时也是一个@Adaptive自适应扩展点,默认实现类是FailoverCluster.NAME。Dubbo主要内置了如下几种策略:失败自动切换Failover、安全失败Failsafe、快速失败Failfast、失败自动恢复Failback、并行调用Forking、广播Broadcast
@SPI(FailoverCluster.NAME)
public interface Cluster {
/**
* Merge the directory invokers to a virtual invoker.
*
* @param <T>
* @param directory
* @return cluster invoker
* @throws RpcException
*/
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
前面我们说过ExtensionLoader在实例化对象时,会将自己(这里是FailOverCluster)注入到包装类中,所以这里实际上是调用MockClusterWrapper#join,所以在ReferenceConfig#createProxy中的invoker = refprotocol.refer(interfaceClass, urls.get(0))中得到的invoker是一个MockClusterWrapper包装类。使用这种机制可以把一些公共处理放在Wrapper包装类中
/************************题外话-开始*************************/
这里插一句题外话,Dubbo是如何实现Forking调用?
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs =
new ExecutorCompletionService<>(executor);
// 用于保存 Future 对象
List<Future<Integer>> futures =
new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures
futures.add(
cs.submit(()->sayHello1()));
futures.add(
cs.submit(()->sayHello2()));
futures.add(
cs.submit(()->sayHeelo2()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回,则 break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
// 简单地通过判空来检查是否成功返回
if (r != null) {
break;
}
}
} finally {
// 取消所有任务
for(Future<Integer> f : futures)
f.cancel(true);
}
return r;
当需要批量提交异步任务的时候使用CompletionService。CompletionService将线程池 Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待。嗯,作者就是这么照顾读者,看源码一定要学到东西,学到东西也会分享出来,所有快快关注“松花皮蛋的黑板报”一起涨见识吧!
/***********题外话-结束******************/
说回到消费端入口ReferenceConfig
private T createProxy(Map<String, String> map) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
return (T) proxyFactory.getProxy(invoker);
}
前面我们讲清楚了invoker是一个引用了FailOverClusterInvoker的MockClusterInvoker,接下来看下getProxy,执行链是
ProxyFactory$Adaptive#getProxy=>StubProxyFactoryWrapper#getProxy=>AbstractProxyFactory#getProxy=>JavassistProxyFactory#getProxy
先看下包装类的方法,StubProxyFactoryWrapper#getProxy
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
//TODO 这里的invoker是MockClusterInvoker
//TODO proxyFactory是JavassistProxyFactory
T proxy = proxyFactory.getProxy(invoker);
//TODO 泛化调用入口
if (GenericService.class != invoker.getInterface()) {
Class<?> stubClass = ReflectUtils.forName(stub);
Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
proxy = (T) constructor.newInstance(new Object[]{proxy});
}
}
return proxy;
}
泛化调用主要用于消费端没有API接口的情况;不需要引入接口JAR包,而是直接通过GenericService接口来发起服务调用,参数及返回值中的所有POJO均用Map表示。泛化调用对于服务端无需关注,按正常服务进行暴露即可。以下几种场景可以考虑使用泛化调用:服务测试平台、API服务网关
再来看下JavassistProxyFactory类
public class JavassistProxyFactory extends AbstractProxyFactory {
//TODO DUbbo中的URL数据模型中指定有proxyFactory实现,默认是javassist
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
可以看到getProxy得到的是一个InvokerInvocationHandler,所以在服务端调用RPC接口方法会调用到InvokerInvocationHandler#invoker,其中toString\hashCode\equals方法不走RPC调用。此时开头提出的问题终于有了答案,reference#get得到的是InvokerInvocationHandler对象
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
//TODO 以下方法走本地调用
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
//TODO 调用MockClusterInvoker#invoker
return invoker.invoke(createInvocation(method, args)).recreate();
}
MockClusterInvoker#invoker
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
//TODO URL中的mock值,默认是不开启
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
//TODO 不走mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
//TODO 不管是否失败都直接走mock
result = doMockInvoke(invocation, null);
} else {
//TODO 失败时调用mock
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
result = doMockInvoke(invocation, e);
}
}
return result;
}
有了mock,再也不用担心联调慢和上游服务全部宕机
然后执行链会走到AbstractClusterInvoker#invoke
@Override
public Result invoke(final Invocation invocation) throws RpcException {
//TODO 从directory获取到invokerList
List<Invoker<T>> invokers = list(invocation);
//TODO 通过SPI扩展实例化LoadBalance,默认是随机选取
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//TODO FailoverClusterInvoker
return doInvoke(invocation, invokers, loadbalance);
}
而list的执行链是RegistryDirectory#doList=>RouterChain#route,之前在分析完成注册中心后订阅变更事件,然后会获取Directory中的RouterChain,这里也验证了开头描述的RegistryDirectory是动态维护的Invoker目录服务
而doInvoke会将loadbalance实例传到FailoverClusterInvoker#doInvoke
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
//TODO 获取invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
//TODO 执行
Result result = invoker.invoke(invocation);
}
FailoverClusterInvoker#doInvoke=>InvokerWrapper#invoke=>ListenerInvokerWrapper#invoke=>ProtocolFilterWrapper#invoke=>AbstractInvoker#invoke=>DubboInvoker
DubboInvoker#doInvoke,异步调用、回调调用,同时这也是与高性能NIO通信框架Netty交互的入口
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
ExchangeClient currentClient;//TODO 之前创建的连接
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
if (isOneway) {//TODO 单向
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// TODO 将RpcInvocation放到Netty处理流程中
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {//TODO 异步
//TODO 将RpcInvocation放到Netty处理流程中
ResponseFuture future = currentClient.request(inv, timeout);
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (isAsyncFuture) {//TODO 回调
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
} catch (RemotingException e) {
}
}