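Note: this article is based on Dubbo 2.5.7.
Figure 1: Class hierarchy of MergeableClusterInvoker
Mergeable means that the result sets returned by multiple providers are merged into one.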
The core code is in MergeableClusterInvoker's invoke(Invocation) method, shown below. The listing looks long, but the main logic is not complicated.
@Override
public Result invoke(final Invocation invocation) throws RpcException {
    List<Invoker<T>> invokers = directory.list(invocation);
    String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
    Class<?> returnType;
    try {
        returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
    } catch (NoSuchMethodException e) {
        returnType = null;
    }
    Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
    for (final Invoker<T> invoker : invokers) {
        Future<Result> future = executor.submit(new Callable<Result>() {
            public Result call() throws Exception {
                return invoker.invoke(new RpcInvocation(invocation, invoker));
            }
        });
        results.put(invoker.getUrl().getServiceKey(), future);
    }
    Object result = null;
    List<Result> resultList = new ArrayList<Result>(results.size());
    int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
        Future<Result> future = entry.getValue();
        try {
            Result r = future.get(timeout, TimeUnit.MILLISECONDS);
            if (r.hasException()) {
                log.error(new StringBuilder(32).append("Invoke ")
                                .append(getGroupDescFromServiceKey(entry.getKey()))
                                .append(" failed: ")
                                .append(r.getException().getMessage()).toString(),
                        r.getException());
            } else {
                resultList.add(r);
            }
        } catch (Exception e) {
            throw new RpcException(new StringBuilder(32)
                    .append("Failed to invoke service ")
                    .append(entry.getKey())
                    .append(": ")
                    .append(e.getMessage()).toString(),
                    e);
        }
    }
    if (resultList.size() == 0) {
        return new RpcResult((Object) null);
    } else if (resultList.size() == 1) {
        return resultList.iterator().next();
    }
    if (returnType == void.class) {
        return new RpcResult((Object) null);
    }
    if (merger.startsWith(".")) {
        merger = merger.substring(1);
        Method method;
        try {
            method = returnType.getMethod(merger, returnType);
        } catch (NoSuchMethodException e) {
            throw new RpcException(new StringBuilder(32)
                    .append("Can not merge result because missing method [ ")
                    .append(merger)
                    .append(" ] in class [ ")
                    .append(returnType.getClass().getName())
                    .append(" ]")
                    .toString());
        }
        if (method != null) {
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException(
                        new StringBuilder(32)
                                .append("Can not merge result: ")
                                .append(e.getMessage()).toString(),
                        e);
            }
        } else {
            throw new RpcException(
                    new StringBuilder(32)
                            .append("Can not merge result because missing method [ ")
                            .append(merger)
                            .append(" ] in class [ ")
                            .append(returnType.getClass().getName())
                            .append(" ]")
                            .toString());
        }
    } else {
        Merger resultMerger;
        if (ConfigUtils.isDefault(merger)) {
            resultMerger = MergerFactory.getMerger(returnType);
        } else {
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
        }
        if (resultMerger != null) {
            List<Object> rets = new ArrayList<Object>(resultList.size());
            for (Result r : resultList) {
                rets.add(r.getValue());
            }
            result = resultMerger.merge(
                    rets.toArray((Object[]) Array.newInstance(returnType, 0)));
        } else {
            throw new RpcException("There is no merger to merge result.");
        }
    }
    return new RpcResult(result);
}
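In the listing above, the branch taken when merger starts with "." looks up a method of that name on the return type by reflection and folds the remaining results into the first one. The following is a minimal sketch of that folding, using only the JDK. Tally, combine, and mergeByMethod are hypothetical names chosen for illustration; they are not Dubbo classes or methods.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReflectiveMergeSketch {

    /** Hypothetical return type whose merge-style method returns the accumulated object. */
    public static class Tally {
        private final List<String> items = new ArrayList<String>();

        public Tally(String... initial) {
            items.addAll(Arrays.asList(initial));
        }

        // Found via returnType.getMethod("combine", returnType) in mergeByMethod.
        public Tally combine(Tally other) {
            items.addAll(other.items);
            return this;
        }

        public List<String> getItems() {
            return items;
        }
    }

    /** Folds values[1..n] into values[0] by calling the named method reflectively. */
    public static Object mergeByMethod(Class<?> returnType, String methodName, List<Object> values) {
        try {
            Method method = returnType.getMethod(methodName, returnType);
            List<Object> rest = new ArrayList<Object>(values);
            Object result = rest.remove(0);
            // If the method returns something assignable from the result's class,
            // carry its return value forward; otherwise assume it mutates the receiver.
            boolean chain = method.getReturnType() != void.class
                    && method.getReturnType().isAssignableFrom(result.getClass());
            for (Object value : rest) {
                Object returned = method.invoke(result, value);
                if (chain) {
                    result = returned;
                }
            }
            return result;
        } catch (Exception e) {
            throw new IllegalStateException("Can not merge result: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        List<Object> values = new ArrayList<Object>();
        values.add(new Tally("a"));
        values.add(new Tally("b", "c"));
        values.add(new Tally("d"));
        Tally merged = (Tally) mergeByMethod(Tally.class, "combine", values);
        System.out.println(merged.getItems()); // prints [a, b, c, d]
    }
}
```

The sketch wraps failures in IllegalStateException where the original throws RpcException, only so that it compiles without Dubbo on the classpath.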
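1. First, obtain the list of service providers and iterate over it, invoking the service on each provider. Each invocation is wrapped in a Callable and executed in a thread pool.
2. Step 1 yields a collection of Futures, which is results in the code above.
3. Call Future.get() on each Future in results, blocking until the corresponding task completes or the timeout expires. Results that carry an exception are logged and skipped; the rest are collected into resultList in the code above.
4. To keep things simple, we focus on the code segment below. Setting aside how the Merger is obtained, the code is straightforward: take the value out of each Result in resultList to form another collection, rets in the code below, and then use the Merger to merge rets.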
} else {
    Merger resultMerger;
    if (ConfigUtils.isDefault(merger)) {
        resultMerger = MergerFactory.getMerger(returnType);
    } else {
        resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
    }
    if (resultMerger != null) {
        List<Object> rets = new ArrayList<Object>(resultList.size());
        for (Result r : resultList) {
            rets.add(r.getValue());
        }
        result = resultMerger.merge(
                rets.toArray((Object[]) Array.newInstance(returnType, 0)));
    } else {
        throw new RpcException("There is no merger to merge result.");
    }
}
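To make the final merge call more concrete, here is a rough sketch of what a list-concatenating Merger could look like and why the values are copied into an array created with Array.newInstance(returnType, 0). The Merger interface below is a local stand-in declared so that the sketch compiles on its own; ListMergerSketch, ListMerger, and mergeValues are illustrative names and should not be read as Dubbo's actual implementation.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListMergerSketch {

    /** Local stand-in for a Merger-style interface (an assumption of this sketch). */
    public interface Merger<T> {
        T merge(T... items);
    }

    /** Concatenates every non-null list in argument order. */
    public static class ListMerger implements Merger<List<?>> {
        @Override
        public List<?> merge(List<?>... items) {
            List<Object> merged = new ArrayList<Object>();
            for (List<?> item : items) {
                if (item != null) {
                    merged.addAll(item);
                }
            }
            return merged;
        }
    }

    /**
     * Copies the values into an array whose runtime component type is returnType,
     * then hands it to the merger. A plain Object[] would fail the cast to List[]
     * inside the merger's bridge method, which is why the typed array is needed.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static Object mergeValues(Merger merger, Class<?> returnType, List<Object> rets) {
        Object[] typed = rets.toArray((Object[]) Array.newInstance(returnType, 0));
        return merger.merge(typed);
    }

    public static void main(String[] args) {
        List<Object> rets = new ArrayList<Object>();
        rets.add(Arrays.asList("menu-a1", "menu-a2"));
        rets.add(Arrays.asList("menu-b1"));
        Object merged = mergeValues(new ListMerger(), List.class, rets);
        System.out.println(merged); // prints [menu-a1, menu-a2, menu-b1]
    }
}
```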
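The key point is that the service providers are invoked concurrently on multiple threads, and the result sets they return are then merged with a Merger.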
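The overall pattern (submit one task per provider, wait on each Future with a timeout, then merge) can be sketched with plain java.util.concurrent as follows. This is a simplified illustration, not Dubbo code: FanOutMergeSketch and invokeAll are hypothetical names, each provider is modeled as a Callable returning a list, and the merge is a simple concatenation. It uses a LinkedHashMap so the merged order is deterministic, whereas the original listing uses a HashMap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FanOutMergeSketch {

    /** Invokes every provider concurrently and concatenates their results. */
    public static List<String> invokeAll(Map<String, Callable<List<String>>> providers,
                                         long timeoutMillis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Submit one task per provider and remember its Future under the provider key.
            Map<String, Future<List<String>>> futures =
                    new LinkedHashMap<String, Future<List<String>>>();
            for (Map.Entry<String, Callable<List<String>>> e : providers.entrySet()) {
                futures.put(e.getKey(), executor.submit(e.getValue()));
            }
            // Block on each Future in turn, bounded by the timeout, and merge as we go.
            List<String> merged = new ArrayList<String>();
            for (Map.Entry<String, Future<List<String>>> e : futures.entrySet()) {
                try {
                    merged.addAll(e.getValue().get(timeoutMillis, TimeUnit.MILLISECONDS));
                } catch (Exception ex) {
                    throw new IllegalStateException(
                            "Failed to invoke service " + e.getKey() + ": " + ex.getMessage(), ex);
                }
            }
            return merged;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Map<String, Callable<List<String>>> providers =
                new LinkedHashMap<String, Callable<List<String>>>();
        providers.put("group-a", () -> Arrays.asList("a1", "a2"));
        providers.put("group-b", () -> Arrays.asList("b1"));
        System.out.println(invokeAll(providers, 1000L)); // prints [a1, a2, b1]
    }
}
```

Because the tasks are submitted before any get() call, the providers run in parallel, and the total wait is roughly bounded by the slowest provider rather than the sum of all of them.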