前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dubbo集群容错模式之Forking实现 原

Dubbo集群容错模式之Forking实现 原

作者头像
克虏伯
发布2019-04-15 14:47:34
7150
发布2019-04-15 14:47:34
举报
文章被收录于专栏:软件开发-青出于蓝

注:Dubbo版本是2.6.2。

                                                        图1 Dubbo的ForkingClusterInvoker类继承图

1.Forking容错的含义

    并行调用多个服务,只要一个成功即返回,但是这要消耗更多的资源。

2.Forking的实现

    核心代码在ForkingClusterInvoker的doInvoke(Invocation,List<Invoker<T>>,LoadBalance)中,源码如下。代码看起来比较多,但是我们分析主要逻辑的话,不复杂。

代码语言:javascript
复制
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    final List<Invoker<T>> selected;
    final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
    final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    if (forks <= 0 || forks >= invokers.size()) {
        selected = invokers;
    } else {
        selected = new ArrayList<Invoker<T>>();
        for (int i = 0; i < forks; i++) {
            Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
            if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
                selected.add(invoker);
            }
        }
    }
    RpcContext.getContext().setInvokers((List) selected);
    final AtomicInteger count = new AtomicInteger();
    final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
    for (final Invoker<T> invoker : selected) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Result result = invoker.invoke(invocation);
                    ref.offer(result);
                } catch (Throwable e) {
                    int value = count.incrementAndGet();
                    if (value >= selected.size()) {
                        ref.offer(e);
                    }
                }
            }
        });
    }
    try {
        Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
        if (ret instanceof Throwable) {
            Throwable e = (Throwable) ret;
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
        return (Result) ret;
    } catch (InterruptedException e) {
        throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
    }
}
  • 首先获取我们设定的forks,是个int类型值,为了便于描述,假设这个forks的值是a。
  • 从invokers中选出a个Invoker放到selected中。
  • 遍历selected,将每个请求封装在Runnable中,Runnable中将请求的结果放入到队列ref中。注意Runnable中的实现细节如下,只有在最后一次请求失败时,将异常exception放如到offer中。
代码语言:javascript
复制
} catch (Throwable e) {
    int value = count.incrementAndGet();
    if (value >= selected.size()) {
        ref.offer(e);
    }
}

    可以看到,我们的请求是异常的。在最后,从ref队列中取出第一个,假设为r,而且是带有超时的那种等待。如果从队列中poll时,抛出InterruptedException异常,则将其封装后抛出;如果r是一个Throwable类型,则说明所有的请求都失败了,抛出异常;如果r不是Throwable则说明存在请求成功的情况,返回r。

代码语言:javascript
复制
try {
    Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
    if (ret instanceof Throwable) {
        Throwable e = (Throwable) ret;
        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
    }
    return (Result) ret;
} catch (InterruptedException e) {
    throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}

    重点是:并行调用多个服务,只要有一个成功,则返回结果,不等其它的线程执行完成。假设并行请求A1、A2、A3,A1请求使用了3s,A2请求用了2s,A3请求用了5s,且A2请求的结果最先放入到队列中,那么主线程就返回A2请求的结果,主线程不等A1和A3线程执行完成。

(adsbygoogle = window.adsbygoogle || []).push({});

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018/05/18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.Forking容错的含义
  • 2.Forking的实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档