Tars-cpp源码走读——网络模块初始化和网络连接处理

本文主要介绍Tars源码中,服务端如何开启网络接口监听,如何接收请求,转发请求(给业务处理模块), 其中涉及了Tars网络线程模型和IO复用模型。本文是我个人的笔记和总结,可能会有错误的地方,希望大家指正。网络线程

Tars-cpp服务端采用多线程模型。最多允许启动15个网络线程。可在配置文件中配置(/tars/application/server<netthread>),默认网络线程数是1.

## 网络线程

NetThread

网络线程使用class NetThread管理,每个NetThread对应一个网络线程。

class TC_EpollServer 保存一个NetThread的vector。

std::vector<NetThread*> _netThreads;

Server初始化时会构造TC_EpollServer对象,_epollServer = new (iNetThreadNum);; 在TC_EpollServer构造函数中会构造NetThread对象。

TC_EpollServer::TC_EpollServer(unsigned int iNetThreadNum)
{
    ......
    for (size_t i = 0; i < _netThreadNum; ++i)
    {
        TC_EpollServer::NetThread* netThreads = new TC_EpollServer::NetThread(this);
        _netThreads.push_back(netThreads);
    }
}

创建NetThread对象,并push到netThreads中。

创建、管理网络线程

有了NetThread对象,可以为每一个NetThread对象创建对应的线程。

Application::waitForQuit(),遍历了每一个Netthread对象,调用每个NetThread对象的start()函数,

void Application::waitForQuit()
{
    ......
    for (size_t i = 0; i < iNetThreadNum; ++i)
    {
        vNetThread[i]->start();
    }
    ......
}

TC_Thread::start() 中,创建线程。

    int ret = pthread_create(&_tid,
                   0,
                   (void *(*)(void *))&threadEntry,
                   (void *)this);

开启网络监听

tar网络服务跟一般的网络服务端程序一样需要执行以下步骤:

socket()-->bind()-->listen()-->accept() -->read()

tars-cpp 开启网络监听过程如下:

调用位置是

Application::main()
==> Application::bindAdapter()         //
==>TC_EpollServer::bind(TC_EpollServer::BindAdapterPtr &lsPtr);
==>NetThread::bind(BindAdapterPtr &lsPtr);         // 遍历每一个NetThread对象的bind
==>NetThread::bind(const TC_Endpoint &ep, TC_Socket &s)

NetThread::bind(const TC_Endpoint &ep, TC_Socket &s)中,

通过以下三个调用执行完成网路编程的socket()、bind()、和listen())

s.createSocket(SOCK_STREAM, type);
s.bind(ep.getHost(), ep.getPort());
s.listen(1024);

分别对应

TC_Socket::createSocket(int iSocketType, int iDomain)
TC_Socket::bind(const string &sServerAddr, int port)
TC_Socket::listen(int iConnBackLog)

至此,每一个NetThread对象绑定的线程都开启了网络监听。

IO复用模型

Tars-cpp使用epoll作为底层IO复用模块。

三个epoll函数的调用位置

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

epoll_create

Application::main()
TC_EpollServer::createEpoll(); // _epollServer->createEpoll(); 触发
TC_EpollServer::NetThread::createEpoll(uint32_t iIndex); // _netThreads[i]->createEpoll(i+1); 触发
TC_Epoller::create(int max_connections) // 由_epoller.create(10240);触发
epoll_create()

epoll_ctl

Application::main()
TC_EpollServer::createEpoll(); // _epollServer->createEpoll(); 触发
TC_EpollServer::NetThread::createEpoll(uint32_t iIndex); // _netThreads[i]->createEpoll(i+1); 触发
TC_Epoller::add(...) // _epoller.add()
ctrl(fd, data, event, EPOLL_CTL_ADD);
epoll_ctl(_iEpollfd, op, fd, &ev);

epoll_wait

Application::waitForShutdown()
waitForQuit();
TC_Thread::start();   // 调用vNetThread[i]->start();
pthread_create(&_tid, 0, (void *(*)(void *))&threadEntry, (void *)this);
TC_Thread::threadEntry(TC_Thread *pThread); // pthread_create的执行例程
NetThread::run()  // 调用pThread->run();触发, 使用了多态
TC_Epoller::wait(int millsecond) //  _epoller.wait(2000);
epoll_wait()

TC_Epoller

前面介绍了三个epoll函数的调用过程。实际上,Tars-cpp使用TC_Epoller类封装了epoll的调用细节,在tars-cpp的server实现上只需要关注TC_Epoller的接口即可。

