前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dubbo集群调用模式之Mergeable实现

Dubbo集群调用模式之Mergeable实现

作者头像
克虏伯
发布2019-04-15 14:27:50
6980
发布2019-04-15 14:27:50
举报

注: Dubbo版本是2.5.7

                                                        图1 MergeableClusterInvoker的类继承图

1.Mergeable的含义

    Mergeable,即对结果集进行合并。

2.Dubbo中是怎么实现

    核心代码在MergeableClusterInvoker的doInvoke(Invocation,List<Invoker<T>>,LoadBalance)中,源码如下。代码看似有点多,但是分析主要逻辑的话,不复杂。

代码语言:javascript
复制
@Override
public Result invoke(final Invocation invocation) throws RpcException {
    List<Invoker<T>> invokers = directory.list(invocation);
    String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
    Class<?> returnType;
    try {
        returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
    } catch (NoSuchMethodException e) {
        returnType = null;
    }
    Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
    for (final Invoker<T> invoker : invokers) {
        Future<Result> future = executor.submit(new Callable<Result>() {
            public Result call() throws Exception {
                return invoker.invoke(new RpcInvocation(invocation, invoker));
            }
        });
        results.put(invoker.getUrl().getServiceKey(), future);
    }
    Object result = null;
    List<Result> resultList = new ArrayList<Result>(results.size());
    int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
        Future<Result> future = entry.getValue();
        try {
            Result r = future.get(timeout, TimeUnit.MILLISECONDS);
            if (r.hasException()) {
                log.error(new StringBuilder(32).append("Invoke ")
                                .append(getGroupDescFromServiceKey(entry.getKey()))
                                .append(" failed: ")
                                .append(r.getException().getMessage()).toString(),
                        r.getException());
            } else {
                resultList.add(r);
            }
        } catch (Exception e) {
            throw new RpcException(new StringBuilder(32)
                    .append("Failed to invoke service ")
                    .append(entry.getKey())
                    .append(": ")
                    .append(e.getMessage()).toString(),
                    e);
        }
    }
    if (resultList.size() == 0) {
        return new RpcResult((Object) null);
    } else if (resultList.size() == 1) {
        return resultList.iterator().next();
    }
    if (returnType == void.class) {
        return new RpcResult((Object) null);
    }
    if (merger.startsWith(".")) {
        merger = merger.substring(1);
        Method method;
        try {
            method = returnType.getMethod(merger, returnType);
        } catch (NoSuchMethodException e) {
            throw new RpcException(new StringBuilder(32)
                    .append("Can not merge result because missing method [ ")
                    .append(merger)
                    .append(" ] in class [ ")
                    .append(returnType.getClass().getName())
                    .append(" ]")
                    .toString());
        }
        if (method != null) {
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException(
                        new StringBuilder(32)
                                .append("Can not merge result: ")
                                .append(e.getMessage()).toString(),
                        e);
            }
        } else {
            throw new RpcException(
                    new StringBuilder(32)
                            .append("Can not merge result because missing method [ ")
                            .append(merger)
                            .append(" ] in class [ ")
                            .append(returnType.getClass().getName())
                            .append(" ]")
                            .toString());
        }
    } else {
        Merger resultMerger;
        if (ConfigUtils.isDefault(merger)) {
            resultMerger = MergerFactory.getMerger(returnType);
        } else {
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
        }
        if (resultMerger != null) {
            List<Object> rets = new ArrayList<Object>(resultList.size());
            for (Result r : resultList) {
                rets.add(r.getValue());
            }
            result = resultMerger.merge(
                    rets.toArray((Object[]) Array.newInstance(returnType, 0)));
        } else {
            throw new RpcException("There is no merger to merge result.");
        }
    }
    return new RpcResult(result);
}

    1.首先得到服务提供者列表,遍历服务提供者,对每个服务提供者调用服务,这个调用过程是封装在Callable中的,放在线程池中执行的。

    2.步骤1中得到的是个Future集合,即上面代码段中的results。

    3.对results中的Future,进行Future.get(),即阻塞等待线程执行完成。这样得到所有的结果集,即上面代码段中的resultList。

    4.为简化的目的,我们注重分析下面代码段中的源码。不考虑Merger是怎么来的话,代码比较简单,取出集合resultList中Result的value,结果也是个集合,即下面代码段中的rets,最后用Merger对rets集合合并操作。

代码语言:javascript
复制
} else {
    Merger resultMerger;
    if (ConfigUtils.isDefault(merger)) {
        resultMerger = MergerFactory.getMerger(returnType);
    } else {
        resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
    }
    if (resultMerger != null) {
        List<Object> rets = new ArrayList<Object>(resultList.size());
        for (Result r : resultList) {
            rets.add(r.getValue());
        }
        result = resultMerger.merge(
                rets.toArray((Object[]) Array.newInstance(returnType, 0)));
    } else {
        throw new RpcException("There is no merger to merge result.");
    }
}

    重点是使用多线程,调用服务提供者,最后将得到的结果集,用Merger进行合并。

(adsbygoogle = window.adsbygoogle || []).push({});

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018/05/18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.Mergeable的含义
  • 2.Dubbo中是怎么实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档