前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >源码分析Dubbo异步调用与事件回调机制

源码分析Dubbo异步调用与事件回调机制

作者头像
丁威
发布2019-06-10 17:19:24
2.2K0
发布2019-06-10 17:19:24
举报
文章被收录于专栏:中间件兴趣圈中间件兴趣圈

微信公众号:[中间件兴趣圈] 作者简介:《RocketMQ技术内幕》作者

本文将详细分析Dubbo服务异步调用与事件回调机制。

异步调用与事件回调机制

TTY异步回调
事件回调

异步调用与事件回调机制

在Dubbo中,引入特定的过滤器FutureFilter来处理异步调用相关的逻辑,其定义如下:

1@Activate(group = Constants.CONSUMER)
2public class FutureFilter implements Filter {
3}

group=CONSUMER说明该过滤器属于消费端过滤器。 接下来从从invoke方法详细分析其实现逻辑。

 1public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
 2        final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);     // @1
 3        fireInvokeCallback(invoker, invocation);                                                     // @2
 4        // need to configure if there's return value before the invocation in order to help invoker to judge if it's
 5        // necessary to return future.
 6        Result result = invoker.invoke(invocation);                                                 // @3
 7        if (isAsync) {
 8            asyncCallback(invoker, invocation);                                                       // @4
 9        } else {
10            syncCallback(invoker, invocation, result);                                              // @5
11        }
12        return result;
13}

代码@1:首先从URL中获取是否是异步调用标志,其配置属性为< dubbo:service async=""/>获取其子标签< dubbo:method async=""/>。

代码@2:同步调用oninvoke事件,执行invoke方法之前的事件。

代码@3:继续沿着调用链调用,最终会到具体的协议Invoker,例如DubboInvoker,发生具体的服务调用,跟踪一下同步、异步调用的实现细节。

代码@4:如果调用方式是异步模式,则异步调用onreturn或onthrow事件。

代码@5:如果调用方式是同步模式,则同步调用onreturn或onthrow事件。

FutureFilter#fireInvokeCallback
 1private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
 2        final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), 
 3Constants.ON_INVOKE_METHOD_KEY));      // @1
 4        final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), 
 5Constants.ON_INVOKE_INSTANCE_KEY));   // @2
 6        if (onInvokeMethod == null && onInvokeInst == null) {    // @3
 7            return;
 8        }
 9        if (onInvokeMethod == null || onInvokeInst == null) {    // @4
10            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : 
11                         "instance") + " found. url:" + invoker.getUrl());
12        }
13        if (!onInvokeMethod.isAccessible()) {
14            onInvokeMethod.setAccessible(true);
15        }
16        Object[] params = invocation.getArguments();
17        try {
18            onInvokeMethod.invoke(onInvokeInst, params);        // @5
19        } catch (InvocationTargetException e) {
20            fireThrowCallback(invoker, invocation, e.getTargetException());    // @6
21        } catch (Throwable e) {
22            fireThrowCallback(invoker, invocation, e);                         // @7
23        }
24    }

代码@1:StaticContext.getSystemContext()中根据key:serviceKey + “.” + method + "." + "oninvoke.method" 获取配置的oninvoke.method方法名。其中serviceKey为[group]/interface:[version],其中group与version可能为空,忽略。

代码@2:同样根据key:serviceKey + “.” + method + "." + "oninvoke.instance" 从StaticContext.getSystemContext()获取oninvoke.method方法所在的实例名对象,也就是说该调用哪个对象的oninvoke.method指定的方法。这里就有一个疑问,这些数据是在什么时候存入StaticContext中的呢?下文会详细分析。

代码@3、@4:主要检测< dubbo:method oninvoke=""/>配置的正确性,其正确的配置方式如下:“实例名.方法名”,例如:

代码@5:根据发射机制,调用oninvoke中指定的实例的指定方法,注意,这里传入的参数为调用远程RPC服务的参数。

注意:从这里可以看出,如果要实现事件通知,也即在调用远程RPC服务之前,之后、抛出异常时执行回调函数,该回调事件的方法的参数列表需要与被调用服务的参数列表一致。

代码@6、@7,如果在执行调用前方法(oninvoke)事件方法失败,则会同步调用onthrow中定义的方法(如有定义)。关于dubbo:method oninvoke属性的解析以及在什么时候会向StaticContext.getSystemContext()中添加信息,将在下文统一介绍。

源码分析DubboInvoker关于同步异步调用处理

在上文提到FutureFilter#invoke中的第三步调用invoker.invoker方法时,我们应该会有兴趣了解一下真实的invoker是如何处理同步、异步请求的。 我们以dubbo协议DubboInvoker来重点分析一下其实现原理:

DubboInvoker#doInvoke

 1try {
 2            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);            // @1
 3            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
 4            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
 5            if (isOneway) {
 6                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
 7                currentClient.send(inv, isSent);                                             // @2
 8                RpcContext.getContext().setFuture(null);
 9                return new RpcResult();
10            } else if (isAsync) {
11                ResponseFuture future = currentClient.request(inv, timeout);                 // @3
12                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
13                return new RpcResult();
14            } else {
15                RpcContext.getContext().setFuture(null);     // @4
16                return (Result) currentClient.request(inv, timeout).get();
17            }
18        } catch (TimeoutException e) {
19            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
20        } catch (RemotingException e) {
21            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
22        }

代码@1:首先获取async属性,如果为true表示异步请求,如果配置了return="false"表示调用模式为oneway,只发调用,不关注其调用结果。

代码@2:处理oneway的情况。如果设置了sent=true,表示等待网络数据发出才返回,如果sent=false,只是将待发送数据发到IO写缓存区就返回。

代码@3:处理异步的情况,代码@4处理同步调用的情况,细看其实都是通过调用网络客户端client的request,最终调用HeaderExchangeChannel#request方法:

这里是通过Future模式来实现异步调用的,同步调用也是通过异步调用来实现,只是同步调用发起后,直接调用future#get的方法来同步等待结果的返回,而异步调用只返回Future Response,在用户需要关心其结果时才调用get方法。

asyncCallback与syncCallback

前面介绍了方法执行之前oninvoker事件的调用分析,接下来分析RPC服务调用完成后,onreturn和onthrow方法的调用逻辑。

异步回调与同步回调的区别就是调用onreturn(fireReturnCallback)和onthrow(fireThrowCallback)调用的地方不同,如果是同步调用,也就是在完成RPC服务调用后,立即调用相关的回调方法,如果是异步调用的话,RPC服务完成后,通过Future模式异步执行。其实关于onreturn、onthrow属性的解析,执行与oninvoker属性的解析完全一样,再这里也就不重复介绍了。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-03-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 中间件兴趣圈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 异步调用与事件回调机制
    • TTY异步回调
      • 事件回调
      • 异步调用与事件回调机制
        • FutureFilter#fireInvokeCallback
          • 源码分析DubboInvoker关于同步异步调用处理
            • asyncCallback与syncCallback
            相关产品与服务
            消息队列 TDMQ
            消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档