专栏首页高性能服务器开发(八)高性能服务器架构设计总结2——以flamigo服务器代码为例

(八)高性能服务器架构设计总结2——以flamigo服务器代码为例

系列目录

第01篇 主线程与工作线程的分工

第02篇 Reactor模式

第03篇 一个服务器程序的架构介绍

第04篇 如何将socket设置为非阻塞模式

第05篇 如何编写高性能日志

第06篇 关于网络编程的一些实用技巧和细节

第07篇 开源一款即时通讯软件的源码

第08篇 高性能服务器架构设计总结1

第09篇 高性能服务器架构设计总结2

第10篇 高性能服务器架构设计总结3

第11篇 高性能服务器架构设计总结4

说了这么多,我们来以flamingo的服务器程序的网络框架设计为例来验证上述介绍的理论。flamingo的网络框架是基于陈硕的muduo库,改成C++11的版本,并修改了一些bug。在此感谢原作者陈硕。flamingo的源码可以在这里下载:https://github.com/baloonwj/flamingo,打不开github的可以移步csdn:http://download.csdn.net/detail/analogous_love/9805797。

上文介绍的核心线程函数的while循环位于eventloop.cpp中:

 1void EventLoop::loop()  
 2{  
 3    assert(!looping_);  
 4    assertInLoopThread();  
 5    looping_ = true;  
 6    quit_ = false;
 7  // FIXME: what if someone calls quit() before loop() ?  
 8    LOG_TRACE << "EventLoop " << this << " start looping";  
 9
10    while (!quit_)  
11    {  
12        activeChannels_.clear();  
13        pollReturnTime_ = poller_->poll(kPollTimeMs,
14                                        &activeChannels_);  
15        ++iteration_;  
16        if (Logger::logLevel() <= Logger::TRACE)  
17        {  
18            printActiveChannels();  
19        }  
20        // TODO sort channel by priority  
21        eventHandling_ = true;  
22        for (ChannelList::iterator it = activeChannels_.begin();  
23            it != activeChannels_.end(); ++it)  
24        {  
25            currentActiveChannel_ = *it;  
26            currentActiveChannel_->handleEvent(pollReturnTime_);  
27        }  
28        currentActiveChannel_ = NULL;  
29        eventHandling_ = false;  
30        doPendingFunctors();  
31
32        if (frameFunctor_)  
33        {  
34            frameFunctor_();  
35        }         
36    }  
37
38    LOG_TRACE << "EventLoop " << this << " stop looping";  
39    looping_ = false;  
40} 

poller_->poll利用epoll分离网络事件,然后接着处理分离出来的网络事件,每一个客户端socket对应一个连接,即一个TcpConnection和Channel通道对象。currentActiveChannel_->handleEvent(pollReturnTime_)根据是可读、可写、出错事件来调用对应的处理函数,这些函数都是回调函数,程序初始化阶段设置进来的:

 1void Channel::handleEvent(Timestamp receiveTime)  
 2{  
 3    std::shared_ptr<void> guard;  
 4    if (tied_)  
 5    {  
 6        guard = tie_.lock();  
 7        if (guard)  
 8        {  
 9            handleEventWithGuard(receiveTime);  
10        }  
11    }  
12    else  
13    {  
14        handleEventWithGuard(receiveTime);  
15    }  
16}  
17
18void Channel::handleEventWithGuard(Timestamp receiveTime)  
19{  
20    eventHandling_ = true;  
21    LOG_TRACE << reventsToString();  
22    if ((revents_ & POLLHUP) && !(revents_ & POLLIN))  
23    {  
24        if (logHup_)  
25        {  
26            LOG_WARN << "Channel::handle_event() POLLHUP";  
27        }  
28        if (closeCallback_) closeCallback_();  
29    }  
30
31    if (revents_ & POLLNVAL)  
32    {  
33        LOG_WARN << "Channel::handle_event() POLLNVAL";  
34    }  
35
36    if (revents_ & (POLLERR | POLLNVAL))  
37    {  
38        if (errorCallback_) errorCallback_();  
39    }  
40    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))  
41    {  
42        //当是侦听socket时,readCallback_指向Acceptor::handleRead  
43        //当是客户端socket时,调用TcpConnection::handleRead   
44        if (readCallback_) readCallback_(receiveTime);  
45    }  
46    if (revents_ & POLLOUT)  
47    {  
48        //如果是连接状态服的socket,
49        //则writeCallback_指向Connector::handleWrite()  
50        if (writeCallback_) writeCallback_();  
51    }  
52    eventHandling_ = false;  
53}  

