鹅厂开源框架tars之网络层实现

tars开源框架地址:https://github.com/Tencent/Tars

系列文章:

鹅厂开源框架tars之日志服务

鹅厂开源框架tars之运营监控服务

鹅厂开源框架tars之基础组件

鹅厂开源框架tars之网络层实现

简介:Tars是腾讯从2008年到今天一直在使用的后台逻辑层的统一应用框架TAF(Total Application Framework),目前支持C++,Java,PHP,Nodejs语言。该框架为用户提供了涉及到开发、运维、以及测试的一整套解决方案,帮助一个产品或者服务快速开发、部署、测试、上线。 它集可扩展协议编解码、高性能RPC通信框架、名字路由与发现、发布监控、日志统计、配置管理等于一体,通过它可以快速用微服务的方式构建自己的稳定可靠的分布式应用,并实现完整有效的服务治理。目前该框架在腾讯内部,各大核心业务都在使用,颇受欢迎,基于该框架部署运行的服务节点规模达到上万个。

一、客户端同步调用远程服务部分:

使用过tars的人应该都能感受到RPC功能的方便性,tars调用远程服务的方法如下:

m_pRoleServantPrx = Application::getCommunicator()->stringToProxy<RoleServantPrx>(GAMECONFIG->sRoleServerObj);

这样一句代码就可以实现获取远程服务RoleServer的rpc接口了,接着接口调用远程服务RoleServer实现定义好的接口例如getRoleInfo();m_pRoleServantPrx->getRoleInfo(iUin, vData); 两行代码完成一个rpc调用。是我用过的最方便的rpc框架没有之一了

具体框架源码做了什么事情,通过下图然后后续展开来分析

如上图,可以看到首先调用application的获取Communicator通讯器(Communicator 通信器,用于创建和维护客户端proxy),本文主要描述整个核心的网络流程,对辅助类后续会有文章展开分析

直接进入Communicator的stringToProxy成员函数,该函数主要功能如下:

ServantProxy * pServantProxy = getServantProxy(objectName,setName);

根据传入的objname例如例子中的sRoleServerObj(HERMAN.GateServer.GateServantObj@tcp -h 101.37.70.*** -t 60000 -p 5052),获取一个Servant的代理类proxy(Servant的含义是:tars每一个应用下面可以有多个服务,一个服务下面可以有多个servant),表现形式就是一个servant负责处理一个ip和端口的服务,因此servant的官方文档描述如下:

1:远程对象的本地代理 2:同名servant在一个通信器中最多只有一个实例

要注意的一点就是在获取servant代理对象之前,客户端通讯器先调用了Communicator::initialize();

initialize干了这些事情:先new了一个_servantProxyFactory工厂类,然后根据服务配置的netthread网络线程数量,启动对应数量的CommunicatorEpoll(CommunicatorEpoll:用于处理epoll事件的线程类,线程定期run处理epoll的网络事件:handle((FDInfo*)data, ev.events)),CommunicatorEpoll构造的时候:注册了EPOLLIN事件,同时new了ObjectProxyFactory.

每个objectname在进程空间唯一有一个objectproxy 管理收发的消息队列

后续CommunicatorEpoll的run函数监听到EPOLLIN函数则处理来自网络接口的输入:handleInputImp处理收到的网络包,调用

遍历包链表,调用pTransceiver->getAdapterProxy()->finishInvoke(*it)函数,调用adapter代理类的成员函数:AdapterProxy::finishInvoke(ReqMessage * msg)如果是同步调用则唤醒ServantProxy线程。

至此,stringToProxy干的主要事情已经描述完毕,小结:就是启动epoll线程监听输入输出,处理输入并且根据同步或者异步方式进行不同的处理。那么还有问题,就是得到了远程服务的代理类之后,数据是怎么发出去的?

调用getRoleServantPrx()->reportRoleInfo(data);reportRoleInfo是远程服务的rpc接口名字。

这里看下一个最简单的例子:接口定义如下:

