在RPC中服务提供者和服务消费者都可以抽象成一个节点,节点包含了创建,销毁,节点描述信息,和节点链接,节点状态等。
public interface Node { void init(); void destroy(); boolean isAvailable(); String desc(); URL getUrl();
上面只是一个节点的通用属性描述,按照DDD的说法他只是包含了自己的原始能力。
在社会中每个对象被赋予不同的社会使命,RPC中一个消费者节点的社会使命是发起请求获取响应:
public interface Caller<T> extends Node { Class<T> getInterface(); Response call(Request request);
}public interface Provider<T> extends Caller<T> { Class<T> getInterface(); Method lookupMethod(String methodName, String methodDesc); T getImpl();
}
...
一个rpc的请求,我们通常需要上下文,将一些rpc请求信息进行传递。 一般通过threadLocal传递上下文:
private static final ThreadLocal<RpcContext> localContext = new ThreadLocal<RpcContext>() { @Override
protected RpcContext initialValue() { return new RpcContext();
}
};
这样在一个request过来是我们可以将请求放入上下文:
public static RpcContext init(Request request) {
RpcContext context = new RpcContext(); if (request != null) {
context.setRequest(request);
context.setClientRequestId(request.getAttachments().get(URLParamType.requestIdFromClient.getName()));
}
localContext.set(context); return context;
}
同样,当请求结束后,我们需要销毁上下文信息:
public static void destroy() {
localContext.remove();
}
当方法执行异常,或者链接trace时我们可以从上下文拿到requestId放到我们的trace系统保存起来:
RpcContext.getContext().getRequestId()
我们在请求过程中需要对整个请求数据进行采集,比如创建Filter进行请求采集:
private static ConcurrentHashMap<String, StatInfo> serviceStat = new ConcurrentHashMap<String, RpcStats.StatInfo>();public class ActiveLimitFilter implements Filter { @Override
public Response filter(Caller<?> caller, Request request) {
RpcStats.beforeCall(caller.getUrl(), request); try{
RpcStats.afterCall(caller.getUrl, request, true, time);
}
}
}
在请求和响应之间状态通知可以采用Object.Wait和Object.Notify实现。 并通过命令模式唤醒监听者。
private void notifyListeners() { if (listeners != null) { for (FutureListener listener : listeners) {
notifyListener(listener);
}
}
}
在值获取时,我们采用一个死循环进行请求结果返回,如果请求超时,则抛出异常,而这个超时时间,则是通过服务配置实现的。
public Object getValue() {
synchronized (lock) { if (!isDoing()) { return getValueOrThrowable();
} if (timeout <= 0) { try { lock.wait();
} catch (Exception e) {
cancel(new MotanServiceException(this.getClass().getName() + " getValue InterruptedException : "
+ MotanFrameworkUtil.toString(request) + " cost=" + (System.currentTimeMillis() - createTime), e));
} return getValueOrThrowable();
} else { long waitTime = timeout - (System.currentTimeMillis() - createTime); if (waitTime > 0) { for (; ; ) { try { lock.wait(waitTime);
} catch (InterruptedException e) {
} if (!isDoing()) { break;
} else {
waitTime = timeout - (System.currentTimeMillis() - createTime); if (waitTime <= 0) { break;
}
}
}
} if (isDoing()) {
timeoutSoCancel();
}
} return getValueOrThrowable();
}
}
我们在进行rpc请求时,在Request参数中会传递通过代理访问的目标类,我们在AbstractProvider声明一个抽象方法,在具体实现类进行实现:
protected abstract Response invoke(Request request);
通过反射进行方法调用,invoke方法如下:
@Override public Response invoke(Request request) {
DefaultResponse response = new DefaultResponse();
Method method = lookupMethod(request.getMethodName(), request.getParamtersDesc()); if (method == null) {
MotanServiceException exception = new MotanServiceException("Service method not exist: " + request.getInterfaceName() + "." + request.getMethodName()
+ "(" + request.getParamtersDesc() + ")", MotanErrorMsgConstant.SERVICE_UNFOUND);
response.setException(exception); return response;
} try {
Object value = method.invoke(proxyImpl, request.getArguments());
response.setValue(value);
} catch (Exception e) {
}
// 传递rpc版本和attachment信息方便不同rpc版本的codec使用。
response.setRpcProtocolVersion(request.getRpcProtocolVersion());
response.setAttachments(request.getAttachments()); return response;
}
方法及描述放在map中:
protected Map<String, Method> methodMap = new HashMap<String, Method>();
获取方法:
public Method lookupMethod(String methodName, String methodDesc) {
Method method = null;
String fullMethodName = ReflectUtil.getMethodDesc(methodName, methodDesc);
method = methodMap.get(fullMethodName); if (method == null && StringUtils.isBlank(methodDesc)) {
method = methodMap.get(methodName); if (method == null) {
method = methodMap.get(methodName.substring(0, 1).toLowerCase() + methodName.substring(1));
}
} return method;
}
© 著作权归作者所有