首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >鹅厂开源框架tars之网络层实现

鹅厂开源框架tars之网络层实现

原创
作者头像
皮皮猪头
发布2019-03-01 16:07:07
5.3K0
发布2019-03-01 16:07:07
举报
文章被收录于专栏:开源框架开源框架开源框架

tars开源框架地址:https://github.com/Tencent/Tars

系列文章:

鹅厂开源框架tars之日志服务

鹅厂开源框架tars之运营监控服务

鹅厂开源框架tars之基础组件

鹅厂开源框架tars之网络层实现

简介:Tars是腾讯从2008年到今天一直在使用的后台逻辑层的统一应用框架TAF(Total Application Framework),目前支持C++,Java,PHP,Nodejs语言。该框架为用户提供了涉及到开发、运维、以及测试的一整套解决方案,帮助一个产品或者服务快速开发、部署、测试、上线。 它集可扩展协议编解码、高性能RPC通信框架、名字路由与发现、发布监控、日志统计、配置管理等于一体,通过它可以快速用微服务的方式构建自己的稳定可靠的分布式应用,并实现完整有效的服务治理。目前该框架在腾讯内部,各大核心业务都在使用,颇受欢迎,基于该框架部署运行的服务节点规模达到上万个。

一、客户端同步调用远程服务部分:

使用过tars的人应该都能感受到RPC功能的方便性,tars调用远程服务的方法如下:

m_pRoleServantPrx = Application::getCommunicator()->stringToProxy<RoleServantPrx>(GAMECONFIG->sRoleServerObj);

这样一句代码就可以实现获取远程服务RoleServer的rpc接口了,接着接口调用远程服务RoleServer实现定义好的接口例如getRoleInfo();m_pRoleServantPrx->getRoleInfo(iUin, vData); 两行代码完成一个rpc调用。是我用过的最方便的rpc框架没有之一了

具体框架源码做了什么事情,通过下图然后后续展开来分析

如上图,可以看到首先调用application的获取Communicator通讯器(Communicator 通信器,用于创建和维护客户端proxy),本文主要描述整个核心的网络流程,对辅助类后续会有文章展开分析

直接进入Communicator的stringToProxy成员函数,该函数主要功能如下:

ServantProxy * pServantProxy = getServantProxy(objectName,setName);

根据传入的objname例如例子中的sRoleServerObj(HERMAN.GateServer.GateServantObj@tcp -h 101.37.70.*** -t 60000 -p 5052),获取一个Servant的代理类proxy(Servant的含义是:tars每一个应用下面可以有多个服务,一个服务下面可以有多个servant),表现形式就是一个servant负责处理一个ip和端口的服务,因此servant的官方文档描述如下:

1:远程对象的本地代理 2:同名servant在一个通信器中最多只有一个实例

要注意的一点就是在获取servant代理对象之前,客户端通讯器先调用了Communicator::initialize();

initialize干了这些事情:先new了一个_servantProxyFactory工厂类,然后根据服务配置的netthread网络线程数量,启动对应数量的CommunicatorEpoll(CommunicatorEpoll:用于处理epoll事件的线程类,线程定期run处理epoll的网络事件:handle((FDInfo*)data, ev.events)),CommunicatorEpoll构造的时候:注册了EPOLLIN事件,同时new了ObjectProxyFactory.

每个objectname在进程空间唯一有一个objectproxy 管理收发的消息队列

后续CommunicatorEpoll的run函数监听到EPOLLIN函数则处理来自网络接口的输入:handleInputImp处理收到的网络包,调用

遍历包链表,调用pTransceiver->getAdapterProxy()->finishInvoke(*it)函数,调用adapter代理类的成员函数:AdapterProxy::finishInvoke(ReqMessage * msg)如果是同步调用则唤醒ServantProxy线程。

至此,stringToProxy干的主要事情已经描述完毕,小结:就是启动epoll线程监听输入输出,处理输入并且根据同步或者异步方式进行不同的处理。那么还有问题,就是得到了远程服务的代理类之后,数据是怎么发出去的?