当然,这里利用了Channel对象的“多态性”

  • 如果是普通socket,可读事件就会调用预先设置的回调函数;
  • 但是如果是侦听socket,则调用Aceptor对象的handleRead()来接收新连接:
 1void Acceptor::handleRead()  
 2{  
 3    loop_->assertInLoopThread();  
 4    InetAddress peerAddr;  
 5    //FIXME loop until no more  
 6    int connfd = acceptSocket_.accept(&peerAddr);  
 7    if (connfd >= 0)  
 8    {  
 9        // string hostport = peerAddr.toIpPort();  
10        // LOG_TRACE << "Accepts of " << hostport;  
11        //newConnectionCallback_实际指向:
12        //TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)  
13        if (newConnectionCallback_)  
14        {  
15            newConnectionCallback_(connfd, peerAddr);  
16        }  
17        else  
18        {  
19            sockets::close(connfd);  
20        }  
21    }  
22    else  
23    {  
24        LOG_SYSERR << "in Acceptor::handleRead";  
25        // Read the section named "The special problem of  
26        // accept()ing when you can't" in libev's doc.  
27        // By Marc Lehmann, author of livev.  
28        if (errno == EMFILE)  
29        {  
30            ::close(idleFd_);  
31            idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);  
32            ::close(idleFd_);  
33            idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);  
34        }  
35    }  
36}

主循环里面的业务逻辑处理对应:

 1doPendingFunctors();  
 2
 3if (frameFunctor_)  
 4{  
 5   frameFunctor_();  
 6}         
 7
 8
 9void EventLoop::doPendingFunctors()  
10{  
11    std::vector<Functor> functors;  
12    callingPendingFunctors_ = true;  
13
14    {  
15        std::unique_lock<std::mutex> lock(mutex_);  
16        functors.swap(pendingFunctors_);  
17    }  
18
19    for (size_t i = 0; i < functors.size(); ++i)  
20    {  
21        functors[i]();  
22    }  
23    callingPendingFunctors_ = false;  
24}  

这里增加业务逻辑是增加执行任务的函数指针的,增加的任务保存在成员变量pendingFunctors_中,这个变量是一个函数指针数组(vector对象),执行的时候,调用每个函数就可以了。上面的代码先利用一个栈变量将成员变量pendingFunctors_里面的函数指针换过来,接下来对这个栈变量进行操作就可以了,这样减少了锁的粒度。

因为成员变量pendingFunctors_在增加任务的时候,也会被用到,设计到多个线程操作,所以要加锁,增加任务的地方是:

 1void EventLoop::queueInLoop(const Functor& cb)  
 2{  
 3    {  
 4        std::unique_lock<std::mutex> lock(mutex_);  
 5        pendingFunctors_.push_back(cb);  
 6    }  
 7
 8    if (!isInLoopThread() || callingPendingFunctors_)  
 9    {  
10        wakeup();  
11    }  
12}

frameFunctor_就更简单了,就是通过设置一个函数指针就可以了。当然这里有个技巧性的东西,即增加任务的时候,为了能够立即执行,使用唤醒机制,通过往一个fd里面写入简单的几个字节,来唤醒epoll,使其立刻返回,因为此时没有其它的socke有事件,这样接下来就执行刚才添加的任务了。

我们看一下数据收取的逻辑:

 1void TcpConnection::handleRead(Timestamp receiveTime)  
 2{  
 3    loop_->assertInLoopThread();  
 4    int savedErrno = 0;  
 5    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);  
 6    if (n > 0)  
 7    {  
 8        /*messageCallback_指向
 9       CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn,
10                           Buffer* pBuffer, 
11                           Timestamp receiveTime)*/  
12        messageCallback_(shared_from_this(),
13                         &inputBuffer_, receiveTime);  
14    }  
15    else if (n == 0)  
16    {  
17        handleClose();  
18    }  
19    else  
20    {  
21        errno = savedErrno;  
22        LOG_SYSERR << "TcpConnection::handleRead";  
23        handleError();  
24    }  
25} 

