前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rpc系列3-支持异步调用,提供future、callback的能力。

rpc系列3-支持异步调用,提供future、callback的能力。

作者头像
topgunviper
发布2022-05-12 14:31:39
4390
发布2022-05-12 14:31:39
举报
文章被收录于专栏:啥都有的专栏啥都有的专栏

支持异步调用,提供future、callback的能力。

在实现新功能之前,先将RpcBuilder重构下,职责分离:

  • RpcConsumer:提供给客户端操作接口
  • RpcProvider:提供给服务端
代码语言:javascript
复制
public final class RpcConsumer implements InvocationHandler{

    private String host;

    private int port;

    private Class<?> interfaceClass;

    private int timeout;

    private static int nThreads = Runtime.getRuntime().availableProcessors() * 2;

    private static ExecutorService handlerPool = Executors.newFixedThreadPool(nThreads);


    public RpcConsumer targetHostPort(String host, int port){
        this.host = host;
        this.port = port;
        return this;
    }
    public RpcConsumer interfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
        return this;
    }
    public RpcConsumer timeout(int timeout){
        this.timeout = timeout;
        return this;
    }
    public Object newProxy(){
        return Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[]{this.interfaceClass}, this);
    }


    /**
     * 拦截目标方法->序列化method对象->发起socket连接
     */
    @Override
    public Object invoke(Object proxy, Method method,
            Object[] args) throws Throwable {
        Object retVal = null;
        
        RpcRequest request = new RpcRequest(method.getName(), method.getParameterTypes(),args,RpcContext.getAttributes());
        Object response;
        try{
            //网络传输模块分到doInvoke中
            response = doInvoke(request);
        }catch(Exception e){
            throw e;
        }
        if(response instanceof RpcResponse){
            RpcResponse rpcResp  = (RpcResponse)response;
            if(!rpcResp.isError()){
                retVal = rpcResp.getResponseBody();
            }else{
                throw new RpcException(rpcResp.getErrorMsg());
            }
        }
        return retVal;
    }
    private Object doInvoke(RpcRequest request) throws IOException, ClassNotFoundException{
        //创建连接,获取输入输出流
        Socket socket = new Socket(host,port);
        Object retVal = null;
        try{
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
            ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
            try{
                //发送
                out.writeObject(request);
                //接受server端的返回信息---阻塞
                retVal = in.readObject();

            }finally{
                out.close();
                in.close();
            }
        }finally{
            socket.close();
        }
        return retVal;
    }
}

RpcProvider:

代码语言:javascript
复制
public final class RpcProvider {
    
    
    private static int nThreads = Runtime.getRuntime().availableProcessors() * 2;
    private static ExecutorService handlerPool = Executors.newFixedThreadPool(nThreads);

//发布服务
    public static void publish(final Object service, final int port) throws IOException{
        if (service == null)
            throw new IllegalArgumentException("service can not be null.");

        ServerSocket server = new ServerSocket(port);
        System.out.println("server started!!!");
        while(true){
            Socket socket = server.accept();//监听请求--阻塞
            //异步处理
            handlerPool.submit(new Handler(service,socket));
        }
    }
    static class Handler implements Runnable{

        private Object service;

        private Socket socket;

        public Handler(Object service,Socket socket){
            this.service = service;
            this.socket = socket;
        }
        public void run() {
            try{
                ObjectInputStream in = null;
                ObjectOutputStream out = null;
                RpcResponse response = new RpcResponse();
                try {
                    in = new ObjectInputStream(socket.getInputStream());
                    out = new ObjectOutputStream(socket.getOutputStream());

                    Object req = in.readObject();
                    if(req instanceof RpcRequest){
                        RpcRequest rpcRequest = (RpcRequest)req;
                        //关联客户端传来的上下文
                        RpcContext.context.set(rpcRequest.getContext());
                        Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
                        Object retVal = method.invoke(service, rpcRequest.getArgs());
                        response.setResponseBody(retVal);
                        out.writeObject(response);
                    }
                } catch (InvocationTargetException e) {
                    response.setErrorMsg(e.getTargetException().getMessage());
                    response.setResponseBody(e.getTargetException());
                    out.writeObject(response);
                }finally{
                    in.close();
                    out.close();
                }
            }catch(Exception e){}
        }
    }
}

下面开始考虑如何实现future、callback功能。

谈到异步,我们首先想到了Java提供的Future机制,Future代表一个异步计算结果,提交一个任务后会立刻返回,通过future.get()方法来获取计算结果,该方法会阻塞当前线程,直到结果返回。使用形式如下:

