专栏首页luozhiyun的技术学习5.源码分析---SOFARPC调用服务

5.源码分析---SOFARPC调用服务

我们这一次来接着上一篇文章《4. 源码分析---SOFARPC服务端暴露》讲一下服务暴露之后被客户端调用之后服务端是怎么返回数据的。

示例我们还是和上篇文章一样使用一样的bolt协议来讲:

    public static void main(String[] args) {
        ServerConfig serverConfig = new ServerConfig()
                .setProtocol("bolt") // 设置一个协议,默认bolt
                .setPort(12200) // 设置一个端口,默认12200
                .setDaemon(false); // 非守护线程

        ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName()) // 指定接口
            .setRef(new HelloServiceImpl()) // 指定实现
            .setServer(serverConfig); // 指定服务端

        providerConfig.export(); // 发布服务
    }

在Bolt协议下面,当服务端被调用的时候一个服务的流程如下所示: BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker

BoltServerProcessor#handleRequest

@Override
public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest request) {
    // RPC内置上下文
    RpcInternalContext context = RpcInternalContext.getContext();
    context.setProviderSide(true);

    String appName = request.getTargetAppName();
    if (appName == null) {
        // 默认全局appName
        appName = (String) RpcRuntimeContext.get(RpcRuntimeContext.KEY_APPNAME);
    }

    // 是否链路异步化中
    boolean isAsyncChain = false;
    try { // 这个 try-finally 为了保证Context一定被清理
        processingCount.incrementAndGet(); // 统计值加1

        context.setRemoteAddress(bizCtx.getRemoteHost(), bizCtx.getRemotePort()); // 远程地址
        context.setAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT, asyncCtx); // 远程返回的通道

        if (RpcInternalContext.isAttachmentEnable()) {
            InvokeContext boltInvokeCtx = bizCtx.getInvokeContext();
            if (boltInvokeCtx != null) {
                putToContextIfNotNull(boltInvokeCtx, InvokeContext.BOLT_PROCESS_WAIT_TIME,
                    context, RpcConstants.INTERNAL_KEY_PROCESS_WAIT_TIME); // rpc线程池等待时间 Long
            }
        }
        if (EventBus.isEnable(ServerReceiveEvent.class)) {
            EventBus.post(new ServerReceiveEvent(request));
        }

        // 开始处理
        SofaResponse response = null; // 响应,用于返回
        Throwable throwable = null; // 异常,用于记录
        ProviderConfig providerConfig = null;
        String serviceName = request.getTargetServiceUniqueName();

        try { // 这个try-catch 保证一定有Response
            invoke:
            {
                if (!boltServer.isStarted()) { // 服务端已关闭
                    throwable = new SofaRpcException(RpcErrorType.SERVER_CLOSED, LogCodes.getLog(
                        LogCodes.WARN_PROVIDER_STOPPED, SystemInfo.getLocalHost() + ":" +
                            boltServer.serverConfig.getPort()));
                    response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());
                    break invoke;
                }
                if (bizCtx.isRequestTimeout()) { // 加上丢弃超时的请求的逻辑
                    throwable = clientTimeoutWhenReceiveRequest(appName, serviceName, bizCtx.getRemoteAddress());
                    break invoke;
                }
                // 查找服务
                //在server.registerProcessor方法中设置 ProviderProxyInvoker
                Invoker invoker = boltServer.findInvoker(serviceName);
                if (invoker == null) {
                    throwable = cannotFoundService(appName, serviceName);
                    response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());
                    break invoke;
                }
                if (invoker instanceof ProviderProxyInvoker) {
                    providerConfig = ((ProviderProxyInvoker) invoker).getProviderConfig();
                    // 找到服务后,打印服务的appName
                    appName = providerConfig != null ? providerConfig.getAppName() : null;
                }
                // 查找方法
                String methodName = request.getMethodName();
                //在server.registerProcessor方法中设置
                Method serviceMethod = ReflectCache.getOverloadMethodCache(serviceName, methodName,
                    request.getMethodArgSigs());
                if (serviceMethod == null) {
                    throwable = cannotFoundServiceMethod(appName, methodName, serviceName);
                    response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());
                    break invoke;
                } else {
                    request.setMethod(serviceMethod);
                }

                // 真正调用
                response = doInvoke(serviceName, invoker, request);

                if (bizCtx.isRequestTimeout()) { // 加上丢弃超时的响应的逻辑
                    throwable = clientTimeoutWhenSendResponse(appName, serviceName, bizCtx.getRemoteAddress());
                    break invoke;
                }
            }
        } catch (Exception e) {
            // 服务端异常,不管是啥异常
            LOGGER.errorWithApp(appName, "Server Processor Error!", e);
            throwable = e;
            response = MessageBuilder.buildSofaErrorResponse(e.getMessage());
        }

        // Response不为空,代表需要返回给客户端
        if (response != null) {
            RpcInvokeContext invokeContext = RpcInvokeContext.peekContext();
            isAsyncChain = CommonUtils.isTrue(invokeContext != null ?
                (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null);
            // 如果是服务端异步代理模式,特殊处理,因为该模式是在业务代码自主异步返回的
            if (!isAsyncChain) {
                // 其它正常请求
                try { // 这个try-catch 保证一定要记录tracer
                    asyncCtx.sendResponse(response);
                } finally {
                    if (EventBus.isEnable(ServerSendEvent.class)) {
                        EventBus.post(new ServerSendEvent(request, response, throwable));
                    }
                }
            }
        }
    } catch (Throwable e) {
        // 可能有返回时的异常
        if (LOGGER.isErrorEnabled(appName)) {
            LOGGER.errorWithApp(appName, e.getMessage(), e);
        }
    } finally {
        processingCount.decrementAndGet();
        if (!isAsyncChain) {
            if (EventBus.isEnable(ServerEndHandleEvent.class)) {
                EventBus.post(new ServerEndHandleEvent());
            }
        }
        RpcInvokeContext.removeContext();
        RpcInternalContext.removeAllContext();
    }
}

