前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >腾讯开源框架TarsCpp-rpc设计分析-client(四)

腾讯开源框架TarsCpp-rpc设计分析-client(四)

原创
作者头像
路小饭
修改2019-12-27 18:10:08
1.2K0
修改2019-12-27 18:10:08
举报

5 细节说明

5.1 Transceiver缓冲区管理

设想下这样的情形:

  • client调用操作系统函数发送请求时,如果因为各种原因没有将请求内容全部发送完怎么办
  • client接收server结果时,只收到半个结果或者收到1.5个结果怎么办

基于上面情形,Transceiver引入了两个TC_Buffer,分别作为发送缓存_sendBuffer和接收缓存_recvBuffer。

5.1.1 发送缓存_sendBuffer

当Transceiver使用sendRequest发送数据时:

  • 如果当前缓存不为空,直接把当前请求放入_timeoutQueue,等下次epoll调度时候使用handleOutputImp发送本次请求和缓存中的内容
  • 当前缓存为空时候,使用系统函数发送请求,如果没发送完把数据放入缓存中,发送完直接返回成功。

上述流程见图1

图1
图1

关键代码如下:

代码语言:txt
复制
int Transceiver::sendRequest(const char * pData, size_t iSize, bool forceSend)
{
    //buf不为空,直接返回失败
    //等buffer可写了,epoll会通知写时间
    if(!_sendBuffer.IsEmpty())
    {
        return eRetError;
    }

    int iRet = this->send(pData,iSize,0);

    //失败,直接返回
    if(iRet < 0)
    {
        return eRetError;
    }

    //没有全部发送完,写buffer 返回成功
    if(iRet < (int)iSize)
    {
        _sendBuffer.PushData(pData+iRet,iSize-iRet);
        return eRetFull;
    }

    return eRetOk;
}

继续看handleOutputImp是如何处理_sendBuffer中数据的,handleOutputImp调用了Transceiver::doRequest来处理上次_sendBuffer中剩下的内容

