3. 源码分析---SOFARPC客户端服务调用

我们首先看看BoltClientProxyInvoker的关系图

所以当我们用BoltClientProxyInvoker#invoke的时候实际上是调用了父类的invoke方法 ClientProxyInvoker#invoke

    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        SofaResponse response = null;
        Throwable throwable = null;
        try {
            RpcInternalContext.pushContext();
            RpcInternalContext context = RpcInternalContext.getContext();
            context.setProviderSide(false);
            // 包装request请求
            decorateRequest(request);
            try {
                // 产生开始调用事件
                if (EventBus.isEnable(ClientStartInvokeEvent.class)) {
                    EventBus.post(new ClientStartInvokeEvent(request));
                }
                // 得到结果
                response = cluster.invoke(request);
            } catch (SofaRpcException e) {
                throwable = e;
                throw e;
            } finally {
                // 产生调用结束事件
                if (!request.isAsync()) {
                    if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
                        EventBus.post(new ClientEndInvokeEvent(request, response, throwable));
                    }
                }
            }
            // 包装响应
            decorateResponse(response);
            return response;
        } finally {
            RpcInternalContext.removeContext();
            RpcInternalContext.popContext();
        }
    }

这个方法主要做了几件事:

  1. 包装request请求,设置必要的参数
  2. 调用FailOverCluster的invoke方法,将reques请求发送出去,并得到response相应
  3. 包装response响应

我们在调用FailOverCluster的时候实际上是调用的父类AbstractCluster的invoker方法,FailOverCluster关系图如下:

所以我们进入到AbstractCluster的invoker方法中:

    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        SofaResponse response = null;
        try {
            // 做一些初始化检查,例如未连接可以连接
            checkClusterState();
            // 开始调用
            countOfInvoke.incrementAndGet(); // 计数+1         
            response = doInvoke(request);
            return response;
        } catch (SofaRpcException e) {
            // 客户端收到异常(客户端自己的异常)
            throw e;
        } finally {
            countOfInvoke.decrementAndGet(); // 计数-1
        }
    }

checkClusterState方法主要是用来校验是否已销毁了,或是调用了init方法进行初始化了。 然后会在调用之前记一下数。 然后我们进入到doInvoke方法中:

    public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException {
        String methodName = request.getMethodName();
        int retries = consumerConfig.getMethodRetries(methodName);
        int time = 0;
        SofaRpcException throwable = null;// 异常日志
        List<ProviderInfo> invokedProviderInfos = new ArrayList<ProviderInfo>(retries + 1);
        do {
            //负载均衡
            ProviderInfo providerInfo = select(request, invokedProviderInfos);
            try {
                //调用过滤器链
                SofaResponse response = filterChain(providerInfo, request);
                if (response != null) {
                    if (throwable != null) {
                        if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
                            LOGGER.warnWithApp(consumerConfig.getAppName(),
                                LogCodes.getLog(LogCodes.WARN_SUCCESS_BY_RETRY,
                                    throwable.getClass() + ":" + throwable.getMessage(),
                                    invokedProviderInfos));
                        }
                    }
                    return response;
                } else {
                    throwable = new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
                        "Failed to call " + request.getInterfaceName() + "." + methodName
                            + " on remote server " + providerInfo + ", return null");
                    time++;
                }
            } catch (SofaRpcException e) { // 服务端异常+ 超时异常 才发起rpc异常重试
                if (e.getErrorType() == RpcErrorType.SERVER_BUSY
                    || e.getErrorType() == RpcErrorType.CLIENT_TIMEOUT) {
                    throwable = e;
                    time++;
                } else {
                    throw e;
                }
            } catch (Exception e) { // 其它异常不重试
                throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
                    "Failed to call " + request.getInterfaceName() + "." + request.getMethodName()
                        + " on remote server: " + providerInfo + ", cause by unknown exception: "
                        + e.getClass().getName() + ", message is: " + e.getMessage(), e);
            } finally {
                if (RpcInternalContext.isAttachmentEnable()) {
                    RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_INVOKE_TIMES,
                        time + 1); // 重试次数
                }
            }
            invokedProviderInfos.add(providerInfo);
        } while (time <= retries);

        throw throwable;
    }