这个方法主要做了如下几件事:

  1. 设置上下文参数
  2. 从缓存中得到服务暴露时设置的invoker
  3. 为request设置method参数
  4. 调用doInvoke返回response
  5. 将response返回给客户端

BoltServerProcessor#doInvoke

我们直接进入到doInvoke方法中,看是如何生成response对象的。

private SofaResponse doInvoke(String serviceName, Invoker invoker, SofaRequest request) throws SofaRpcException {
    // 开始调用,先记下当前的ClassLoader
    ClassLoader rpcCl = Thread.currentThread().getContextClassLoader();
    try {
        // 切换线程的ClassLoader到 服务 自己的ClassLoader
        ClassLoader serviceCl = ReflectCache.getServiceClassLoader(serviceName);
        Thread.currentThread().setContextClassLoader(serviceCl);
        return invoker.invoke(request);
    } finally {
        Thread.currentThread().setContextClassLoader(rpcCl);
    }
}

这里主要是为了获取缓存里面加载被暴露服务的类加载器,这样可以防止不同的类加载器之间一个类被加载多次。

然后调用过滤器链,最后进入到ProviderInvoker中

ProviderInvoker#invoke

@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
    SofaResponse sofaResponse = new SofaResponse();
    long startTime = RpcRuntimeContext.now();
    try {
        // 反射 真正调用业务代码
        Method method = request.getMethod();
        if (method == null) {
            throw new SofaRpcException(RpcErrorType.SERVER_FILTER, "Need decode method first!");
        }
        Object result = method.invoke(providerConfig.getRef(), request.getMethodArgs());

        sofaResponse.setAppResponse(result);
    } catch (IllegalArgumentException e) { // 非法参数,可能是实现类和接口类不对应)
        sofaResponse.setErrorMsg(e.getMessage());
    } catch (IllegalAccessException e) { // 如果此 Method 对象强制执行 Java 语言访问控制,并且底层方法是不可访问的
        sofaResponse.setErrorMsg(e.getMessage());
    } catch (InvocationTargetException e) { // 业务代码抛出异常
        cutCause(e.getCause());
        sofaResponse.setAppResponse(e.getCause());
    } finally {
        if (RpcInternalContext.isAttachmentEnable()) {
            long endTime = RpcRuntimeContext.now();
            RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE,
                endTime - startTime);
        }
    }

    return sofaResponse;
}

