为了更方便的使用netty。sofa-bolt基于netty进行了自己的封装,因此通过sofa-bolt,可以更好的了解服务端和客户端的交互流程。因此这里选择了sofa-bolt进行了学习。sofa-bolt中加入增加test测试类,进行流程执行测试。
自定义客户端:创建RpcServer实例,指定监听port,调用服务端的rpcServer,同时注册用户处理器,执行初始化,进行绑定:
public class MyServer {
public static boolean start() {
RpcServer server = new RpcServer(8899);
server.registerUserProcessor(new MyServerUserProcessor());
return server.start();
}
public static void main(String[] args) {
if (MyServer.start()) {
System.out.println("server start success!");
} else {
System.out.println("server start fail!");
}
}
}
自定义用户处理器:继承同步用户处理器,拿到响应,返回响应
public class MyServerUserProcessor extends SyncUserProcessor<MyRequest> {
@Override
public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
MyResponse response = new MyResponse();
if (request != null) {
System.out.println(request);
System.out.println("---------此时的请求----------"+request);
System.out.println("----调用自己定义的处理器MyServerUserProcessor------");
response.setResp("来自服务端的 -> " + request.getReq());
}
return response;
}
@Override
public String interest() {
System.out.println("----感兴趣的业务,获取处理器名称----------"+MyRequest.class.getName());
return MyRequest.class.getName();
}
}
客户端
public class MyClient {
private static RpcClient client;
public static void start() {
client = new RpcClient();
client.init();
}
public static void main(String[] args) throws RemotingException, InterruptedException {
MyClient.start();
// 构造请求体
MyRequest request = new MyRequest();
request.setReq("hello, bolt-server");
MyResponse response = (MyResponse)client.invokeSync("127.0.0.1:8899", request, 50 * 100000);
System.out.println("------客户端拿到响应信息MyResponse------");
System.out.println(response);
}
}
其中MyRequest和MyResponse里面的req和res是请求参数,是一个字符串类型。
有了上面的测试例子,我们可以基于上面的测试例子进行测试。
在只启动服务端时,联系一下netty会执行什么操作?
netty会执行OP_ACCEPT操作。这个过程经过了从0到16的这个过程,也即首先需要拿到端口,进行绑定,然后执行OP_ACCEPT操作。因此这里会首先执行OP_ACCEPT操作,然后执行OP_READ事件,执行读或者写操作。服务端:
----注册业务处理器registerUserProcessor-------------
----感兴趣的业务,获取处理器名称----------com.alipay.remoting.mytest.MyRequest
----感兴趣的业务,获取处理器名称----------com.alipay.remoting.mytest.MyRequest
---------执行初始化操作doInit()-----------
-----------------启动用户处理器userProcessor.startup()-----------
-----------执行绑定操作bind-----------------
server start success!
此时会执行一个前置处理器,将感兴趣的事件和处理器注册进业务处理器中。
UserProcessor<?> preProcessor = userProcessors.putIfAbsent(processor.interest(), processor);
启动完成后,会等待客户端的启动和请求,则会执行对应的业务处理器。客户端:
---------任务扫描服务启动this.taskScanner.startup()
----在业务处理器启动后,使用同步执行处理 ---this.rpcRemoting.invokeSync---------------
-----执行RpcRemoting的this.invokeSync()-----------
-------------执行写操作读事件(ResponseCommand)super.invokeSync(conn, requestCommand, timeoutMillis)----------------
---------------执行writeAndFlush操作同时添加监听conn.getChannel().writeAndFlush(request).addListener()-------------
-----等待响应future.waitResponse()---------------
此时会等待服务端的响应future,如果没有的话,会执行响应。当我在处理命令中增加断点的时候,可以看到其先在服务端中,服务端执行了:
----添加rpcHandler处理器pipeline.addLast("handler", rpcHandler)-----------
----------创建连接createConnection---------------
----------触发用户事件 触发连接事件fireUserEventTriggered---------------
---通过协议拿到命令处理,进行命令处理handleCommand-------
此时在断点中可以看到这个过程是一个轮询的过程:
---------处理命令handleCommand------------
-------进行process操作--------
---- 执行executor.execute(new ProcessTask(ctx, cmd))----
---通过协议拿到命令处理,进行命令处理handleCommand-------
---------处理命令handleCommand------------
--------执行处理process-----------
-------进行process操作--------
-----创建processTask-----
-----执行Task,使用默认执行器处理 defaultExecutor.execute(task)-----
--------执行处理process-----------
-------进行process操作--------
-----创建processTask-----
-----执行Task,使用默认执行器处理 defaultExecutor.execute(task)-----
--------执行处理process-----------
-------进行process操作--------
-----创建processTask-----
-----执行Task,使用默认执行器处理 defaultExecutor.execute(task)-----
------执行处理操作doPress-------
------执行处理操作doPress-------
------执行处理操作doPress-------
----添加rpcHandler处理器pipeline.addLast("handler", rpcHandler)-----------
----------创建连接createConnection---------------
----------触发用户事件 触发连接事件fireUserEventTriggered---------------
---通过协议拿到命令处理,进行命令处理handleCommand-------
---------处理命令handleCommand------------
--------执行处理process-----------
-------进行process操作--------
---- 执行executor.execute(new ProcessTask(ctx, cmd))----
--------执行处理process-----------
-------进行process操作--------
-----创建processTask-----
-----执行Task,使用默认执行器处理 defaultExecutor.execute(task)-----
----添加rpcHandler处理器pipeline.addLast("handler", rpcHandler)-----------
----------创建连接createConnection---------------
----------触发用户事件 触发连接事件fireUserEventTriggered---------------
------执行处理操作doPress-------
---通过协议拿到命令处理,进行命令处理handleCommand-------
---------处理命令handleCommand------------
--------执行处理process-----------
-------进行process操作--------
---- 执行executor.execute(new ProcessTask(ctx, cmd))----
--------执行处理process-----------
-------进行process操作--------
-----创建processTask-----
-----执行Task,使用默认执行器处理 defaultExecutor.execute(task)-----
--------执行处理process-----------
-------进行process操作--------
-----创建processTask-----
-----执行Task,使用默认执行器处理 defaultExecutor.execute(task)-----
--------执行处理process-----------
-------进行process操作--------
-----创建processTask-----
服务端如果没有等待客户端段的响应的时候,会一直轮询下去,等待超时,而当我将断点跳过的时候,客户端会执行:
-------进行process操作--------
-----创建processTask-----
-----执行Task,使用默认执行器处理 defaultExecutor.execute(task)-----
------执行处理操作doPress-------
拿到响应信息,此时在客户端可以看到服务端响应的信息:
-------------此时response不为空,进行返回-------com.alipay.remoting.rpc.protocol.RpcResponseCommand@6d23017e
----响应对象RpcResponseResolver.resolveResponseObject -------------------
------客户端拿到响应信息MyResponse------
来自服务端的 -> hello, bolt-server
可以我们可以总结服务端和客户端之间的通信过程:
服务端:
服务端
简单总结一下服务端:
首先启动netty服务,注册业务处理器,然后等待客户端启动的服务和进行process操作,执行后,执行处理操作doPress,此时会将转发到业务处理器进行业务处理dispatchToUserProcessor。然后执行业务处理器handlerRequest,执行业务处理,调用自己定义的处理器MyServerUserProcessor。
客户端执行流程:
客户端
客户端总结:
此时客户端会收到服务器端发过来的响应信息。客户端拿到响应信息MyResponse,进行展示,一次完整的客户端调用结束。
也即从中,我们可以看到sofa-bolt对netty进行了一层自己的封装,在原来我们使用的handler的层面上,增加了一层userProcessor的业务处理器封装,同时对于事件的处理采用转发的操作来完成了服务端对客户端业务的处理。这个过程中,为了更方便的使用netty。同时提供了四种调用的发送方式:oneWay、sync、callback、future四种方式。处理业务封装了自己的线程池。