前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Motan源码阅读--设计一个RPC

Motan源码阅读--设计一个RPC

作者头像
春哥大魔王
发布2018-09-21 11:20:04
5990
发布2018-09-21 11:20:04
举报
文章被收录于专栏:服务端技术杂谈

节点

在RPC中服务提供者和服务消费者都可以抽象成一个节点,节点包含了创建,销毁,节点描述信息,和节点链接,节点状态等。

代码语言:javascript
复制
public interface Node {    void init();    void destroy();    boolean isAvailable();    String desc();    URL getUrl();

上面只是一个节点的通用属性描述,按照DDD的说法他只是包含了自己的原始能力。

在社会中每个对象被赋予不同的社会使命,RPC中一个消费者节点的社会使命是发起请求获取响应:

代码语言:javascript
复制
public interface Caller<T> extends Node {    Class<T> getInterface();    Response call(Request request);
}public interface Provider<T> extends Caller<T> {    Class<T> getInterface();    Method lookupMethod(String methodName, String methodDesc);    T getImpl();
}

...

上下文

一个rpc的请求,我们通常需要上下文,将一些rpc请求信息进行传递。 一般通过threadLocal传递上下文:

代码语言:javascript
复制
private static final ThreadLocal<RpcContext> localContext = new ThreadLocal<RpcContext>() {        @Override
        protected RpcContext initialValue() {            return new RpcContext();
        }
    };

这样在一个request过来是我们可以将请求放入上下文:

代码语言:javascript
复制
public static RpcContext init(Request request) {
        RpcContext context = new RpcContext();        if (request != null) {
            context.setRequest(request);
            context.setClientRequestId(request.getAttachments().get(URLParamType.requestIdFromClient.getName()));
        }
        localContext.set(context);        return context;
    }

同样,当请求结束后,我们需要销毁上下文信息:

代码语言:javascript
复制
public static void destroy() {
        localContext.remove();
    }

当方法执行异常,或者链接trace时我们可以从上下文拿到requestId放到我们的trace系统保存起来:

代码语言:javascript
复制
RpcContext.getContext().getRequestId()

请求状态

我们在请求过程中需要对整个请求数据进行采集,比如创建Filter进行请求采集:

代码语言:javascript
复制
private static ConcurrentHashMap<String, StatInfo> serviceStat = new ConcurrentHashMap<String, RpcStats.StatInfo>();public class ActiveLimitFilter implements Filter {    @Override
    public Response filter(Caller<?> caller, Request request) {
        RpcStats.beforeCall(caller.getUrl(), request);		try{
			RpcStats.afterCall(caller.getUrl, request, true, time);
		}
    }
}

响应状态

在请求和响应之间状态通知可以采用Object.Wait和Object.Notify实现。 并通过命令模式唤醒监听者。

代码语言:javascript
复制
private void notifyListeners() {        if (listeners != null) {            for (FutureListener listener : listeners) {
                notifyListener(listener);
            }
        }
    }

在值获取时,我们采用一个死循环进行请求结果返回,如果请求超时,则抛出异常,而这个超时时间,则是通过服务配置实现的。

代码语言:javascript
复制
public Object getValue() {
        synchronized (lock) {            if (!isDoing()) {                return getValueOrThrowable();
            }            if (timeout <= 0) {                try {                    lock.wait();
                } catch (Exception e) {
                    cancel(new MotanServiceException(this.getClass().getName() + " getValue InterruptedException : "
                            + MotanFrameworkUtil.toString(request) + " cost=" + (System.currentTimeMillis() - createTime), e));
                }                return getValueOrThrowable();
            } else {                long waitTime = timeout - (System.currentTimeMillis() - createTime);                if (waitTime > 0) {                    for (; ; ) {                        try {                            lock.wait(waitTime);
                        } catch (InterruptedException e) {
                        }                        if (!isDoing()) {                            break;
                        } else {
                            waitTime = timeout - (System.currentTimeMillis() - createTime);                            if (waitTime <= 0) {                                break;
                            }
                        }
                    }
                }                if (isDoing()) {
                    timeoutSoCancel();
                }
            }            return getValueOrThrowable();
        }
    }

获取request的目标方法

我们在进行rpc请求时,在Request参数中会传递通过代理访问的目标类,我们在AbstractProvider声明一个抽象方法,在具体实现类进行实现:

代码语言:javascript
复制
protected abstract Response invoke(Request request);

通过反射进行方法调用,invoke方法如下:

代码语言:javascript
复制
@Override    public Response invoke(Request request) {
        DefaultResponse response = new DefaultResponse();

        Method method = lookupMethod(request.getMethodName(), request.getParamtersDesc());        if (method == null) {
            MotanServiceException exception =                    new MotanServiceException("Service method not exist: " + request.getInterfaceName() + "." + request.getMethodName()
                            + "(" + request.getParamtersDesc() + ")", MotanErrorMsgConstant.SERVICE_UNFOUND);

            response.setException(exception);            return response;
        }        try {
            Object value = method.invoke(proxyImpl, request.getArguments());
            response.setValue(value);
        } catch (Exception e) {
		
		}		
        // 传递rpc版本和attachment信息方便不同rpc版本的codec使用。
        response.setRpcProtocolVersion(request.getRpcProtocolVersion());
        response.setAttachments(request.getAttachments());        return response;
    }

方法及描述放在map中:

代码语言:javascript
复制
protected Map<String, Method> methodMap = new HashMap<String, Method>();

获取方法:

代码语言:javascript
复制
public Method lookupMethod(String methodName, String methodDesc) {
        Method method = null;
        String fullMethodName = ReflectUtil.getMethodDesc(methodName, methodDesc);
        method = methodMap.get(fullMethodName);        if (method == null && StringUtils.isBlank(methodDesc)) {
            method = methodMap.get(methodName);            if (method == null) {
                method = methodMap.get(methodName.substring(0, 1).toLowerCase() + methodName.substring(1));
            }
        }        return method;
    }

© 著作权归作者所有

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-09-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 春哥talk 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 节点
  • 上下文
  • 请求状态
  • 响应状态
  • 获取request的目标方法
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档