到最后我们发现,服务端会通过反射调用被暴露服务的方法,封装成Response类返回。

我们再次回到BoltServerProcessor#handleRequest方法中

....//忽略其他内容
// Response不为空,代表需要返回给客户端
if (response != null) {
    RpcInvokeContext invokeContext = RpcInvokeContext.peekContext();
    isAsyncChain = CommonUtils.isTrue(invokeContext != null ?
        (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null);
    // 如果是服务端异步代理模式,特殊处理,因为该模式是在业务代码自主异步返回的
    if (!isAsyncChain) {
        // 其它正常请求
        try { // 这个try-catch 保证一定要记录tracer
            asyncCtx.sendResponse(response);
        } finally {
            if (EventBus.isEnable(ServerSendEvent.class)) {
                EventBus.post(new ServerSendEvent(request, response, throwable));
            }
        }
    }
}
....//忽略其他内容

最后我们的response实例会使用netty传给客户端。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 10.源码分析---SOFARPC内置链路追踪SOFATRACER是怎么做的?

    大家在看链路追踪之前可以先去看看官方的这篇文章SOFARPC 链路追踪剖析,有助于理解。

    luozhiyun
  • 源码分析--dubbo服务端暴露

    服务暴露的入口方法是 ServiceBean 的 onApplicationEvent。onApplicationEvent 是一个事件响应方法,该方法会在收到...

    luozhiyun
  • 理解ConcurrentHashMap1.8源码

    其实ConcurrentHashMap我自己已经看过很多遍了,但是今天在面试阿里的时候自己在描述ConcurrentHashMap发现自己根本讲不清楚什么是Co...

    luozhiyun
  • c# 查询本机可用的代理ip

    现在有很多网站都提供免费的代理ip,但是你会发现很多网站显示的可以用的 ,在自己电脑上是用不了,写个小代码提取出自己电脑上可以用的代理,没什么技术含量,只是为了...

    冰封一夏
  • Zookeeper——Watcher原理详解

    Zookeeper引入了Watcher机制来实现分布式数据的发布/订阅功能,使得多个订阅者可以同时监听某一个主题对象,当主题对象自身状态发生改变时,就会通知所有...

    夜勿语
  • okhttp——RetryAndFollowUpInterceptor

    okhttp的网络请求采用interceptors链的模式。每一级interceptor只处理自己的工作,然后将剩余的工作,交给下一级interceptor。本...

    Oceanlong
  • 教你用 Python 生成 GIF 动图 !

    最近啊 ,微信订阅号改变频繁 ,很多读者后台说 :小詹啊 ,我总是容易错过你公号的消息 ,现在没有置顶功能很难过啊 !

    小小詹同学
  • 秒开率达90%:腾讯看点客户端 GIF 转视频优化方案

    导语 |众所周知,在动图场景中, GIF 一直是应用得最广泛的技术,然而 GIF 文件体积太大的劣势,导致了一些诸如客户端 GIF 加载慢、服务器占用带宽大等...

    腾讯大讲堂
  • 52张牌

    有52张牌,使它们全部正面朝上,从第2张开始,凡是2的倍数位置上的牌翻成正面朝下;接着从第3张牌开始,凡是3的倍数位置上的牌,正面朝上的翻成正面朝下,正面朝下的...

    attack
  • 计算机视觉与图像处理学习笔记(一)

    写在前面:因学习需要,本人根据章毓晋的《计算机视觉教程》和冈萨雷斯的《数字图像处理》两本书进行学习,中间会穿插相关实践,会有对opencv的学习,以此笔记记录学...

    triplebee

扫码关注云+社区

领取腾讯云代金券