Dubbo作为一款高性能Java RPC框架,RPC是其最重要的流程之一。Dubbo RPC涉及到consumer端和provider端的流程,本文主要分析consumer端的RPC流程实现,包括集群容错、dubbo路由、负载均衡、Filter处理链、DubboInvoker和RPC结果返回等流程。
在分析dubbo consumer端的RPC实现之前,首先来看下dubbo的整体架构,有个整体概念。
dubbo架构图如下:
注意,dubbo服务调用连接是长连接,dubbo服务调用是小数据量的通信,针对每一次RPC通信,都会生成一个唯一的id来标识,这样就能区分出一次RPC请求对应的RPC响应了。
由于RPC流程涉及consumer和provider端,先来看一下在二者之间RPC流程的线程模型图,有个初步认识:
consumer端的Dubbo业务线程池,可以是cached或者fixed类型的线程池,该线程的业务逻辑主要是读取返回结果,然后响应对应defaultFuture,默认是cached类型线程池。线程池配置可以通过SPI方式来配置。provider端的Dubbo业务线程池,默认是fixed类型线程池。
以如下consumer端代码为例开始进行讲解:
DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
while (true) {
try {
String hello = demoService.sayHello("world"); // call remote method
System.out.println(hello); // get result
System.in.read();
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
当consumer端调用一个@Reference
的RPC服务,在consumer端的cluster层首先从Driectory
中获取invocation对应的invokerList,经过Router
过滤符合路由策略的invokerList,然后执行LoadBalance
,选择出某个Invoker,最后进行RPC调用操作。
调用某个Invoker(经过cluter之后)进行RPC时,依次会经过Filter、DubboInvoker、HeaderExchangeClient,将RPC消息类RPCInvocation传递到netty channel.eventLoop中。
最后由netty Channel经过Serializer之后将RPC请求发送给provider端。
从上面的RPC执行流程看出,一个重要的流程是集群容错Cluster
,Dubbo提供了多种容错方案,默认模式为Failover Cluster,也就是失败重试。目前dubbo支持的集群容错策略如下:
Directory是RPC服务类的目录服务,一个服务接口对应一个Directory实例,比如com.xxx.xx.dubbo.api.HelloService
就是一个服务接口。
public interface Directory<T> extends Node {
Class<T> getInterface();
List<Invoker<T>> list(Invocation invocation) throws RpcException;
}
Directory有2个实现类,一个是StaticDirectory
,一个是RegistryDirectory
。前者是静态类型,其内部的Invocation在初始化时就已确定(public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers)
,运行过程中不再变化;后者是动态类型,实现了接口NotifyListener,notify时动态更新invokers。Directory的重点在于list(invocation)和notify
更新机制,list(invocation)
就是获取invokerList过程。
Router是RPC的路由策略,通过Directory获取到invokerList
之后,会执行对应的路由策略。Dubbo的默认路由策略是MockInvokersSelector。Dubbo路由策略接口是Router,其有3个实现类,Router的作用就是根据invocation和invokerList,选择出符合路由策略的invokerList。
LoadBalance是RPC的负载均衡策略,通过Directory获取到invokerList并且执行对应的路由策略之后,就会执行LoadBalance(负载均衡)了。
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
/**
* select one invoker in list.
*/
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
filter处理机制使用的是调用链模式,启动流程中会初始化该filter链,对应逻辑是ProtocolFilterWrapper.buildInvokerChain()
方法,filter链默认包括几个filter,依次是ConsumerContextFilter(设置上下文信息)、FutureFilter(执行某些hook方法)和MonitorFilter(monitor RPC统计信息)等。
DubboInvoker的主要逻辑就是从provider的长连接中选择某个连接,然后根据不同的策略(同步/异步/单向)来进行操作。
// DubboInvoker
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
// consumer和provider默认保持一个长连接
currentClient = clients[0];
} else { // 如果有多个长连接,则使用轮训方式选择某个连接
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 默认isAsync为false,isOneWay为false
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
// 单向发送,不管结果,在日志收集中可能会用到该模式
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// 异步模式,真正的RPC结果处理在ResponseCallback中来做,
// ResponseCallback在FutureAdapter中设置
ResponseFuture future = currentClient.request(inv, timeout);
// For compatibility
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (isAsyncFuture) {
// register resultCallback, sometimes we need the async result being processed by the filter chain.
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {
// 同步模式,带超时时间,默认1s
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
} catch (RemotingException e) {
}
}
注意,dubbo 2.7版本的DubboInvoker.doInvoke流程已和上述流程不太一样了,不过实现思路是类似的。
最后会调用channel.writeAndFlush,之后的流程就是netty channel内部的处理流程了,这部分暂可不关注,只需要知道后续流程会走到我们设定的NettyHandler中对应的方法中,比如channel.write就会走到NettyHandler.writeRequested方法中逻辑,也就是针对RPC请求数据进行序列化操作。
数据序列化操作是由netty ChannelHandler来处理的,对应的初始化逻辑如下:
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder()) // encoder编码
.addLast("handler", nettyClientHandler);
}
});
在consumer与provider建立连接之后,initChannel是会添加对应的encoder、decoder。
接收到provider端返回的RPC结果进行反序列化之后,就该将结果数据提交到consuemr端dubbo业务线程池了,如下所示:
// NettyClientHandler msg是Response类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
// .. -> AllChannelHandler
public void received(Channel channel, Object message) throws RemotingException {
// 响应结果反序列化后,就会提交任务给DubboClientHandler线程池(cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED))),
// 进行后续的结果处理操作
ExecutorService cexecutor = getExecutorService();
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
}
DubboClientHandler线程池里的逻辑比较简单,首先根据response.getId()获取从FUTURES(Map)中获取该次通信对应的DefaultFuture,将response设置到DefaultFuture中并唤醒等待的线程。
// HeaderExchangeHandler
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
}
} finally {
CHANNELS.remove(response.getId());
}
}
当唤醒在DefaultFuture阻塞的线程(也就是业务线程)之后,也就是以下代码返回了:
DubboInvoker.doInvoke return (Result) currentClient.request(inv, timeout).get();
获取到Response之后,就获取到了provider返回结果,也就是整个RPC的consumer端流程结束了。