前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rpc系列4-处理超时场景.及提供hook

rpc系列4-处理超时场景.及提供hook

作者头像
topgunviper
发布2022-05-12 14:31:57
3210
发布2022-05-12 14:31:57
举报
文章被收录于专栏:啥都有的专栏啥都有的专栏

问题:客户端发起远程调用,如果服务端长时间不返回怎么办?

这就涉及到一个调用超时的问题,平时我们应用中很多场景都会规定超时时间,比如:sql查询超时,http请求超时等。那么如果服务端方法执行的时间超过规定的timeout时间,那么客户端就需要调出当前调用,抛出TimeoutException。

好了,下面开始对RpcBuidler进行改造了,让其支持超时情况的处理。同样,先给出预期的测试方案和结果:

代码语言:javascript
复制
// 业务类UserService在之前的基础上增加超时调用的方法:
public interface UserService {
    
    // other method
    
    /**
     * 超时测试
     */
    public boolean timeoutTest();
    
}
//实现类
public class UserServiceImpl implements UserService {
    
     // other method
    
    @Override
    public boolean timeoutTest() {
        try {
            //模拟长时间执行
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {}
        return true;
    }
}

ClientTest中测试代码:

代码语言:javascript
复制
    @Test
    public void timeoutTest(){
        long beginTime = System.currentTimeMillis();
        try {
            boolean result = userService.timeoutTest(); 
        } catch (Exception e) {
            long period = System.currentTimeMillis() - beginTime;
            System.out.println("period:" + period);
            Assert.assertTrue(period < 3100);
        }
    }

有了异步方法的实现经验,其实这个超时处理过程和异步非常类似,都是利用Future机制来实现的,下面对doInvoke方法进行重构,返回一个异步任务:

代码语言:javascript
复制
    private Future<RpcResponse> doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{

        //构造并提交FutureTask异步任务
        Future<RpcResponse> retVal = (Future<RpcResponse>) handlerPool.submit(new Callable<RpcResponse>(){
            @Override
            public RpcResponse call() throws Exception {
                Object res = null;
                try{
                    //创建连接,获取输入输出流
                    Socket socket = new Socket(host,port);
                    try{
                        ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                        ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
                        try{
                            //发送
                            out.writeObject(request);
                            //接受server端的返回信息---阻塞
                            res = in.readObject();
                        }finally{
                            out.close();
                            in.close();
                        }
                    }finally{
                        socket.close();
                    }
                }catch(Exception e){
                    throw e;
                }
                return (RpcResponse)res;
            }
        });
        return retVal;
    }

回调方法invoke修改如下:

代码语言:javascript
复制
    @Override
    public Object invoke(Object proxy, Method method,
            Object[] args) throws Throwable {
        //如果是异步方法,立即返回null
        if(asyncMethods.get().contains(method.getName())) return null;
        Object retVal = null;

        RpcRequest request = new RpcRequest(method.getName(), method.getParameterTypes(),args,RpcContext.getAttributes());
        RpcResponse rpcResp  = null;
        try{
            Future<RpcResponse> response = doInvoke(request);
            //获取异步结果
            rpcResp  = (RpcResponse)response.get(TIMEOUT,TimeUnit.MILLISECONDS);
        }catch(TimeoutException e){
            throw e;
        }catch(Exception e){}
        
        if(!rpcResp.isError()){
            retVal = rpcResp.getResponseBody();
        }else{
            throw new RpcException(rpcResp.getErrorMsg());
        }
        return retVal;
    }

可见,经过这样改造后,所有的方法调用都是通过Future获取结果。

提供Hook,让开发人员进行RPC层面的AOP。

首先看下题目提供的Hook接口:

代码语言:javascript
复制
public interface ConsumerHook {
    public void before(RpcRequest request);
    public void after(RpcRequest request);
}
//实现类
public class UserConsumerHook implements ConsumerHook{
    @Override
    public void before(RpcRequest request) {
        RpcContext.addAttribute("hook key","this is pass by hook");
    }

    @Override
    public void after(RpcRequest request) {
        System.out.println("I have finished Rpc calling.");
    }
}

hook实现的功能很简单,即在客户端进行远程调用的前后执行before和after方法。

代码语言:javascript
复制
public final class RpcConsumer implements InvocationHandler{

    //。。。
    
    //钩子
    private ConsumerHook hook;

    public RpcConsumer hook(ConsumerHook hook){
        this.hook = hook;
        return this;
    }

static{
        userService = (UserService)consumer.targetHostPort(host, port)
                            .interfaceClass(UserService.class)
                            .timeout(TIMEOUT)
                            .hook(new UserConsumerHook())//新增钩子
                            .newProxy();
    }
//。。。
}

//UserServiceImpl中的测试方法
public Map<String, Object> getMap() {
        Map<String,Object> newMap = new HashMap<String,Object>();
        newMap.put("name","getMap");
        newMap.putAll(RpcContext.getAttributes());
        return newMap;
}

我们只需要在doInvoke方法开始出添加钩子函数的执行逻辑即可。如下:

代码语言:javascript
复制
    private Future<RpcResponse> doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{
        //插入钩子
        hook.before(request);
        //。。。
}

同时在asyncCall和invoke方法的结束添加after的执行逻辑。具体实现可以看源码。

github附上源码

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-05-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题:客户端发起远程调用,如果服务端长时间不返回怎么办?
  • 提供Hook,让开发人员进行RPC层面的AOP。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档