代码语言:javascript
复制
//提交异步任务,立即返回
Future<Object> future = executePool.submit(new Callable<Object>(){
    @Override
    public Object call(){
        //do business
    }
});

//do othre business

Object retVal = future.get();//阻塞,直到计算出结果

思路

rpc中异步方法可以使用Future这个特性。支持异步调用效果和future类似,假设异步方法调用入口:

  • asyncCall(String methodName) 我们再asyncCall方法中构造一个异步任务,其目的就是通过socket将需要调用的方法传给server端,然后等待获取server返回的结果。这个异步任务我们可以直接实现一个FutureTask对象,如下:
代码语言:javascript
复制
FutureTask<RpcResponse> futureTask = new FutureTask<RpcResponse>(new Callable<RpcResponse>(){
        public RpcResponse call() throws Exception {
            //构造RpcRequest对象,发送给server并获取返回结果
             RpcResponse retVal = sendRequest(request);
             return retVal;
        }
    });
    new Thread(futureTask).start();

上面是一种实现方法,不过我这里没有新建Thread,而是直接将任务提交到线程池中,实现如下:

代码语言:javascript
复制
//公用的线程池
private static ExecutorService handlerPool = Executors.newFixedThreadPool(nThreads);

        //构造并提交FutureTask异步任务
Future<RpcResponse> f = (Future<RpcResponse>) handlerPool.submit(new Callable<RpcResponse>(){
        public RpcResponse call() throws Exception {
            //构造RpcRequest对象,发送给server并获取返回结果
             RpcResponse retVal = sendRequest(request);
            return retVal;
            }
        });

异步任务已经构造完毕了,那么异步结果如何获取?

最简单的方式是直接将Future实例返回给客户端即可,客户端通过获取的Future对象,调用相应方法获取异步结果。不过这样话有一个问题,我们获取的RpcResponse对象封装的是server端返回的结果,这个结果可能是我们期望的方法执行返回值,也可能是server端抛出的异常,这个获取结果的过程对用户应该是透明的,即用户进行一次方法调用,如果正常,则返回结果,不正常直接抛出对应的Exception即可,让用户自己通过RpcResponse的isError判断结果是不是异常显然是不合适的,所以这里使用了题目中提供的异步结果获取的一个工具类:ResponseFuture。ResponseFuture的作用就是将上面分析的结果获取过程进行封装,实现如下:

代码语言:javascript
复制
public class ResponseFuture {

    public static ThreadLocal<Future<RpcResponse>> futureThreadLocal = new ThreadLocal<Future<RpcResponse>>();

    public static Object getResponse(long timeout) throws InterruptedException {
        if (null == futureThreadLocal.get()) {
            throw new RuntimeException("Thread [" + Thread.currentThread() + "] have not set the response future!");
        }

        try {
            RpcResponse response =futureThreadLocal.get().get(timeout, TimeUnit.MILLISECONDS);
            //如果是异常,直接抛出
            if (response.isError()) {
                throw new RuntimeException(response.getErrorMsg());
            }
            return response.getResponseBody();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException("Time out", e);
        }
    }

    public static void setFuture(Future<RpcResponse> future){
        futureThreadLocal.set(future);
    }
}

客户端在进行异步方法调用之后,直接用ResponseFuture.get(timeout)即可获取结果。

异步方法能否多次调用?

考虑这么一个问题,如果客户端异步调用methodA方法,在结果返回之前,客户端能否再次调用methodA呢?显然是不可以!所以每次异步调用的时候,我们需要对异步调用方法进行记录,保证结果返回前只调用一次。保存方法的数据结构也是ThreadLocal实现,如下所示:

代码语言:javascript
复制
    /**
     * 存放当前线程正在执行的异步方法
     */
    private static final ThreadLocal<Set<String>> asyncMethods = new ThreadLocal<Set<String>>(){
        @Override
        public Set<String> initialValue()
        {
            return new LinkedHashSet<String>();
        }
    };

异步调用的的Future能力已经完成,下面考虑下callback如何实现。

同时在异步调用过程中添加callback函数。

题目提供了Callback接口:

代码语言:javascript
复制
public interface ResponseCallbackListener {
    
    public void onResponse(Object response);
    
    public void onTimeout();
    
    public void onException(Exception e);
}

callback的实现其实很简单了,在asyncCall执行过程中在适当的位置执行callback函数,比如抛出异常了,那么执行onException函数,调用超时了,则执行onTimeout函数。