interface GateServant { int doRequest(string sRequest, out string sResponse); };

可以看下文件自动生成的同步调用源码:

tars::Int32 doRequest(const std::string & sRequest,std::string &sResponse,const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseCo ntext = NULL) { tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(sRequest, 1); _os.write(sResponse, 2); tars::ResponsePacket rep; std::map<string, string> _mStatus; tars_invoke(tars::TARSNORMAL,"doRequest", _os.getByteBuffer(), context, _mStatus, rep); if(pResponseContext) { *pResponseContext = rep.context; }

tars::TarsInputStream<tars::BufferReader> _is; _is.setBuffer(rep.sBuffer); tars::Int32 _ret; _is.read(_ret, 0, true); _is.read(sResponse, 2, true); return _ret; }

如上可以看到这里调用了tars_invoke接口,回到框架源码,可以看到ServantProxy的成员函数taf_invoke实现如下:

先new一个消息类:ReqMessagePtr req = new ReqMessage(_comm);然后把rpc调用的参数等上下文参数进行填充。

然后调用ServantProxy::invoke(ReqMessagePtr& req)注意如果是同步调用,这里new了一个ReqMonitor,用于同步调用时的条件变量,阻塞wait指定时间,等待对端返回。

接着调用_objectProxy->invoke(req);(函数声明ObjectProxy::invoke(ReqMessage * msg))

//选择一个远程服务的AdapterProxy 来调用(关于AdapterProxy 的解释:每个AdapterProxy 对应一个Endpoint,也就是一个服务IP:端口) AdapterProxy * pAdapterProxy = NULL; bool bFirst = _pEndpointManger->selectAdapterProxy(msg,pAdapterProxy);

注意:这里为啥要进行proxy的select,因为客户端rpc调用远程服务器,远程同名字的服务器可能部署有多个,于是需要按照事先设置好的调用规则来访问,例如默认的轮询规则,也可以按照配置好的权重等,所以AdapterProxy就是代理连接到远程某一个指定服务的代理类)

调用 pAdapterProxy->invoke(msg);

这里判断发送队列如果为空则直接_pTrans->sendRequest发送数据,否则调用_pTimeoutQueue->push把发送数据接入发送队列(否则则等待CommunicatorEpoll的线程函数run接收到epollout事件则遍历发送缓存队列send发送),至此调用远程接口的数据则发送到远程服务了。

根据文章前部分的描述,等待远程服务返回数据的时候,CommunicatorEpoll线程则会接收到epollin事件,然后启动接收数据处理函数。完成一个rpc调用的流程,当然这里面还包括了超时处理例如远程服务超载等细节,本文只理清核心流程,细节这里不做分析。

另外如上面所述CommunicatorEpoll类接受到epollout时间则会调用handleOutputImp的时候会调用

doRequest();函数把//取adapter里面积攒的数据 _pAdapterProxy->doInvoke();如下图

void CommunicatorEpoll::handleOutputImp(Transceiver * pTransceiver)