Tars-cpp的IO复用相关的编程,基于TC_Epoller。比如要遍历每个NetThread执行epoll_wait(),实际上是遍历调用TC_Epoller::wait(int millsecond)。

因此,了解TC_Epoller类,有助于更好理解Tars-cpp的IO复用的实现。

IO复用工作过程

void TC_EpollServer::NetThread::run() 可以认为是每个网络线程的执行例程。

NetThread::run()中,会有一个while训练,来调用TC_Epoller::wait(int millsecond)

void TC_EpollServer::NetThread::run()
{
    //循环监听网路连接请求
    while(!_bTerminate)
    {
        _list.checkTimeout(TNOW);

        int iEvNum = _epoller.wait(2000);

        for(int i = 0; i < iEvNum; ++i)
        {
            try
            {
                const epoll_event &ev = _epoller.get(i);

                uint32_t h = ev.data.u64 >> 32;

                switch(h)
                {
                case ET_LISTEN:
                    {
                        //监听端口有请求
                        auto it = _listeners.find(ev.data.u32);
                        if( it != _listeners.end())
                        {
                            if(ev.events & EPOLLIN)
                            {
                                bool ret;
                                do
                                {
                                    ret = accept(ev.data.u32, it->second->_ep.isIPv6() ? AF_INET6 : AF_INET);
                                }while(ret);
                            }
                        }
                    }
                    break;
                case ET_CLOSE:
                    //关闭请求
                    break;
                case ET_NOTIFY:
                    //发送通知
                    processPipe();
                    break;
                case ET_NET:
                    //网络请求
                    processNet(ev);
                    break;
                default:
                    assert(true);
                }
            }
            catch(exception &ex)
            {
                error("run exception:" + string(ex.what()));
            }
        }
    }
}

每当有事件到达或者超时时,_epoller.wait(2000);返回。

然后通过switch(h),判断事件类型。

通常,我们是直接使用switch(描述符) 来区分是哪个描述符上的事件。但是tars这里直接对描述符进行了分类,从而只判断是哪个类别的描述符时间。其关键点在于:uint32_t h = ev.data.u64 >> 32;。通过ev.data.u64左移32bit来区分事件类型。那么ev.data.u64保存什么东西?

ev.data.u64在TC_Epoller::ctrl 处赋值,

void TC_Epoller::ctrl(int fd, long long data, __uint32_t events, int op)
{
    struct epoll_event ev;
    ev.data.u64 = data;
    ......
    epoll_ctl(_iEpollfd, op, fd, &ev);
}

参数data的最初来源是:

void TC_EpollServer::NetThread::createEpoll(uint32_t iIndex)
{
        ... 
        //监听socket
        for (const auto& kv : _listeners)
        {
            if(kv.second->getEndpoint().isTcp())
            {
                ...
                _epoller.add(kv.first, H64(ET_LISTEN) | kv.first, EPOLLIN);
            }
            ...
        }
    ......
}

来自_epoller.add(kv.first, H64(ET_LISTEN) | kv.first, EPOLLIN);

这里的kv.first是监听的描述符(32bit), H64的定义如下

 #define H64(x) (((uint64_t)x) << 32)

可以见, ev.data.u64 是事件类型和事件描述符的组合。既存储了监听描述符,有存储了事件类型。

ET_LISTEN,表示有客户端发起连接。

监听到ET_LISTEN时,调用NetThread::accept(int fd, int domain)处理连接请求

处理连接请求

当有客户端想Tars-cpp server发起连接请求时,会触发NetThread::accept(int fd, int domain)函数;该函数会做一些基本的检查(如现有连接数是否已经达到最大值了)以及连接建立前的初始化工作。一切OK后,会创建新的连接。

bool TC_EpollServer::NetThread::accept(int fd, int domain)
{
    ......
    int iRetCode = s.accept(cs, stSockAddr, iSockAddrSize);
    ......
    Connection *cPtr = new Connection(_listeners[fd].get(), fd, (timeout < 2 ? 2 : timeout), cs.getfd(), ip, port);
    ......
        _epollServer->addConnection(cPtr, cs.getfd(), TCP_CONNECTION);
    ......
}

NetThread::addUdpConnection(TC_EpollServer::NetThread::Connection *cPtr)会添加对新建连接的监听。

虽然这里没有对ev.data.u64的前面32bit赋值(前32bit被置为0),但是0依然有意义的。因为在NetThread::run()中,case ET_NET:类型对应的值就是0.

        //定义事件类型
        enum
        {
            ET_LISTEN = 1,
            ET_CLOSE = 2,
            ET_NOTIFY = 3,
            ET_NET = 0,
        };

因此,与客户端的网络通信最终是在**case ET_NET:**中处理。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券