将收到的数据放到接收缓冲区里面,将来我们来解包:

 1void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime)  
 2{  
 3    while (true)  
 4    {  
 5        //不够一个包头大小  
 6        if (pBuffer->readableBytes() < (size_t)sizeof(msg))  
 7        {  
 8            LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()=" << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg);  
 9            return;  
10        }  
11
12        //不够一个整包大小  
13        msg header;  
14        memcpy(&header, pBuffer->peek(), sizeof(msg));  
15        if (pBuffer->readableBytes() < (size_t)header.packagesize + sizeof(msg))  
16            return;  
17
18        pBuffer->retrieve(sizeof(msg));  
19        std::string inbuf;  
20        inbuf.append(pBuffer->peek(), header.packagesize);  
21        pBuffer->retrieve(header.packagesize);  
22        if (!Process(conn, inbuf.c_str(), inbuf.length()))  
23        {  
24            LOG_WARN << "Process error, close TcpConnection";  
25            conn->forceClose();  
26        }  
27    }// end while-loop  
28
29}  

判断接收缓冲区里面的数据是否够一个包头大小,如果够再判断够不够包头指定的包体大小,如果还是够的话,接着在Process函数里面处理该包。

再看看发送数据的逻辑:

 1void TcpConnection::sendInLoop(const void* data, 
 2                               size_t len)  
 3{  
 4    loop_->assertInLoopThread();  
 5    ssize_t nwrote = 0;  
 6    size_t remaining = len;  
 7    bool faultError = false;  
 8    if (state_ == kDisconnected)  
 9    {  
10        LOG_WARN << "disconnected, give up writing";  
11        return;  
12    }  
13    // if no thing in output queue, try writing directly  
14    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)  
15    {  
16        nwrote = sockets::write(channel_->fd(), data, len);  
17        if (nwrote >= 0)  
18        {  
19            remaining = len - nwrote;  
20            if (remaining == 0 && writeCompleteCallback_)  
21            {  
22                loop_->queueInLoop(std::bind(writeCompleteCallback_, 
23                                             shared_from_this()));  
24            }  
25        }  
26        else // nwrote < 0  
27        {  
28            nwrote = 0;  
29            if (errno != EWOULDBLOCK)  
30            {  
31                LOG_SYSERR << "TcpConnection::sendInLoop";
32                // FIXME: any others?   
33                if (errno == EPIPE || errno == ECONNRESET)  
34                {  
35                    faultError = true;  
36                }  
37            }  
38        }  
39    }  
40
41    assert(remaining <= len);  
42    if (!faultError && remaining > 0)  
43    {  
44        size_t oldLen = outputBuffer_.readableBytes();  
45        if (oldLen + remaining >= highWaterMark_                          
46            && oldLen < highWaterMark_  
47            && highWaterMarkCallback_)  
48        {  
49            loop_->queueInLoop(std::bind(highWaterMarkCallback_,
50                                         shared_from_this(),
51                                         oldLen + remaining));  
52        }  
53        outputBuffer_.append(static_cast<const char*>(data)+nwrote,
54                             remaining);  
55        if (!channel_->isWriting())  
56        {  
57            channel_->enableWriting();  
58        }  
59    }  
60} 

如果剩余的数据remaining大于则调用channel_->enableWriting();开始监听可写事件,可写事件处理如下:

 1void TcpConnection::handleWrite()  
 2{  
 3    loop_->assertInLoopThread();  
 4    if (channel_->isWriting())  
 5    {  
 6        ssize_t n = sockets::write(channel_->fd(),  
 7            outputBuffer_.peek(),  
 8            outputBuffer_.readableBytes());  
 9        if (n > 0)  
10        {  
11            outputBuffer_.retrieve(n);  
12            if (outputBuffer_.readableBytes() == 0)  
13            {  
14                channel_->disableWriting();  
15                if (writeCompleteCallback_)  
16                {  
17                    loop_->queueInLoop(std::bind(writeCompleteCallback_, 
18                                       shared_from_this()));  
19                }   
20                if (state_ == kDisconnecting)  
21                {  
22                    shutdownInLoop();  
23                }  
24            }  
25        }  
26        else  
27        {  
28            LOG_SYSERR << "TcpConnection::handleWrite";  
29            // if (state_ == kDisconnecting)  
30            // {  
31            //   shutdownInLoop();  
32            // }  
33        }  
34    }  
35    else  
36    {  
37        LOG_TRACE << "Connection fd = " << channel_->fd()  
38            << " is down, no more writing";  
39    }  
40}

如果发送完数据以后调用channel_->disableWriting();移除监听可写事件。

很多读者可能一直想问,文中不是说解包数据并处理逻辑是业务代码而非网络通信的代码,你这里貌似都混在一起了,其实没有,这里实际的业务代码处理都是框架曾提供的回调函数里面处理的,具体怎么处理,由框架使用者——业务层自己定义。