{ //检查连接是否有错误 if(pTransceiver->isConnecting()) { pTransceiver->setConnected(); } pTransceiver->doRequest(); }

小结:客户端rpc调用远程服务器的流程就是:Communicator的stringToProxy成员函数会根据传参的服务器ip和端口还有名称等信息,select到远程服务器的一个AdapterProxy,然后调用invoke函数通过Transceiver(网络传输类)发送信息,而rpc调用的返回则通过CommunicatorEpoll线程接受epollin接收来自远程的返回,然后CommunicatorEpoll接收epollout事件之后处理发送缓存队列。而CommunicatorEpoll线程是在什么时候启动的呢?是在Communicator的stringToProxy时候调用initialize函数,根据netthread数量启动对应数目的CommunicatorEpoll线程

二、客户端异步调用远程服务部分:

void async_doRequest(GateServantPrxCallbackPtr callback,const std::string &sRequest,const map<string, string>& context = TARS_CONTEXT()) { tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(sRequest, 1); std::map<string, string> _mStatus; tars_invoke_async(tars::TARSNORMAL,"doRequest", _os.getByteBuffer(), context, _mStatus, callback); }

异步调用最明显的不同在于多了一个callback,ServantProxyCallback异步回调对象的基类,其他发送部分类似于同步调用的流程,通过servantprxoy的invoke再调用objectprxoy的invoke写入发送缓冲队列。同样等待CommunicatorEpoll线程处理EPOLLIN事件,AdapterProxy::finishInvoke 异步调用。

//如果是本线程的回调,直接本线程处理:ReqMessagePtr msgPtr = msg;msg->callback->onDispatch(msgPtr);

//异步回调,放入回调处理线程中 _pObjectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);

异步线程AsyncProcThread::run()处理异步请求回来的响应包:调用msg->callback->onDispatch(msgPtr);

jce自动生成的代码:调用了callback_doRequest如下图从而实现回包自动回调

virtual int onDispatch(tars::ReqMessagePtr msg) { static ::std::string __GateServant_all[]= { "doRequest", "test" }; pair<string*, string*> r = equal_range(__GateServant_all, __GateServant_all+2, string(msg->request.sFuncName)); if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; switch(r.first - __GateServant_all) { case 0: { if (msg->response.iRet != tars::TARSSERVERSUCCESS) { callback_doRequest_exception(msg->response.iRet);

return msg->response.iRet; } tars::TarsInputStream<tars::BufferReader> _is;

_is.setBuffer(msg->response.sBuffer); tars::Int32 _ret; _is.read(_ret, 0, true);

std::string sResponse; _is.read(sResponse, 2, true); CallbackThreadData * pCbtd = CallbackThreadData::getData(); assert(pCbtd != NULL);

pCbtd->setResponseContext(msg->response.context);

callback_doRequest(_ret, sResponse);

pCbtd->delResponseContext();

return tars::TARSSERVERSUCCESS;

}

三、服务器启动:监听接收数据消息分发等

main函数调用 initializeServer //初始化Server部分 new出来TC_EpollServer 用于

* 注册协议解析器 * 注册逻辑处理器 * 注册管理端口处理器

并且设置TC_EpollServer 类的_netThreadNum网络线程个数:用于处理循环监听网路连接请求。如下:

_netThreads在初始化阶段是不启动的,只是先设置了参数,一般等到业务服务调用:waitForShutdown函数会启动所有netThreads线程:(_netThreads[i]->start();)

_netThreads具体做了什么事情如下:

void TC_EpollServer::NetThread::run()(需要检查该netthread管理的连接是否超时checkTimeout,如果超时的连接需要删除掉,然后开始调用epollwait等待epoll事件的发生,目前tars支持多个netthread线程,具体可以再配置文件配置 netthread参数,目前网络线程的配置数目不能大于15个)

const epoll_event &ev = _epoller.get(i);

case ET_LISTEN: (监听端口如果有请求则调用accept,进行ip地址检测并设置非阻塞、TcpNoDelay、CloseWait等参数,并增加连接数为该连接分配一个32位的唯一id 注还有册连接的epollin和epollout事件 备注:每个侦听端口都可以设置最大连接数和)

case ET_NET: //网络请求 processNet(ev);

函数读取网络数据 这里根据ev.data.u32的获取连接:Connection *cPtr = getConnectionPtr(uid);

然后调用NetThread::recvBuffer将数据缓存到_recvbuffer然后调用NetThread::Connection::parseProtocol

再将数据压入适配器的recv_queue _rbuffer;接收队列(后面会用业务线程处理接收队列,注意:这里会读配置文件每个Adapter的队列最大长度,超过接受队列长度的一半,需要进行overload处理)

创建好TC_EpollServer 接下来:

//绑定适配器对象和端口(支持多个) 回到Application::main函数 bindAdapter(adapters);该函数读取配置文件中配置的适配器创建TC_EpollServer::BindAdapterPtr,例如下图,配置了一个msdk适配器,endpoint字段配置了该适配器监听的IP和端口,protocol指定了是否使用taf协议,threads设置了处理收发包的网络线程数目

注意如果使用的taf协议则使用taf默认的协议处理函数,这里协议函数使用了loki库的封装机制,实现回调函数AppProtocol::parse的绑定,如果是非taf的协议,注意调用setProtocol设置自己的协议处理回调函数

if (bindAdapter->isTafProtocol()) { bindAdapter->setProtocol(AppProtocol::parse); }

接下来:调用_epollServer->bind(bindAdapter);调用NetThread::bind根据tcp或者udp创建socket;监听ip和端口

如果是tcp还会设置tcp参数:

s.setKeepAlive(); //心跳 s.setTcpNoDelay(); //禁用了Nagle算法 //不要设置close wait否则http服务回包主动关闭连接会有问题 s.setNoCloseWait();

//重新回到main函数:设置HandleGroup分组,启动线程

传入上图适配器配置文件设置的字段参数 threads代表线程个数 * 初始化处理线程,线程将会启动 */ template<typename T> void setHandle() { _pEpollServer->setHandleGroup<T>(_handleGroupName, _iHandleNum, this); }

//启动业务处理线程 _epollServer->startHandle();上一步设置的hds[i]->start();

_epollServer->createEpoll(); TC_Epoller是epoller操作类如果是TCP类,启动侦听端口:_epoller.add(it->first, H64(ET_LISTEN) | it->first, EPOLLIN);(注册id为1的ET_LISTEN事件,前面提到的netthread线程run会wait epoll的listen事件,处理新进的网络连接)

TC_EpollServer::Handle::run() 开始处理上面提到的netthread接收到的缓存队列

调用handleImp函数,这里遍历adapters所有适配器,adapter->waitForRecvQueue等待接收队列,如果队列中数据包超时了则调用handleTimeout,否则调用handle(stRecvData);进行消息包处理。

调用 ServantHandle::handle(const TC_EpollServer::tagRecvData &stRecvData)根据是否taf协议分别调用:handleTafProtocol(current);和handleNoTafProtocol(current)两个函数

如果是taf协议则继续调用sit->second->dispatch

这里要结合下jce定义rpc接口文件的生成代码,例如:

编译后自动生成代码 rpc调用 addVipGrowth接口,从而实现了rpc调用(从handleTafProtocol收到包,分发处理)

加入服务器监听端口处理完来自客户端的rpc调用之后,如果需要通过非taf协议发送给前台(例如:游戏网关收到来自逻辑服务器的下行push包,一般游戏服务器和客户端不采用rpc进行通讯,例如pb协议),那么需要调用current的sendResponse函数

cs->current->sendResponse(sMsgPack.c_str(), sMsgPack.length());触发 _pServantHandle->sendResponse(_uid, string(buff, len), _ip, _port);函数,触发 _pEpollServer->send(uid, sSendBuffer, ip, port);触发netThread->send(uid, s, ip, port);这里/

/通知epollserver的epoll响应, 有数据要发送 _epoller.mod(_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT); 上面的TC_EpollServer::NetThread::run线程函数则会侦听到

EPOLLOUT事件,触发NetThread::sendBuffer(cPtr);触发NetThread::Connection::send触发write(_sock.getfd(), (const void*)(sendBegin + pos), sendLen - pos);于是数据就发送出去了

如果是普通taf协议的返回,则触发函数

JceCurrent::sendResponse(int iRet, const vector<char>& buffer, const map<string, string>& status, const string & sResultDesc)

触发:

JceOutputStream<BufferWriter> os;

string s = "";

s.append((const char*)&iHeaderLen, sizeof(taf::Int32)); //taf协议加入包头:包的长度

s.append(os.getBuffer(), os.getLength()); //加入包体

_pServantHandle->sendResponse(_uid, s, _ip, _port, _fd);如同上面的_pServantHandle->sendResponse流程,发rpc调用的返回参数发送回客户端

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

开源框架

1 篇文章2 人订阅

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券