调用getRoleServantPrx()->reportRoleInfo(data);reportRoleInfo是远程服务的rpc接口名字。

这里看下一个最简单的例子:接口定义如下:

interface GateServant { int doRequest(string sRequest, out string sResponse); };

可以看下文件自动生成的同步调用源码:

tars::Int32 doRequest(const std::string & sRequest,std::string &sResponse,const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseCo ntext = NULL) { tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(sRequest, 1); _os.write(sResponse, 2); tars::ResponsePacket rep; std::map<string, string> _mStatus; tars_invoke(tars::TARSNORMAL,"doRequest", _os.getByteBuffer(), context, _mStatus, rep); if(pResponseContext) { *pResponseContext = rep.context; }

tars::TarsInputStream<tars::BufferReader> _is; _is.setBuffer(rep.sBuffer); tars::Int32 _ret; _is.read(_ret, 0, true); _is.read(sResponse, 2, true); return _ret; }

如上可以看到这里调用了tars_invoke接口,回到框架源码,可以看到ServantProxy的成员函数taf_invoke实现如下:

先new一个消息类:ReqMessagePtr req = new ReqMessage(_comm);然后把rpc调用的参数等上下文参数进行填充。

然后调用ServantProxy::invoke(ReqMessagePtr& req)注意如果是同步调用,这里new了一个ReqMonitor,用于同步调用时的条件变量,阻塞wait指定时间,等待对端返回。

接着调用_objectProxy->invoke(req);(函数声明ObjectProxy::invoke(ReqMessage * msg))

//选择一个远程服务的AdapterProxy 来调用(关于AdapterProxy 的解释:每个AdapterProxy 对应一个Endpoint,也就是一个服务IP:端口) AdapterProxy * pAdapterProxy = NULL; bool bFirst = _pEndpointManger->selectAdapterProxy(msg,pAdapterProxy);

注意:这里为啥要进行proxy的select,因为客户端rpc调用远程服务器,远程同名字的服务器可能部署有多个,于是需要按照事先设置好的调用规则来访问,例如默认的轮询规则,也可以按照配置好的权重等,所以AdapterProxy就是代理连接到远程某一个指定服务的代理类)

调用 pAdapterProxy->invoke(msg);

这里判断发送队列如果为空则直接_pTrans->sendRequest发送数据,否则调用_pTimeoutQueue->push把发送数据接入发送队列(否则则等待CommunicatorEpoll的线程函数run接收到epollout事件则遍历发送缓存队列send发送),至此调用远程接口的数据则发送到远程服务了。

根据文章前部分的描述,等待远程服务返回数据的时候,CommunicatorEpoll线程则会接收到epollin事件,然后启动接收数据处理函数。完成一个rpc调用的流程,当然这里面还包括了超时处理例如远程服务超载等细节,本文只理清核心流程,细节这里不做分析。

另外如上面所述CommunicatorEpoll类接受到epollout时间则会调用handleOutputImp的时候会调用

doRequest();函数把//取adapter里面积攒的数据 _pAdapterProxy->doInvoke();如下图

void CommunicatorEpoll::handleOutputImp(Transceiver * pTransceiver)