总结起来,实际上就是一个线程函数里一个loop那么点事情,不信你再看我曾经工作上的一个交易系统项目代码:

 1void CEventDispatcher::Run()  
 2{  
 3    m_bShouldRun = true;  
 4    while(m_bShouldRun)  
 5    {  
 6        DispatchIOs();        
 7        SyncTime();  
 8        CheckTimer();  
 9        DispatchEvents();  
10    }  
11}  
12
13
14void CEpollReactor::DispatchIOs()  
15{  
16    DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT;  
17    if (HandleOtherTask())  
18    {  
19        dwSelectTimeOut = 0;  
20    }  
21
22    struct epoll_event ev;  
23    CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin();  
24    for(; itor!=m_mapEventHandlerId.end(); itor++)  
25    {  
26        CEventHandler *pEventHandler = (CEventHandler *)(*itor).first;  
27        if(pEventHandler == NULL){  
28            continue;  
29        }  
30        ev.data.ptr = pEventHandler;  
31        ev.events = 0;  
32        int nReadID, nWriteID;  
33        pEventHandler->GetIds(&nReadID, &nWriteID);    
34        if (nReadID > 0)  
35        {  
36            ev.events |= EPOLLIN;  
37        }  
38        if (nWriteID > 0)  
39        {  
40            ev.events |= EPOLLOUT;  
41        }  
42
43        epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev);  
44    }  
45
46    struct epoll_event events[EPOLL_MAX_EVENTS];  
47
48    int nfds = epoll_wait(m_fdEpoll, events, 
49                          EPOLL_MAX_EVENTS, 
50                          dwSelectTimeOut/1000);  
51
52    for (int i=0; i<nfds; i++)  
53    {  
54        struct epoll_event &evref = events[i];  
55        CEventHandler *pEventHandler = (CEventHandler *)evref.data.ptr;  
56        if ((evref.events|EPOLLIN)!=0 && 
57             m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())  
58        {  
59            pEventHandler->HandleInput();  
60        }  
61        if ((evref.events|EPOLLOUT)!=0 &&
62             m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())  
63        {  
64            pEventHandler->HandleOutput();  
65        }  
66    }     
67}  
68
69
70void CEventDispatcher::DispatchEvents()  
71{  
72    CEvent event;  
73    CSyncEvent *pSyncEvent;  
74    while(m_queueEvent.PeekEvent(event))  
75    {  
76        int nRetval;  
77
78        if(event.pEventHandler != NULL)  
79        {  
80            nRetval = event.pEventHandler->HandleEvent(event.nEventID,
81                                                       event.dwParam,
82                                                       event.pParam);  
83        }  
84        else  
85        {  
86            nRetval = HandleEvent(event.nEventID,
87                                  event.dwParam, 
88                                  event.pParam);  
89        }  
90
91        if(event.pAdd != NULL)      //同步消息  
92        {  
93            pSyncEvent=(CSyncEvent *)event.pAdd;  
94            pSyncEvent->nRetval = nRetval;  
95            pSyncEvent->sem.UnLock();  
96        }  
97    }  
98}  

