首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >腾讯开源框架TarsCpp-rpc设计分析-client(三)

腾讯开源框架TarsCpp-rpc设计分析-client(三)

原创
作者头像
路小饭
修改2019-06-13 17:24:15
1.3K0
修改2019-06-13 17:24:15
举报

前面3节介绍了整体设计思路,下面的章节来一起浏览下实现细节

4 设计实现篇

从官方自带的例子TarsCpp/examples/QuickStartDemo/HelloServer/AsyncClient/main.cpp开始

//main.cpp
int main(int argc,char ** argv)
{
    //封装了CommunicatorEpoll、AsyncProcThread的实现
    Communicator comm;

    try
    {
        /*
        4.1 准备阶段,初始化了四大组件:
        CommunicatorEpoll、AsyncProcThread、ServantProxy和ObjectProxy
        */
        
        //HelloPrx 继承了ServantProxy,是一个ServantProxy
        HelloPrx prx;  
        comm.stringToProxy("TestApp.HelloServer.HelloObj@tcp -h 10.120.129.226 -p 20001" , prx);

        try
        {
            string sReq("hello world");
            HelloPrxCallbackPtr cb = new HelloCallBack();
            
            /*
            4.2 具体调用环节
            */
            prx->async_testHello(cb, sReq);
            cout<<" sReq:"<<sReq<<endl;
        }
…………
}

4.1 准备阶段

  • CommunicatorEpoll的初始化:
//Communicator.cpp
ServantProxy * Communicator::getServantProxy(const string& objectName,const string& setName)
{
    //initialize初始化了CommunicatorEpoll、AsyncProcThread
    Communicator::initialize();
    
    //返回封装好的ServantProxy
    return _servantProxyFactory->getServantProxy(objectName,setName);
}
//Communicator.cpp
void Communicator::initialize()
{
    …………
    //初始化CommunicatorEpoll
    for(size_t i = 0; i < _clientThreadNum; ++i)
    {
        _communicatorEpoll[i] = new CommunicatorEpoll(this, i);
        _communicatorEpoll[i]->start();
    }
    …………
}
  • AsyncProcThread的初始化:一个CommunicatorEpoll可以对应多个AsyncProcThread
//CommunicatorEpoll.cpp
CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq)
…………
{
    …………
    //创建异步线程
    //注意每个异步线程里都有一个异步队列ReqInfoQueue
    for(size_t i = 0; i < _asyncThreadNum; ++i)
    {
        _asyncThread[i] = new AsyncProcThread(iAsyncQueueCap);
        _asyncThread[i]->start();
    }

    //初始化请求的事件通知
    //_notify提供了一个人为控制epoll触发机制的入口
    for(size_t i = 0; i < MAX_CLIENT_NOTIFYEVENT_NUM; ++i)
    {
        _notify[i].bValid = false;
    }
    …………
}

在AsyncProcThread中,有一个无锁的异步队列ReqInfoQueue(TC_LoopQueue),CommunicatorEpoll线程执行AsyncProcThread::push_back操作,AsyncProcThread线程中的run方法执行pop_front操作,但队列本身是不加锁的。只有在一生产者一消费者场景中才适用这种无锁队列。

在TC_LoopQueue中,push_back操作的是_iBegin,pop_front操作的是_iEnd,因为只有一读(push_back)一写(pop_front)两个线程,两个线程操作的不是同一个变量,因此不会有竞态关系产生。

  • ObjectProxy的初始化
//ObjectProxyFactory.cpp
ObjectProxy * ObjectProxyFactory::getObjectProxy(const string& sObjectProxyName,const string& setName)
{
    …………
    ObjectProxy * pObjectProxy = new ObjectProxy(_communicatorEpoll, sObjectProxyName,setName);
    …………
    return pObjectProxy;
}

4.2 具体调用环节

4.2.1分配请求

图1
图1

如图1所示,如果一个ServantProxy有多个ObjectProxy,也就意味着它对应着多个CommunicatorEpoll1,正常情况下它会以轮询方式把多次请求放到不同的ObjectProxy中,见下面代码:

//ServantProxy.cpp
void ServantProxy::selectNetThreadInfo(ServantProxyThreadData * pSptd, ObjectProxy * & pObjProxy, ReqInfoQueue * & pReqQ)
{
    …………
        {
            //用线程的私有数据来保存选到的seq
            pObjProxy = *(_objectProxy + pSptd->_netSeq);
            pReqQ     = pSptd->_reqQueue[pSptd->_netSeq];
            pSptd->_netSeq++;

            if(pSptd->_netSeq == _objectProxyNum)
                pSptd->_netSeq = 0;
        }
    }
}

如果是同步请求,ServantProxy所在线程会同步阻塞,异步请求会直接结束。见下面代码:

//ServantProxy.cpp
void ServantProxy::invoke(ReqMessage * msg, bool bCoroAsync)
{
…………
    //通知CommunicatorEpoll
    pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

    //异步调用 另一个线程delele msg 如果是异步后面不能再用msg了

    if(bSync)
    {
        if(!msg->bCoroFlag)
        {
            if(!msg->bMonitorFin)
            {
                TC_ThreadLock::Lock lock(*(msg->pMonitor));

                //等待直到网络线程通知过来
                if(!msg->bMonitorFin)
                {
                    msg->pMonitor->wait();
                }
            }
        }
        else
        {
            msg->sched->yield(false);
        }
…………

    }
}