这个方法里面主要做了这这件事:

  1. 如果失败的话就循环调用
  2. 负载均衡,选取provider
  3. 通过过滤器链调用服务端,并返回结果
  4. 异常处理

接着我们进入到filterChain方法中,根据过滤器链最后会跳到ConsumerInvoker中的invoke方法

    @Override
    public SofaResponse invoke(SofaRequest sofaRequest) throws SofaRpcException {
        // 设置下服务器应用
        ProviderInfo providerInfo = RpcInternalContext.getContext().getProviderInfo();
        String appName = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_APP_NAME);
        if (StringUtils.isNotEmpty(appName)) {
            sofaRequest.setTargetAppName(appName);
        }

        // 目前只是通过client发送给服务端
        return consumerBootstrap.getCluster().sendMsg(providerInfo, sofaRequest);
    }

consumerBootstrap.getCluster()会返回FailOverCluster实例,然后调用父类AbstractCluster的sendMsg方法

    public SofaResponse sendMsg(ProviderInfo providerInfo, SofaRequest request) throws SofaRpcException {
        ClientTransport clientTransport = connectionHolder.getAvailableClientTransport(providerInfo);
        if (clientTransport != null && clientTransport.isAvailable()) {
            return doSendMsg(providerInfo, clientTransport, request);
        } else {
            throw unavailableProviderException(request.getTargetServiceUniqueName(), providerInfo.getOriginUrl());
        }
    }
    
    
    protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport transport,
                                     SofaRequest request) throws SofaRpcException {
        RpcInternalContext context = RpcInternalContext.getContext();
        // 添加调用的服务端远程地址
        RpcInternalContext.getContext().setRemoteAddress(providerInfo.getHost(), providerInfo.getPort());
        try {
            checkProviderVersion(providerInfo, request); // 根据服务端版本特殊处理
            String invokeType = request.getInvokeType();
            int timeout = resolveTimeout(request, consumerConfig, providerInfo);

            SofaResponse response = null;
            // 同步调用
            if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
                long start = RpcRuntimeContext.now();
                try {
                    response = transport.syncSend(request, timeout);
                } finally {
                    if (RpcInternalContext.isAttachmentEnable()) {
                        long elapsed = RpcRuntimeContext.now() - start;
                        context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
                    }
                }
            }
            // 单向调用
            else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
                long start = RpcRuntimeContext.now();
                try {
                    transport.oneWaySend(request, timeout);
                    response = buildEmptyResponse(request);
                } finally {
                    if (RpcInternalContext.isAttachmentEnable()) {
                        long elapsed = RpcRuntimeContext.now() - start;
                        context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
                    }
                }
            }
            // Callback调用
            else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
                // 调用级别回调监听器
                SofaResponseCallback sofaResponseCallback = request.getSofaResponseCallback();
                if (sofaResponseCallback == null) {
                    SofaResponseCallback methodResponseCallback = consumerConfig
                        .getMethodOnreturn(request.getMethodName());
                    if (methodResponseCallback != null) { // 方法的Callback
                        request.setSofaResponseCallback(methodResponseCallback);
                    }
                }
                // 记录发送开始时间
                context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
                // 开始调用
                transport.asyncSend(request, timeout);
                response = buildEmptyResponse(request);
            }
            // Future调用
            else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
                // 记录发送开始时间
                context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
                // 开始调用
                ResponseFuture future = transport.asyncSend(request, timeout);
                // 放入线程上下文
                RpcInternalContext.getContext().setFuture(future);
                response = buildEmptyResponse(request);
            } else {
                throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType);
            }
            return response;
        } catch (SofaRpcException e) {
            throw e;
        } catch (Throwable e) { // 客户端其它异常
            throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, e);
        }
    }

sendMsg方法最后会调用到doSendMsg。 soSendMsg里面主要做了如下几件事:

  1. 如果是同步调用,则直接返回封装好的参数
  2. 如果是单向调用,则调用buildEmptyResponse方法,返回一个空的response
  3. 如果是callback调用asyncSend,RPC在获取到服务端的结果后会自动执行该回调实现。
  4. 服务端返回响应结果被 RPC 缓存,当客户端需要响应结果的时候需要主动获取结果,获取结果的过程阻塞线程。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券