再看看蘑菇街开源的TeamTalk的源码(代码下载地址:https://github.com/baloonwj/TeamTalk):

 1void CEventDispatch::StartDispatch(uint32_t wait_timeout)  
 2{  
 3    fd_set read_set, write_set, excep_set;  
 4    timeval timeout;  
 5    timeout.tv_sec = 0;
 6    // 10 millisecond    
 7    timeout.tv_usec = wait_timeout * 1000;
 8
 9    if(running)  
10        return;  
11    running = true;  
12
13    while (running)  
14   {  
15        _CheckTimer();  
16        _CheckLoop();  
17
18        if (!m_read_set.fd_count && 
19            !m_write_set.fd_count &&
20            !m_excep_set.fd_count)  
21        {  
22            Sleep(MIN_TIMER_DURATION);  
23            continue;  
24        }  
25
26        m_lock.lock();  
27        memcpy(&read_set, &m_read_set, 
28               sizeof(fd_set));  
29        memcpy(&write_set, &m_write_set, 
30               sizeof(fd_set));  
31        memcpy(&excep_set, &m_excep_set, 
32               sizeof(fd_set));  
33        m_lock.unlock();  
34
35        int nfds = select(0, &read_set, 
36                          &write_set,
37                          &excep_set,
38                          &timeout);  
39
40        if (nfds == SOCKET_ERROR)  
41        {  
42            log("select failed, error code: %d", GetLastError());  
43            Sleep(MIN_TIMER_DURATION);  
44            continue;           // select again  
45        }  
46
47        if (nfds == 0)  
48        {  
49            continue;  
50        }  
51
52        for (u_int i = 0; i < read_set.fd_count; i++)  
53        {  
54            //log("select return read count=%d\n", read_set.fd_count);  
55            SOCKET fd = read_set.fd_array[i];  
56            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);  
57            if (pSocket)  
58            {  
59                pSocket->OnRead();  
60                pSocket->ReleaseRef();  
61            }  
62        }  
63
64        for (u_int i = 0; i < write_set.fd_count; i++)  
65        {  
66            //log("select return write count=%d\n", write_set.fd_count);  
67            SOCKET fd = write_set.fd_array[i];  
68            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);  
69            if (pSocket)  
70            {  
71                pSocket->OnWrite();  
72                pSocket->ReleaseRef();  
73            }  
74        }  
75
76        for (u_int i = 0; i < excep_set.fd_count; i++)  
77        {  
78            //log("select return exception count=%d\n", excep_set.fd_count);  
79            SOCKET fd = excep_set.fd_array[i];  
80            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);  
81            if (pSocket)  
82            {  
83                pSocket->OnClose();  
84                pSocket->ReleaseRef();  
85            }  
86        }  
87
88    }  
89}

系列目录

第01篇 主线程与工作线程的分工

第02篇 Reactor模式

第03篇 一个服务器程序的架构介绍

第04篇 如何将socket设置为非阻塞模式

第05篇 如何编写高性能日志

第06篇 关于网络编程的一些实用技巧和细节

第07篇 开源一款即时通讯软件的源码

第08篇 高性能服务器架构设计总结1

第09篇 高性能服务器架构设计总结2

第10篇 高性能服务器架构设计总结3

第11篇 高性能服务器架构设计总结4

本文分享自微信公众号 - 高性能服务器开发(easyserverdev),作者:张小方

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-04-24

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • (八)高性能服务器架构设计总结2——以flamigo服务器代码为例

    说了这么多,我们来以flamingo的服务器程序的网络框架设计为例来验证上述介绍的理论。flamingo的网络框架是基于陈硕的muduo库,改成C++11的版本...

    范蠡
  • epoll LT 模式和 ET 模式详解(文末赠书)

    与 poll 的事件宏相比,epoll 新增了一个事件宏 EPOLLET,这就是所谓的边缘触发模式(Edge Trigger,ET),而默认的模式我们称为 水平...

    范蠡
  • libevent源码深度剖析(六) 初见事件处理框架

    (1)libevent源码深度剖析一 序 (2)libevent源码深度剖析二 Reactor模式 (3)libevent源码深度剖析三 libevent基...

    范蠡
  • (八)高性能服务器架构设计总结2——以flamigo服务器代码为例

    说了这么多,我们来以flamingo的服务器程序的网络框架设计为例来验证上述介绍的理论。flamingo的网络框架是基于陈硕的muduo库,改成C++11的版本...

    范蠡
  • ISO C forbids comparison between pointer and integer [-fpermissive]

    异常:ISO C forbids comparison between pointer and integer [-fpermissive] 意思是:指针和...

    公众号-不为谁写的歌
  • Excel大批量数据的导入和导出,如何做优化?

    Java对Excel的操作一般都是用POI,但是数据量大的话可能会导致频繁的FGC或OOM,这篇文章跟大家说下如果避免踩POI的坑,以及分别对于xls和xlsx...

    趣学程序-shaofeer
  • Sphinx源码学习笔记(一):索引创建

      因为项目开发需要在游戏内部实现玩家名称的模糊查找功能,本身直接使用Sphinx配置mysql可以直接搭建一套模糊匹配的即可支持功能的实现。

    dylan_若水
  • leetcode: explore-strings-33 反转字符串

    1、输入:一个整数,可正可负可为0 2、输出:输入值反转,但不包括符号,正数的前置0要省略 3、注意:输入与输出的数值范围都是[−2^31, 2^31 − 1...

    用户7685359
  • 仿淘宝、京东拖拽商品详情(可嵌套ViewPager、ListView、WebView、FragmentTabhost)实现效果图实现

    看书的小蜗牛
  • JS实现运算符重载

    IT故事会

扫码关注云+社区

领取腾讯云代金券