本文主要介绍Tars源码中,服务端如何开启网络接口监听,如何接收请求,转发请求(给业务处理模块), 其中涉及了Tars网络线程模型和IO复用模型。本文是我个人的笔记和总结,可能会有错误的地方,希望大家指正。网络线程
Tars-cpp服务端采用多线程模型。最多允许启动15个网络线程。可在配置文件中配置(/tars/application/server<netthread>
),默认网络线程数是1.
## 网络线程
网络线程使用class NetThread
管理,每个NetThread对应一个网络线程。
class TC_EpollServer
保存一个NetThread
的vector。
std::vector<NetThread*> _netThreads;
Server初始化时会构造TC_EpollServer对象,_epollServer = new (iNetThreadNum);
; 在TC_EpollServer构造函数中会构造NetThread
对象。
TC_EpollServer::TC_EpollServer(unsigned int iNetThreadNum)
{
......
for (size_t i = 0; i < _netThreadNum; ++i)
{
TC_EpollServer::NetThread* netThreads = new TC_EpollServer::NetThread(this);
_netThreads.push_back(netThreads);
}
}
创建NetThread对象,并push到netThreads中。
有了NetThread对象,可以为每一个NetThread对象创建对应的线程。
在Application::waitForQuit()
,遍历了每一个Netthread对象,调用每个NetThread对象的start()函数,
void Application::waitForQuit()
{
......
for (size_t i = 0; i < iNetThreadNum; ++i)
{
vNetThread[i]->start();
}
......
}
在TC_Thread::start()
中,创建线程。
int ret = pthread_create(&_tid,
0,
(void *(*)(void *))&threadEntry,
(void *)this);
tar网络服务跟一般的网络服务端程序一样需要执行以下步骤:
socket()-->bind()-->listen()-->accept() -->read()
tars-cpp 开启网络监听过程如下:
调用位置是
Application::main()
==> Application::bindAdapter() //
==>TC_EpollServer::bind(TC_EpollServer::BindAdapterPtr &lsPtr);
==>NetThread::bind(BindAdapterPtr &lsPtr); // 遍历每一个NetThread对象的bind
==>NetThread::bind(const TC_Endpoint &ep, TC_Socket &s)
在NetThread::bind(const TC_Endpoint &ep, TC_Socket &s)
中,
通过以下三个调用执行完成网路编程的socket()、bind()、和listen())
s.createSocket(SOCK_STREAM, type);
s.bind(ep.getHost(), ep.getPort());
s.listen(1024);
分别对应
TC_Socket::createSocket(int iSocketType, int iDomain)
TC_Socket::bind(const string &sServerAddr, int port)
TC_Socket::listen(int iConnBackLog)
至此,每一个NetThread对象绑定的线程都开启了网络监听。
Tars-cpp使用epoll作为底层IO复用模块。
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
Application::main()
TC_EpollServer::createEpoll(); // _epollServer->createEpoll(); 触发
TC_EpollServer::NetThread::createEpoll(uint32_t iIndex); // _netThreads[i]->createEpoll(i+1); 触发
TC_Epoller::create(int max_connections) // 由_epoller.create(10240);触发
epoll_create()
Application::main()
TC_EpollServer::createEpoll(); // _epollServer->createEpoll(); 触发
TC_EpollServer::NetThread::createEpoll(uint32_t iIndex); // _netThreads[i]->createEpoll(i+1); 触发
TC_Epoller::add(...) // _epoller.add()
ctrl(fd, data, event, EPOLL_CTL_ADD);
epoll_ctl(_iEpollfd, op, fd, &ev);
Application::waitForShutdown()
waitForQuit();
TC_Thread::start(); // 调用vNetThread[i]->start();
pthread_create(&_tid, 0, (void *(*)(void *))&threadEntry, (void *)this);
TC_Thread::threadEntry(TC_Thread *pThread); // pthread_create的执行例程
NetThread::run() // 调用pThread->run();触发, 使用了多态
TC_Epoller::wait(int millsecond) // _epoller.wait(2000);
epoll_wait()
前面介绍了三个epoll函数的调用过程。实际上,Tars-cpp使用TC_Epoller类封装了epoll的调用细节,在tars-cpp的server实现上只需要关注TC_Epoller的接口即可。
Tars-cpp的IO复用相关的编程,基于TC_Epoller。比如要遍历每个NetThread执行epoll_wait(),实际上是遍历调用TC_Epoller::wait(int millsecond)。
因此,了解TC_Epoller类,有助于更好理解Tars-cpp的IO复用的实现。
void TC_EpollServer::NetThread::run()
可以认为是每个网络线程的执行例程。
在NetThread::run()
中,会有一个while训练,来调用TC_Epoller::wait(int millsecond)
void TC_EpollServer::NetThread::run()
{
//循环监听网路连接请求
while(!_bTerminate)
{
_list.checkTimeout(TNOW);
int iEvNum = _epoller.wait(2000);
for(int i = 0; i < iEvNum; ++i)
{
try
{
const epoll_event &ev = _epoller.get(i);
uint32_t h = ev.data.u64 >> 32;
switch(h)
{
case ET_LISTEN:
{
//监听端口有请求
auto it = _listeners.find(ev.data.u32);
if( it != _listeners.end())
{
if(ev.events & EPOLLIN)
{
bool ret;
do
{
ret = accept(ev.data.u32, it->second->_ep.isIPv6() ? AF_INET6 : AF_INET);
}while(ret);
}
}
}
break;
case ET_CLOSE:
//关闭请求
break;
case ET_NOTIFY:
//发送通知
processPipe();
break;
case ET_NET:
//网络请求
processNet(ev);
break;
default:
assert(true);
}
}
catch(exception &ex)
{
error("run exception:" + string(ex.what()));
}
}
}
}
每当有事件到达或者超时时,_epoller.wait(2000);
返回。
然后通过switch(h)
,判断事件类型。
通常,我们是直接使用switch(描述符)
来区分是哪个描述符上的事件。但是tars这里直接对描述符进行了分类,从而只判断是哪个类别的描述符时间。其关键点在于:uint32_t h = ev.data.u64 >> 32;
。通过ev.data.u64左移32bit来区分事件类型。那么ev.data.u64
保存什么东西?
ev.data.u64在TC_Epoller::ctrl
处赋值,
void TC_Epoller::ctrl(int fd, long long data, __uint32_t events, int op)
{
struct epoll_event ev;
ev.data.u64 = data;
......
epoll_ctl(_iEpollfd, op, fd, &ev);
}
参数data的最初来源是:
void TC_EpollServer::NetThread::createEpoll(uint32_t iIndex)
{
...
//监听socket
for (const auto& kv : _listeners)
{
if(kv.second->getEndpoint().isTcp())
{
...
_epoller.add(kv.first, H64(ET_LISTEN) | kv.first, EPOLLIN);
}
...
}
......
}
来自_epoller.add(kv.first, H64(ET_LISTEN) | kv.first, EPOLLIN);
,
这里的kv.first是监听的描述符(32bit), H64的定义如下
#define H64(x) (((uint64_t)x) << 32)
可以见, ev.data.u64 是事件类型和事件描述符的组合。既存储了监听描述符,有存储了事件类型。
ET_LISTEN
,表示有客户端发起连接。
监听到ET_LISTEN
时,调用NetThread::accept(int fd, int domain)
处理连接请求
当有客户端想Tars-cpp server发起连接请求时,会触发NetThread::accept(int fd, int domain)
函数;该函数会做一些基本的检查(如现有连接数是否已经达到最大值了)以及连接建立前的初始化工作。一切OK后,会创建新的连接。
bool TC_EpollServer::NetThread::accept(int fd, int domain)
{
......
int iRetCode = s.accept(cs, stSockAddr, iSockAddrSize);
......
Connection *cPtr = new Connection(_listeners[fd].get(), fd, (timeout < 2 ? 2 : timeout), cs.getfd(), ip, port);
......
_epollServer->addConnection(cPtr, cs.getfd(), TCP_CONNECTION);
......
}
在NetThread::addUdpConnection(TC_EpollServer::NetThread::Connection *cPtr)
会添加对新建连接的监听。
虽然这里没有对ev.data.u64的前面32bit赋值(前32bit被置为0),但是0依然有意义的。因为在NetThread::run()
中,case ET_NET:
类型对应的值就是0.
//定义事件类型
enum
{
ET_LISTEN = 1,
ET_CLOSE = 2,
ET_NOTIFY = 3,
ET_NET = 0,
};
因此,与客户端的网络通信最终是在**case ET_NET:
**中处理。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。