4.2.2 请求和结果调度

ServantProxy把msg消息通知到CommunicatorEpoll1中,利用epoll模型完成请求和结果的处理

图2
图2

(1)pObjectProxy->invoke里最终调用的AdapterProxy::invoke方法来完成请求发送,下面列出主要代码节点

int AdapterProxy::invoke(ReqMessage * msg)
{
    //生成requestid
    //tars调用 而且 不是单向调用
    if(!msg->bFromRpc)
    {
        msg->request.iRequestId = _objectProxy->generateId();
    }
    //对请求进行协议转化
    _objectProxy->getProxyProtocol().requestFunc(msg->request, msg->sReqData);

    //交给连接发送数据,连接连上,buffer不为空,直接发送数据成功
    //不管是否发送成功,都放到_timeoutQueue队列中
    if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData.c_str(),msg->sReqData.size()) != Transceiver::eRetError)
    {
        bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
        if(!bFlag)
        {
            TLOGERROR("[TARS][AdapterProxy::invoke fail1 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ",objname" <<_objectProxy->name() << ",desc" << _endpoint.desc() <<endl);
            msg->eStatus = ReqMessage::REQ_EXC;
            finishInvoke(msg);
        }
    }
    else
    {
        TLOGINFO("[TARS][AdapterProxy::invoke push (no send) " << _objectProxy->name() << ", " << _endpoint.desc() << ",id " << msg->request.iRequestId <<endl);

        //请求发送失败了
        bool bFlag = _timeoutQueue->push(msg,msg->request.iRequestId, msg->request.iTimeout+msg->iBeginTime, false);
        if(!bFlag)
        {
            TLOGERROR("[TARS][AdapterProxy::invoke fail2 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << "," <<_objectProxy->name() << ", " << _endpoint.desc() <<endl);
            
            msg->eStatus = ReqMessage::REQ_EXC;

            finishInvoke(msg);
        }
    }

    return 0;
}

(2)当有结果到达时,使用handleInputImp处理结果,handleInputImp调用了AdapterProxy::finishInvoke进行结果处理

void AdapterProxy::finishInvoke(ResponsePacket & rsp)
{
    …………
    else
    {
        //根据rsp.iRequestId从_timeoutQueue中取出对应请求的msg
        //这里的队列中的发送链表中的数据可能已经在timeout的时候删除了
        bool retErase = _timeoutQueue->erase(rsp.iRequestId, msg);

        //找不到此请求id信息
        if (!retErase)
        {
            if(_timeoutLogFlag)
            {
                TLOGERROR("[TARS][AdapterProxy::finishInvoke(ResponsePacket) objname:"<< _objectProxy->name() << ",get req-ptr NULL,may be timeout,id:" << rsp.iRequestId 
                    << ",desc:" << _endpoint.desc() << endl);
            }
            return ;
        }

        assert(msg->eStatus == ReqMessage::REQ_REQ);

        msg->eStatus = ReqMessage::REQ_RSP;
    }

    //将结果放到msg中
    msg->response = rsp;

    finishInvoke(msg);
}

从_timeoutQueue中找到msg后,使用finishInvoke(msg)方法进行通知,finishInvoke(msg)中包含了对同步和异步的处理

void AdapterProxy::finishInvoke(ReqMessage * msg)
{
    //同步调用,唤醒ServantProxy线程
    if(msg->eType == ReqMessage::SYNC_CALL)
    {
        if(!msg->bCoroFlag)
        {
            assert(msg->pMonitor);

            TC_ThreadLock::Lock sync(*(msg->pMonitor));
            msg->pMonitor->notify();
            msg->bMonitorFin = true;
        }
        else
        {
            msg->sched->put(msg->iCoroId);
        }

        return ;
    }

    //异步调用
    if(msg->eType == ReqMessage::ASYNC_CALL)
    {
           …………
            else
            {
                //异步回调,放入回调处理线程中
                _objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);
            }
        }
        else
        …………
        }
        return;
    }

    assert(false);

    return;
}

(3)handleOutputImp调用的是Transceiver::doRequest(),用来解决两个问题,一是Transceiver的发送缓存里如果还有内容,将继续发送出去。二是_timeoutQueue中如果有上次没有发送成功的请求,会在这里重新尝试发送一次,如果再发送失败就放弃

int Transceiver::doRequest()
{
    //buf不为空,先发生buffer的内容
    if(!_sendBuffer.IsEmpty())
    {
        size_t length = 0;
        void* data = NULL;
        _sendBuffer.PeekData(data, length);

        iRet = this->send(data, length, 0);

        //失败,直接返回
        if(iRet < 0)
        {
            return iRet;
        }

        if(iRet > 0)
        {
            _sendBuffer.Consume(iRet);
            if (_sendBuffer.IsEmpty())
                _sendBuffer.Shrink();
            else
                return 0;
        }
    }

    //取adapter里面积攒的数据
    _adapterProxy->doInvoke();

    //object里面应该是空的
    assert(_adapterProxy->getObjProxy()->timeoutQSize()  == 0);

    return 0;
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 4 设计实现篇
    • 4.1 准备阶段
      • 4.2 具体调用环节
        • 4.2.1分配请求
        • 4.2.2 请求和结果调度
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档