代码语言:txt
复制
int Transceiver::doRequest()
{
    //buf不为空,先发生buffer的内容
    if(!_sendBuffer.IsEmpty())
    {
        size_t length = 0;
        void* data = NULL;
        _sendBuffer.PeekData(data, length);

        iRet = this->send(data, length, 0);

        //失败,直接返回
        if(iRet < 0)
        {
            return iRet;
        }

        //即使这次也没发送完也没关系,还会保留在_sendBuffer中,下次继续发送
        if(iRet > 0)
        {
            _sendBuffer.Consume(iRet);
            if (_sendBuffer.IsEmpty())
                _sendBuffer.Shrink();
            else
                return 0;
        }
    }

刚才说到,Transceiver使用sendRequest时,如果发现_sendBuffer有内容,就把本次请求放到_timeoutQueue中。这个请求也是在上面的Transceiver::doRequest方法中被发送的,关键代码如下。

代码语言:txt
复制
int Transceiver::doRequest()
{
    //取adapter里面积攒的数据
    _adapterProxy->doInvoke();
}
代码语言:txt
复制
void AdapterProxy::doInvoke()
{
    while(!_timeoutQueue->sendListEmpty())
    {
        ReqMessage * msg = NULL;

        _timeoutQueue->getSend(msg);

        int iRet = _trans->sendRequest(msg->sReqData.c_str(), msg->sReqData.size());

        //发送失败 返回
        if(iRet == Transceiver::eRetError)
        {
            TLOGINFO("[TARS][AdapterProxy::doInvoke fail,errono:" << iRet << endl);
            return;
        }

        //请求发送成功了 处理采样
        //...

        //发送完成
        _timeoutQueue->popSend(msg->eType == ReqMessage::ONE_WAY);
    }
}

5.1.2 接收缓存_recvBuffer

_recvBuffer使用相对简单,从套接字得到的数据直接放到_recvBuffer中,然后解析_recvBuffer中的内容即可。

代码语言:txt
复制
int TcpTransceiver::doResponse(list<ResponsePacket>& done)
{
    //第一部分,接收数据放到_recvBuffer中
    do
    {
        _recvBuffer.AssureSpace(8 * 1024);
        char stackBuffer[64 * 1024];

        struct iovec vecs[2];
        vecs[0].iov_base = _recvBuffer.WriteAddr();
        vecs[0].iov_len = _recvBuffer.WritableSize();
        vecs[1].iov_base = stackBuffer;
        vecs[1].iov_len = sizeof stackBuffer;

        if ((iRet = this->readv(vecs, 2)) > 0)
        {
            if (static_cast<size_t>(iRet) <= vecs[0].iov_len)
            {
                _recvBuffer.Produce(iRet);
            }
            else
            {
                _recvBuffer.Produce(vecs[0].iov_len);
                size_t stackBytes = static_cast<size_t>(iRet) - vecs[0].iov_len;
                _recvBuffer.PushData(stackBuffer, stackBytes);
            }
        }
    }
    while (iRet>0);
    
    //第二部分,反序列化_recvBuffer中的内容,结构化为ResponsePacket供上层应用使用
    if(!_recvBuffer.IsEmpty())
    {
        try
        {
            const char* data = _recvBuffer.ReadAddr();
            size_t len = _recvBuffer.ReadableSize();

            size_t pos = 0;
            ProxyProtocol& proto = _adapterProxy->getObjProxy()->getProxyProtocol();

            if (proto.responseExFunc) 
            {
                long id = _adapterProxy->getId();
                pos = proto.responseExFunc(data, len, done, (void*)id);
            }
            else
            {
                pos = proto.responseFunc(data, len, done);
            }

            if(pos > 0)
            {
                {
                    _recvBuffer.Consume(pos);
                    if (_recvBuffer.Capacity() > 8 * 1024 * 1024)
                        _recvBuffer.Shrink();
                }
            }
        }

    }

    return done.empty()?0:1;
}

5.2 _timeoutQueue说明

_timeoutQueue定义在AdapterProxy中

代码语言:txt
复制
std::unique_ptr<TC_TimeoutQueueNew<ReqMessage*>> _timeoutQueue;

引入_timeoutQueue的意义在于解耦请求发送和结果接收。对于一个线程来说,发送完请求就完成了任务,不用额外等待结果,将事情交给epoll去主动调度。

  • 第一个功能:桥梁。将请求和结果放到同一个msg中,供client使用。在AdapterProxy::invoke中,每次发送请求时会生成一个iRequestId,标示一个msg,然后放到_timeoutQueue中。服务端处理请求发送结果时,会带上这个iRequestId。这样客户端收到结果时,根据iRequestId得知这个结果是属于哪个请求,最后将结果放到msg中
代码语言:txt
复制
//请求放入队列中
int AdapterProxy::invoke(ReqMessage * msg)
{
    bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
}       
代码语言:txt
复制
//接收到结果后,根据iRequestId找到对应的msg
void AdapterProxy::finishInvoke(ResponsePacket & rsp)
{
    ReqMessage * msg = NULL;
    else
    {
        //这里的队列中的发送链表中的数据可能已经在timeout的时候删除了
        bool retErase = _timeoutQueue->erase(rsp.iRequestId, msg);

        //找不到此请求id信息
        if (!retErase)
        {
            if(_timeoutLogFlag)
            {
                TLOGERROR("[TARS][AdapterProxy::finishInvoke(ResponsePacket) objname:"<< _objectProxy->name() << ",get req-ptr NULL,may be timeout,id:" << rsp.iRequestId 
                    << ",desc:" << _endpoint.desc() << endl);
            }
            return ;
        }

        assert(msg->eStatus == ReqMessage::REQ_REQ);

        msg->eStatus = ReqMessage::REQ_RSP;
    }

    //将结果赋给msg的response
    msg->response = rsp;

    finishInvoke(msg);
}
  • 第二个功能:请求超时判断。设想这样一种情景。请求发送成功,但这时服务端挂了,如果不处理,请求就会一直留在_timeoutQueue中。所以client应该能够处理超时的请求。
代码语言:txt
复制
void CommunicatorEpoll::run()
{
    while (!_terminate)
    {
        doTimeout();
    }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 5 细节说明
    • 5.1 Transceiver缓冲区管理
      • 5.1.1 发送缓存_sendBuffer
    • 5.2 _timeoutQueue说明
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档