综合上述分析,下面看下asyncCall的整体实现:

代码语言:javascript
复制
public final class RpcConsumer implements InvocationHandler{

    //。。。
    
    private int timeout;

    private static int nThreads = Runtime.getRuntime().availableProcessors() * 2;

    private static ExecutorService handlerPool = Executors.newFixedThreadPool(nThreads);

    /**
     * 存放当前线程正在执行的异步方法
     */
    private static final ThreadLocal<Set<String>> asyncMethods = new ThreadLocal<Set<String>>(){
        @Override
        public Set<String> initialValue()
        {
            return new LinkedHashSet<String>();
        }
    };
    
    public void asynCall(String methodName) {
        asynCall(methodName, null);
    }

    /**
     * 异步方法,支持callback
     *
     * @param methodName
     * @param callbackListener
     */
    public <T extends ResponseCallbackListener> void asynCall(final String methodName, T callbackListener) {
        //记录异步方法调用
        asyncMethods.get().add(methodName);

        //构造并提交FutureTask异步任务
        Future<RpcResponse> f = (Future<RpcResponse>) handlerPool.submit(new Callable<RpcResponse>(){
            @Override
            public RpcResponse call() throws Exception {
                RpcRequest request = new RpcRequest(methodName,null,null,RpcContext.getAttributes());
                Object response;
                try{
                    response = doInvoke(request);
                }catch(Exception e){
                    throw e;
                }
                return (RpcResponse) response;
            }
        });
        
        RpcResponse response;
        if(callbackListener != null){
            try {
                //阻塞
                response = (RpcResponse) f.get(timeout,TimeUnit.MILLISECONDS);
                if(response.isError()){
                    //执行回调方法
                    callbackListener.onException(new RpcException(response.getErrorMsg()));
                }else{
                    callbackListener.onResponse(response.getResponseBody());
                }
            } catch(TimeoutException e){
                callbackListener.onTimeout();
            }catch (Exception e) {}
        }else{
            //client端将从ResponseFuture中获取结果
            ResponseFuture.setFuture(f);
        }
    }
    public void cancelAsyn(String methodName) {
        asyncMethods.get().remove(methodName);
    }
    
    @Override
    public Object invoke(Object proxy, Method method,
            Object[] args) throws Throwable {
        //如果是异步方法,立即返回null
        if(asyncMethods.get().contains(method.getName())) return null;
        //。。。。
    }

future功能测试代码:

代码语言:javascript
复制
@Test
public void testAsyncCall(){
    consumer.asynCall("test");//测试future能力
    //立即返回
    String nullValue = userService.test();
    System.out.println(nullValue);
    Assert.assertEquals(null, nullValue);
    try {
        String result = (String) ResponseFuture.getResponse(TIMEOUT);
        Assert.assertEquals("hello client, this is rpc server.", result);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        consumer.cancelAsyn("test");
    }
}

callback测试:

自定义ResponseCallbackListener实现类UserServiceListener:

代码语言:javascript
复制
public class UserServiceListener implements ResponseCallbackListener {
    private CountDownLatch latch = new CountDownLatch(1);
    
    private Object response;

    public Object getResponse() throws InterruptedException {
        latch.await(10, TimeUnit.SECONDS);
        if(response == null)
            throw new RuntimeException("The response doesn't come back.");
        return response;
    }
    @Override
    public void onResponse(Object response) {
        System.out.println("This method is call when response arrived");
        this.response = response;
        latch.countDown();
    }

    @Override
    public void onTimeout() {
        throw new RuntimeException("This call has taken time more than timeout value");
    }

    @Override
    public void onException(Exception e) {
        throw new RuntimeException(e);
    }
}

ClientTest中测试代码:

代码语言:javascript
复制
    @Test
    public void testCallback() {
        UserServiceListener listener = new UserServiceListener();
        consumer.asynCall("test", listener);
        String nullStr = userService.test();//立刻返回null
        Assert.assertEquals(null, nullStr);
        try {
            String str = (String)listener.getResponse();
            Assert.assertEquals("hello client, this is rpc server.", str);
        } catch (InterruptedException e) {
        }
    }

输出:

This method is call when response arrived

好了,到此支持异步调用,提供future、callback的能力,基本实现,当然实现过程肯定还有很多改进的地方,不断学习,不断进步!!!

github上完整源码

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-05-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 支持异步调用,提供future、callback的能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档