前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Tars-cpp源码走读——网络模块初始化和网络连接处理

Tars-cpp源码走读——网络模块初始化和网络连接处理

原创
作者头像
windealli
修改2018-12-24 20:46:36
1.3K0
修改2018-12-24 20:46:36
举报
文章被收录于专栏:windealliwindealli

本文主要介绍Tars源码中,服务端如何开启网络接口监听,如何接收请求,转发请求(给业务处理模块), 其中涉及了Tars网络线程模型和IO复用模型。本文是我个人的笔记和总结,可能会有错误的地方,希望大家指正。网络线程

Tars-cpp服务端采用多线程模型。最多允许启动15个网络线程。可在配置文件中配置(/tars/application/server<netthread>),默认网络线程数是1.

## 网络线程

NetThread

网络线程使用class NetThread管理,每个NetThread对应一个网络线程。

class TC_EpollServer 保存一个NetThread的vector。

代码语言:txt
复制
std::vector<NetThread*> _netThreads;

Server初始化时会构造TC_EpollServer对象,_epollServer = new (iNetThreadNum);; 在TC_EpollServer构造函数中会构造NetThread对象。

代码语言:txt
复制
TC_EpollServer::TC_EpollServer(unsigned int iNetThreadNum)
{
    ......
    for (size_t i = 0; i < _netThreadNum; ++i)
    {
        TC_EpollServer::NetThread* netThreads = new TC_EpollServer::NetThread(this);
        _netThreads.push_back(netThreads);
    }
}

创建NetThread对象,并push到netThreads中。

创建、管理网络线程

有了NetThread对象,可以为每一个NetThread对象创建对应的线程。

Application::waitForQuit(),遍历了每一个Netthread对象,调用每个NetThread对象的start()函数,

代码语言:txt
复制
void Application::waitForQuit()
{
    ......
    for (size_t i = 0; i < iNetThreadNum; ++i)
    {
        vNetThread[i]->start();
    }
    ......
}

TC_Thread::start() 中,创建线程。

代码语言:txt
复制
    int ret = pthread_create(&_tid,
                   0,
                   (void *(*)(void *))&threadEntry,
                   (void *)this);

开启网络监听

tar网络服务跟一般的网络服务端程序一样需要执行以下步骤:

socket()-->bind()-->listen()-->accept() -->read()

tars-cpp 开启网络监听过程如下:

调用位置是

代码语言:txt
复制
Application::main()
==> Application::bindAdapter()         //
==>TC_EpollServer::bind(TC_EpollServer::BindAdapterPtr &lsPtr);
==>NetThread::bind(BindAdapterPtr &lsPtr);         // 遍历每一个NetThread对象的bind
==>NetThread::bind(const TC_Endpoint &ep, TC_Socket &s)

NetThread::bind(const TC_Endpoint &ep, TC_Socket &s)中,

通过以下三个调用执行完成网路编程的socket()、bind()、和listen())

代码语言:txt
复制
s.createSocket(SOCK_STREAM, type);
s.bind(ep.getHost(), ep.getPort());
s.listen(1024);

分别对应

代码语言:txt
复制
TC_Socket::createSocket(int iSocketType, int iDomain)
TC_Socket::bind(const string &sServerAddr, int port)
TC_Socket::listen(int iConnBackLog)

至此,每一个NetThread对象绑定的线程都开启了网络监听。

IO复用模型

Tars-cpp使用epoll作为底层IO复用模块。

三个epoll函数的调用位置

代码语言:txt
复制
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epoll_create
代码语言:txt
复制
Application::main()
TC_EpollServer::createEpoll(); // _epollServer->createEpoll(); 触发
TC_EpollServer::NetThread::createEpoll(uint32_t iIndex); // _netThreads[i]->createEpoll(i+1); 触发
TC_Epoller::create(int max_connections) // 由_epoller.create(10240);触发
epoll_create()
epoll_ctl
代码语言:txt
复制
Application::main()
TC_EpollServer::createEpoll(); // _epollServer->createEpoll(); 触发
TC_EpollServer::NetThread::createEpoll(uint32_t iIndex); // _netThreads[i]->createEpoll(i+1); 触发
TC_Epoller::add(...) // _epoller.add()
ctrl(fd, data, event, EPOLL_CTL_ADD);
epoll_ctl(_iEpollfd, op, fd, &ev);
epoll_wait
代码语言:txt
复制
Application::waitForShutdown()
waitForQuit();
TC_Thread::start();   // 调用vNetThread[i]->start();
pthread_create(&_tid, 0, (void *(*)(void *))&threadEntry, (void *)this);
TC_Thread::threadEntry(TC_Thread *pThread); // pthread_create的执行例程
NetThread::run()  // 调用pThread->run();触发, 使用了多态
TC_Epoller::wait(int millsecond) //  _epoller.wait(2000);
epoll_wait()

TC_Epoller

前面介绍了三个epoll函数的调用过程。实际上,Tars-cpp使用TC_Epoller类封装了epoll的调用细节,在tars-cpp的server实现上只需要关注TC_Epoller的接口即可。