{ //检查连接是否有错误 if(pTransceiver->isConnecting()) { pTransceiver->setConnected(); } pTransceiver->doRequest(); }

小结:客户端rpc调用远程服务器的流程就是:Communicator的stringToProxy成员函数会根据传参的服务器ip和端口还有名称等信息,select到远程服务器的一个AdapterProxy,然后调用invoke函数通过Transceiver(网络传输类)发送信息,而rpc调用的返回则通过CommunicatorEpoll线程接受epollin接收来自远程的返回,然后CommunicatorEpoll接收epollout事件之后处理发送缓存队列。而CommunicatorEpoll线程是在什么时候启动的呢?是在Communicator的stringToProxy时候调用initialize函数,根据netthread数量启动对应数目的CommunicatorEpoll线程

二、客户端异步调用远程服务部分:

void async_doRequest(GateServantPrxCallbackPtr callback,const std::string &sRequest,const map<string, string>& context = TARS_CONTEXT()) { tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(sRequest, 1); std::map<string, string> _mStatus; tars_invoke_async(tars::TARSNORMAL,"doRequest", _os.getByteBuffer(), context, _mStatus, callback); }

异步调用最明显的不同在于多了一个callback,ServantProxyCallback异步回调对象的基类,其他发送部分类似于同步调用的流程,通过servantprxoy的invoke再调用objectprxoy的invoke写入发送缓冲队列。同样等待CommunicatorEpoll线程处理EPOLLIN事件,AdapterProxy::finishInvoke 异步调用。

//如果是本线程的回调,直接本线程处理:ReqMessagePtr msgPtr = msg;msg->callback->onDispatch(msgPtr);

//异步回调,放入回调处理线程中 _pObjectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);

异步线程AsyncProcThread::run()处理异步请求回来的响应包:调用msg->callback->onDispatch(msgPtr);

jce自动生成的代码:调用了callback_doRequest如下图从而实现回包自动回调

virtual int onDispatch(tars::ReqMessagePtr msg) { static ::std::string __GateServant_all[]= { "doRequest", "test" }; pair<string*, string*> r = equal_range(__GateServant_all, __GateServant_all+2, string(msg->request.sFuncName)); if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; switch(r.first - __GateServant_all) { case 0: { if (msg->response.iRet != tars::TARSSERVERSUCCESS) { callback_doRequest_exception(msg->response.iRet);

return msg->response.iRet; } tars::TarsInputStream<tars::BufferReader> _is;

_is.setBuffer(msg->response.sBuffer); tars::Int32 _ret; _is.read(_ret, 0, true);

std::string sResponse; _is.read(sResponse, 2, true); CallbackThreadData * pCbtd = CallbackThreadData::getData(); assert(pCbtd != NULL);

pCbtd->setResponseContext(msg->response.context);

callback_doRequest(_ret, sResponse);

pCbtd->delResponseContext();

return tars::TARSSERVERSUCCESS;

}

三、服务器启动:监听接收数据消息分发等

main函数调用 initializeServer //初始化Server部分 new出来TC_EpollServer 用于

* 注册协议解析器 * 注册逻辑处理器 * 注册管理端口处理器

并且设置TC_EpollServer 类的_netThreadNum网络线程个数:用于处理循环监听网路连接请求。如下:

_netThreads在初始化阶段是不启动的,只是先设置了参数,一般等到业务服务调用:waitForShutdown函数会启动所有netThreads线程:(_netThreads[i]->start();)

_netThreads具体做了什么事情如下:

void TC_EpollServer::NetThread::run()(需要检查该netthread管理的连接是否超时checkTimeout,如果超时的连接需要删除掉,然后开始调用epollwait等待epoll事件的发生,目前tars支持多个netthread线程,具体可以再配置文件配置 netthread参数,目前网络线程的配置数目不能大于15个)

const epoll_event &ev = _epoller.get(i);

case ET_LISTEN: (监听端口如果有请求则调用accept,进行ip地址检测并设置非阻塞、TcpNoDelay、CloseWait等参数,并增加连接数为该连接分配一个32位的唯一id 注还有册连接的epollin和epollout事件 备注:每个侦听端口都可以设置最大连接数和)

case ET_NET: //网络请求 processNet(ev);

函数读取网络数据 这里根据ev.data.u32的获取连接:Connection *cPtr = getConnectionPtr(uid);

然后调用NetThread::recvBuffer将数据缓存到_recvbuffer然后调用NetThread::Connection::parseProtocol

再将数据压入适配器的recv_queue _rbuffer;接收队列(后面会用业务线程处理接收队列,注意:这里会读配置文件每个Adapter的队列最大长度,超过接受队列长度的一半,需要进行overload处理)

创建好TC_EpollServer 接下来:

//绑定适配器对象和端口(支持多个) 回到Application::main函数 bindAdapter(adapters);该函数读取配置文件中配置的适配器创建TC_EpollServer::BindAdapterPtr,例如下图,配置了一个msdk适配器,endpoint字段配置了该适配器监听的IP和端口,protocol指定了是否使用taf协议,threads设置了处理收发包的网络线程数目

注意如果使用的taf协议则使用taf默认的协议处理函数,这里协议函数使用了loki库的封装机制,实现回调函数AppProtocol::parse的绑定,如果是非taf的协议,注意调用setProtocol设置自己的协议处理回调函数

if (bindAdapter->isTafProtocol()) { bindAdapter->setProtocol(AppProtocol::parse); }

接下来:调用_epollServer->bind(bindAdapter);调用NetThread::bind根据tcp或者udp创建socket;监听ip和端口

如果是tcp还会设置tcp参数:

s.setKeepAlive(); //心跳 s.setTcpNoDelay(); //禁用了Nagle算法 //不要设置close wait否则http服务回包主动关闭连接会有问题 s.setNoCloseWait();

//重新回到main函数:设置HandleGroup分组,启动线程

传入上图适配器配置文件设置的字段参数 threads代表线程个数 * 初始化处理线程,线程将会启动 */ template<typename T> void setHandle() { _pEpollServer->setHandleGroup<T>(_handleGroupName, _iHandleNum, this); }

//启动业务处理线程 _epollServer->startHandle();上一步设置的hds[i]->start();

_epollServer->createEpoll(); TC_Epoller是epoller操作类如果是TCP类,启动侦听端口:_epoller.add(it->first, H64(ET_LISTEN) | it->first, EPOLLIN);(注册id为1的ET_LISTEN事件,前面提到的netthread线程run会wait epoll的listen事件,处理新进的网络连接)

TC_EpollServer::Handle::run() 开始处理上面提到的netthread接收到的缓存队列

调用handleImp函数,这里遍历adapters所有适配器,adapter->waitForRecvQueue等待接收队列,如果队列中数据包超时了则调用handleTimeout,否则调用handle(stRecvData);进行消息包处理。

调用 ServantHandle::handle(const TC_EpollServer::tagRecvData &stRecvData)根据是否taf协议分别调用:handleTafProtocol(current);和handleNoTafProtocol(current)两个函数

如果是taf协议则继续调用sit->second->dispatch

这里要结合下jce定义rpc接口文件的生成代码,例如:

编译后自动生成代码 rpc调用 addVipGrowth接口,从而实现了rpc调用(从handleTafProtocol收到包,分发处理)

加入服务器监听端口处理完来自客户端的rpc调用之后,如果需要通过非taf协议发送给前台(例如:游戏网关收到来自逻辑服务器的下行push包,一般游戏服务器和客户端不采用rpc进行通讯,例如pb协议),那么需要调用current的sendResponse函数

cs->current->sendResponse(sMsgPack.c_str(), sMsgPack.length());触发 _pServantHandle->sendResponse(_uid, string(buff, len), _ip, _port);函数,触发 _pEpollServer->send(uid, sSendBuffer, ip, port);触发netThread->send(uid, s, ip, port);这里/

/通知epollserver的epoll响应, 有数据要发送 _epoller.mod(_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT); 上面的TC_EpollServer::NetThread::run线程函数则会侦听到

EPOLLOUT事件,触发NetThread::sendBuffer(cPtr);触发NetThread::Connection::send触发write(_sock.getfd(), (const void*)(sendBegin + pos), sendLen - pos);于是数据就发送出去了

如果是普通taf协议的返回,则触发函数

JceCurrent::sendResponse(int iRet, const vector<char>& buffer, const map<string, string>& status, const string & sResultDesc)

触发:

JceOutputStream<BufferWriter> os;

string s = "";

s.append((const char*)&iHeaderLen, sizeof(taf::Int32)); //taf协议加入包头:包的长度

s.append(os.getBuffer(), os.getLength()); //加入包体

_pServantHandle->sendResponse(_uid, s, _ip, _port, _fd);如同上面的_pServantHandle->sendResponse流程,发rpc调用的返回参数发送回客户端

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档