Tars-cpp的IO复用相关的编程,基于TC_Epoller。比如要遍历每个NetThread执行epoll_wait(),实际上是遍历调用TC_Epoller::wait(int millsecond)。

因此,了解TC_Epoller类,有助于更好理解Tars-cpp的IO复用的实现。

IO复用工作过程

void TC_EpollServer::NetThread::run() 可以认为是每个网络线程的执行例程。

NetThread::run()中,会有一个while训练,来调用TC_Epoller::wait(int millsecond)

代码语言:txt
复制
void TC_EpollServer::NetThread::run()
{
    //循环监听网路连接请求
    while(!_bTerminate)
    {
        _list.checkTimeout(TNOW);

        int iEvNum = _epoller.wait(2000);

        for(int i = 0; i < iEvNum; ++i)
        {
            try
            {
                const epoll_event &ev = _epoller.get(i);

                uint32_t h = ev.data.u64 >> 32;

                switch(h)
                {
                case ET_LISTEN:
                    {
                        //监听端口有请求
                        auto it = _listeners.find(ev.data.u32);
                        if( it != _listeners.end())
                        {
                            if(ev.events & EPOLLIN)
                            {
                                bool ret;
                                do
                                {
                                    ret = accept(ev.data.u32, it->second->_ep.isIPv6() ? AF_INET6 : AF_INET);
                                }while(ret);
                            }
                        }
                    }
                    break;
                case ET_CLOSE:
                    //关闭请求
                    break;
                case ET_NOTIFY:
                    //发送通知
                    processPipe();
                    break;
                case ET_NET:
                    //网络请求
                    processNet(ev);
                    break;
                default:
                    assert(true);
                }
            }
            catch(exception &ex)
            {
                error("run exception:" + string(ex.what()));
            }
        }
    }
}

每当有事件到达或者超时时,_epoller.wait(2000);返回。

然后通过switch(h),判断事件类型。

通常,我们是直接使用switch(描述符) 来区分是哪个描述符上的事件。但是tars这里直接对描述符进行了分类,从而只判断是哪个类别的描述符时间。其关键点在于:uint32_t h = ev.data.u64 >> 32;。通过ev.data.u64左移32bit来区分事件类型。那么ev.data.u64保存什么东西?

ev.data.u64在TC_Epoller::ctrl 处赋值,

代码语言:txt
复制
void TC_Epoller::ctrl(int fd, long long data, __uint32_t events, int op)
{
    struct epoll_event ev;
    ev.data.u64 = data;
    ......
    epoll_ctl(_iEpollfd, op, fd, &ev);
}

参数data的最初来源是:

代码语言:txt
复制
void TC_EpollServer::NetThread::createEpoll(uint32_t iIndex)
{
        ... 
        //监听socket
        for (const auto& kv : _listeners)
        {
            if(kv.second->getEndpoint().isTcp())
            {
                ...
                _epoller.add(kv.first, H64(ET_LISTEN) | kv.first, EPOLLIN);
            }
            ...
        }
    ......
}

来自_epoller.add(kv.first, H64(ET_LISTEN) | kv.first, EPOLLIN);

这里的kv.first是监听的描述符(32bit), H64的定义如下

代码语言:txt
复制
 #define H64(x) (((uint64_t)x) << 32)

可以见, ev.data.u64 是事件类型和事件描述符的组合。既存储了监听描述符,有存储了事件类型。

ET_LISTEN,表示有客户端发起连接。

监听到ET_LISTEN时,调用NetThread::accept(int fd, int domain)处理连接请求

处理连接请求

当有客户端想Tars-cpp server发起连接请求时,会触发NetThread::accept(int fd, int domain)函数;该函数会做一些基本的检查(如现有连接数是否已经达到最大值了)以及连接建立前的初始化工作。一切OK后,会创建新的连接。

代码语言:txt
复制
bool TC_EpollServer::NetThread::accept(int fd, int domain)
{
    ......
    int iRetCode = s.accept(cs, stSockAddr, iSockAddrSize);
    ......
    Connection *cPtr = new Connection(_listeners[fd].get(), fd, (timeout < 2 ? 2 : timeout), cs.getfd(), ip, port);
    ......
        _epollServer->addConnection(cPtr, cs.getfd(), TCP_CONNECTION);
    ......
}

NetThread::addUdpConnection(TC_EpollServer::NetThread::Connection *cPtr)会添加对新建连接的监听。

虽然这里没有对ev.data.u64的前面32bit赋值(前32bit被置为0),但是0依然有意义的。因为在NetThread::run()中,case ET_NET:类型对应的值就是0.

代码语言:txt
复制
        //定义事件类型
        enum
        {
            ET_LISTEN = 1,
            ET_CLOSE = 2,
            ET_NOTIFY = 3,
            ET_NET = 0,
        };

因此,与客户端的网络通信最终是在**case ET_NET:**中处理。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • NetThread
  • 创建、管理网络线程
  • 开启网络监听
  • IO复用模型
    • 三个epoll函数的调用位置
      • epoll_create
      • epoll_ctl
      • epoll_wait
    • TC_Epoller
      • IO复用工作过程
      • 处理连接请求
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档