前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >(八)高性能服务器架构设计总结2——以flamigo服务器代码为例

(八)高性能服务器架构设计总结2——以flamigo服务器代码为例

作者头像
范蠡
发布2018-07-25 16:05:48
9360
发布2018-07-25 16:05:48
举报

系列目录

第01篇 主线程与工作线程的分工

第02篇 Reactor模式

第03篇 一个服务器程序的架构介绍

第04篇 如何将socket设置为非阻塞模式

第05篇 如何编写高性能日志

第06篇 关于网络编程的一些实用技巧和细节

第07篇 开源一款即时通讯软件的源码

第08篇 高性能服务器架构设计总结1

第09篇 高性能服务器架构设计总结2

第10篇 高性能服务器架构设计总结3

第11篇 高性能服务器架构设计总结4

说了这么多,我们来以flamingo的服务器程序的网络框架设计为例来验证上述介绍的理论。flamingo的网络框架是基于陈硕的muduo库,改成C++11的版本,并修改了一些bug。在此感谢原作者陈硕。flamingo的源码可以在这里下载:https://github.com/baloonwj/flamingo,打不开github的可以移步csdn:http://download.csdn.net/detail/analogous_love/9805797。

上文介绍的核心线程函数的while循环位于eventloop.cpp中:

代码语言:javascript
复制
 1void EventLoop::loop()  
 2{  
 3    assert(!looping_);  
 4    assertInLoopThread();  
 5    looping_ = true;  
 6    quit_ = false;
 7  // FIXME: what if someone calls quit() before loop() ?  
 8    LOG_TRACE << "EventLoop " << this << " start looping";  
 9
10    while (!quit_)  
11    {  
12        activeChannels_.clear();  
13        pollReturnTime_ = poller_->poll(kPollTimeMs,
14                                        &activeChannels_);  
15        ++iteration_;  
16        if (Logger::logLevel() <= Logger::TRACE)  
17        {  
18            printActiveChannels();  
19        }  
20        // TODO sort channel by priority  
21        eventHandling_ = true;  
22        for (ChannelList::iterator it = activeChannels_.begin();  
23            it != activeChannels_.end(); ++it)  
24        {  
25            currentActiveChannel_ = *it;  
26            currentActiveChannel_->handleEvent(pollReturnTime_);  
27        }  
28        currentActiveChannel_ = NULL;  
29        eventHandling_ = false;  
30        doPendingFunctors();  
31
32        if (frameFunctor_)  
33        {  
34            frameFunctor_();  
35        }         
36    }  
37
38    LOG_TRACE << "EventLoop " << this << " stop looping";  
39    looping_ = false;  
40} 

poller_->poll利用epoll分离网络事件,然后接着处理分离出来的网络事件,每一个客户端socket对应一个连接,即一个TcpConnection和Channel通道对象。currentActiveChannel_->handleEvent(pollReturnTime_)根据是可读、可写、出错事件来调用对应的处理函数,这些函数都是回调函数,程序初始化阶段设置进来的:

代码语言:javascript
复制
 1void Channel::handleEvent(Timestamp receiveTime)  
 2{  
 3    std::shared_ptr<void> guard;  
 4    if (tied_)  
 5    {  
 6        guard = tie_.lock();  
 7        if (guard)  
 8        {  
 9            handleEventWithGuard(receiveTime);  
10        }  
11    }  
12    else  
13    {  
14        handleEventWithGuard(receiveTime);  
15    }  
16}  
17
18void Channel::handleEventWithGuard(Timestamp receiveTime)  
19{  
20    eventHandling_ = true;  
21    LOG_TRACE << reventsToString();  
22    if ((revents_ & POLLHUP) && !(revents_ & POLLIN))  
23    {  
24        if (logHup_)  
25        {  
26            LOG_WARN << "Channel::handle_event() POLLHUP";  
27        }  
28        if (closeCallback_) closeCallback_();  
29    }  
30
31    if (revents_ & POLLNVAL)  
32    {  
33        LOG_WARN << "Channel::handle_event() POLLNVAL";  
34    }  
35
36    if (revents_ & (POLLERR | POLLNVAL))  
37    {  
38        if (errorCallback_) errorCallback_();  
39    }  
40    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))  
41    {  
42        //当是侦听socket时,readCallback_指向Acceptor::handleRead  
43        //当是客户端socket时,调用TcpConnection::handleRead   
44        if (readCallback_) readCallback_(receiveTime);  
45    }  
46    if (revents_ & POLLOUT)  
47    {  
48        //如果是连接状态服的socket,
49        //则writeCallback_指向Connector::handleWrite()  
50        if (writeCallback_) writeCallback_();  
51    }  
52    eventHandling_ = false;  
53}  

当然,这里利用了Channel对象的“多态性”

  • 如果是普通socket,可读事件就会调用预先设置的回调函数;
  • 但是如果是侦听socket,则调用Aceptor对象的handleRead()来接收新连接:
代码语言:javascript
复制
 1void Acceptor::handleRead()  
 2{  
 3    loop_->assertInLoopThread();  
 4    InetAddress peerAddr;  
 5    //FIXME loop until no more  
 6    int connfd = acceptSocket_.accept(&peerAddr);  
 7    if (connfd >= 0)  
 8    {  
 9        // string hostport = peerAddr.toIpPort();  
10        // LOG_TRACE << "Accepts of " << hostport;  
11        //newConnectionCallback_实际指向:
12        //TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)  
13        if (newConnectionCallback_)  
14        {  
15            newConnectionCallback_(connfd, peerAddr);  
16        }  
17        else  
18        {  
19            sockets::close(connfd);  
20        }  
21    }  
22    else  
23    {  
24        LOG_SYSERR << "in Acceptor::handleRead";  
25        // Read the section named "The special problem of  
26        // accept()ing when you can't" in libev's doc.  
27        // By Marc Lehmann, author of livev.  
28        if (errno == EMFILE)  
29        {  
30            ::close(idleFd_);  
31            idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);  
32            ::close(idleFd_);  
33            idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);  
34        }  
35    }  
36}

主循环里面的业务逻辑处理对应:

代码语言:javascript
复制
 1doPendingFunctors();  
 2
 3if (frameFunctor_)  
 4{  
 5   frameFunctor_();  
 6}         
 7
 8
 9void EventLoop::doPendingFunctors()  
10{  
11    std::vector<Functor> functors;  
12    callingPendingFunctors_ = true;  
13
14    {  
15        std::unique_lock<std::mutex> lock(mutex_);  
16        functors.swap(pendingFunctors_);  
17    }  
18
19    for (size_t i = 0; i < functors.size(); ++i)  
20    {  
21        functors[i]();  
22    }  
23    callingPendingFunctors_ = false;  
24}  

这里增加业务逻辑是增加执行任务的函数指针的,增加的任务保存在成员变量pendingFunctors_中,这个变量是一个函数指针数组(vector对象),执行的时候,调用每个函数就可以了。上面的代码先利用一个栈变量将成员变量pendingFunctors_里面的函数指针换过来,接下来对这个栈变量进行操作就可以了,这样减少了锁的粒度。

因为成员变量pendingFunctors_在增加任务的时候,也会被用到,设计到多个线程操作,所以要加锁,增加任务的地方是:

代码语言:javascript
复制
 1void EventLoop::queueInLoop(const Functor& cb)  
 2{  
 3    {  
 4        std::unique_lock<std::mutex> lock(mutex_);  
 5        pendingFunctors_.push_back(cb);  
 6    }  
 7
 8    if (!isInLoopThread() || callingPendingFunctors_)  
 9    {  
10        wakeup();  
11    }  
12}

frameFunctor_就更简单了,就是通过设置一个函数指针就可以了。当然这里有个技巧性的东西,即增加任务的时候,为了能够立即执行,使用唤醒机制,通过往一个fd里面写入简单的几个字节,来唤醒epoll,使其立刻返回,因为此时没有其它的socke有事件,这样接下来就执行刚才添加的任务了。

我们看一下数据收取的逻辑:

代码语言:javascript
复制
 1void TcpConnection::handleRead(Timestamp receiveTime)  
 2{  
 3    loop_->assertInLoopThread();  
 4    int savedErrno = 0;  
 5    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);  
 6    if (n > 0)  
 7    {  
 8        /*messageCallback_指向
 9       CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn,
10                           Buffer* pBuffer, 
11                           Timestamp receiveTime)*/  
12        messageCallback_(shared_from_this(),
13                         &inputBuffer_, receiveTime);  
14    }  
15    else if (n == 0)  
16    {  
17        handleClose();  
18    }  
19    else  
20    {  
21        errno = savedErrno;  
22        LOG_SYSERR << "TcpConnection::handleRead";  
23        handleError();  
24    }  
25} 

将收到的数据放到接收缓冲区里面,将来我们来解包:

代码语言:javascript
复制
 1void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime)  
 2{  
 3    while (true)  
 4    {  
 5        //不够一个包头大小  
 6        if (pBuffer->readableBytes() < (size_t)sizeof(msg))  
 7        {  
 8            LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()=" << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg);  
 9            return;  
10        }  
11
12        //不够一个整包大小  
13        msg header;  
14        memcpy(&header, pBuffer->peek(), sizeof(msg));  
15        if (pBuffer->readableBytes() < (size_t)header.packagesize + sizeof(msg))  
16            return;  
17
18        pBuffer->retrieve(sizeof(msg));  
19        std::string inbuf;  
20        inbuf.append(pBuffer->peek(), header.packagesize);  
21        pBuffer->retrieve(header.packagesize);  
22        if (!Process(conn, inbuf.c_str(), inbuf.length()))  
23        {  
24            LOG_WARN << "Process error, close TcpConnection";  
25            conn->forceClose();  
26        }  
27    }// end while-loop  
28
29}  

判断接收缓冲区里面的数据是否够一个包头大小,如果够再判断够不够包头指定的包体大小,如果还是够的话,接着在Process函数里面处理该包。

再看看发送数据的逻辑:

代码语言:javascript
复制
 1void TcpConnection::sendInLoop(const void* data, 
 2                               size_t len)  
 3{  
 4    loop_->assertInLoopThread();  
 5    ssize_t nwrote = 0;  
 6    size_t remaining = len;  
 7    bool faultError = false;  
 8    if (state_ == kDisconnected)  
 9    {  
10        LOG_WARN << "disconnected, give up writing";  
11        return;  
12    }  
13    // if no thing in output queue, try writing directly  
14    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)  
15    {  
16        nwrote = sockets::write(channel_->fd(), data, len);  
17        if (nwrote >= 0)  
18        {  
19            remaining = len - nwrote;  
20            if (remaining == 0 && writeCompleteCallback_)  
21            {  
22                loop_->queueInLoop(std::bind(writeCompleteCallback_, 
23                                             shared_from_this()));  
24            }  
25        }  
26        else // nwrote < 0  
27        {  
28            nwrote = 0;  
29            if (errno != EWOULDBLOCK)  
30            {  
31                LOG_SYSERR << "TcpConnection::sendInLoop";
32                // FIXME: any others?   
33                if (errno == EPIPE || errno == ECONNRESET)  
34                {  
35                    faultError = true;  
36                }  
37            }  
38        }  
39    }  
40
41    assert(remaining <= len);  
42    if (!faultError && remaining > 0)  
43    {  
44        size_t oldLen = outputBuffer_.readableBytes();  
45        if (oldLen + remaining >= highWaterMark_                          
46            && oldLen < highWaterMark_  
47            && highWaterMarkCallback_)  
48        {  
49            loop_->queueInLoop(std::bind(highWaterMarkCallback_,
50                                         shared_from_this(),
51                                         oldLen + remaining));  
52        }  
53        outputBuffer_.append(static_cast<const char*>(data)+nwrote,
54                             remaining);  
55        if (!channel_->isWriting())  
56        {  
57            channel_->enableWriting();  
58        }  
59    }  
60} 

如果剩余的数据remaining大于则调用channel_->enableWriting();开始监听可写事件,可写事件处理如下:

代码语言:javascript
复制
 1void TcpConnection::handleWrite()  
 2{  
 3    loop_->assertInLoopThread();  
 4    if (channel_->isWriting())  
 5    {  
 6        ssize_t n = sockets::write(channel_->fd(),  
 7            outputBuffer_.peek(),  
 8            outputBuffer_.readableBytes());  
 9        if (n > 0)  
10        {  
11            outputBuffer_.retrieve(n);  
12            if (outputBuffer_.readableBytes() == 0)  
13            {  
14                channel_->disableWriting();  
15                if (writeCompleteCallback_)  
16                {  
17                    loop_->queueInLoop(std::bind(writeCompleteCallback_, 
18                                       shared_from_this()));  
19                }   
20                if (state_ == kDisconnecting)  
21                {  
22                    shutdownInLoop();  
23                }  
24            }  
25        }  
26        else  
27        {  
28            LOG_SYSERR << "TcpConnection::handleWrite";  
29            // if (state_ == kDisconnecting)  
30            // {  
31            //   shutdownInLoop();  
32            // }  
33        }  
34    }  
35    else  
36    {  
37        LOG_TRACE << "Connection fd = " << channel_->fd()  
38            << " is down, no more writing";  
39    }  
40}

如果发送完数据以后调用channel_->disableWriting();移除监听可写事件。

很多读者可能一直想问,文中不是说解包数据并处理逻辑是业务代码而非网络通信的代码,你这里貌似都混在一起了,其实没有,这里实际的业务代码处理都是框架曾提供的回调函数里面处理的,具体怎么处理,由框架使用者——业务层自己定义。

总结起来,实际上就是一个线程函数里一个loop那么点事情,不信你再看我曾经工作上的一个交易系统项目代码:

代码语言:javascript
复制
 1void CEventDispatcher::Run()  
 2{  
 3    m_bShouldRun = true;  
 4    while(m_bShouldRun)  
 5    {  
 6        DispatchIOs();        
 7        SyncTime();  
 8        CheckTimer();  
 9        DispatchEvents();  
10    }  
11}  
12
13
14void CEpollReactor::DispatchIOs()  
15{  
16    DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT;  
17    if (HandleOtherTask())  
18    {  
19        dwSelectTimeOut = 0;  
20    }  
21
22    struct epoll_event ev;  
23    CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin();  
24    for(; itor!=m_mapEventHandlerId.end(); itor++)  
25    {  
26        CEventHandler *pEventHandler = (CEventHandler *)(*itor).first;  
27        if(pEventHandler == NULL){  
28            continue;  
29        }  
30        ev.data.ptr = pEventHandler;  
31        ev.events = 0;  
32        int nReadID, nWriteID;  
33        pEventHandler->GetIds(&nReadID, &nWriteID);    
34        if (nReadID > 0)  
35        {  
36            ev.events |= EPOLLIN;  
37        }  
38        if (nWriteID > 0)  
39        {  
40            ev.events |= EPOLLOUT;  
41        }  
42
43        epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev);  
44    }  
45
46    struct epoll_event events[EPOLL_MAX_EVENTS];  
47
48    int nfds = epoll_wait(m_fdEpoll, events, 
49                          EPOLL_MAX_EVENTS, 
50                          dwSelectTimeOut/1000);  
51
52    for (int i=0; i<nfds; i++)  
53    {  
54        struct epoll_event &evref = events[i];  
55        CEventHandler *pEventHandler = (CEventHandler *)evref.data.ptr;  
56        if ((evref.events|EPOLLIN)!=0 && 
57             m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())  
58        {  
59            pEventHandler->HandleInput();  
60        }  
61        if ((evref.events|EPOLLOUT)!=0 &&
62             m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())  
63        {  
64            pEventHandler->HandleOutput();  
65        }  
66    }     
67}  
68
69
70void CEventDispatcher::DispatchEvents()  
71{  
72    CEvent event;  
73    CSyncEvent *pSyncEvent;  
74    while(m_queueEvent.PeekEvent(event))  
75    {  
76        int nRetval;  
77
78        if(event.pEventHandler != NULL)  
79        {  
80            nRetval = event.pEventHandler->HandleEvent(event.nEventID,
81                                                       event.dwParam,
82                                                       event.pParam);  
83        }  
84        else  
85        {  
86            nRetval = HandleEvent(event.nEventID,
87                                  event.dwParam, 
88                                  event.pParam);  
89        }  
90
91        if(event.pAdd != NULL)      //同步消息  
92        {  
93            pSyncEvent=(CSyncEvent *)event.pAdd;  
94            pSyncEvent->nRetval = nRetval;  
95            pSyncEvent->sem.UnLock();  
96        }  
97    }  
98}  

再看看蘑菇街开源的TeamTalk的源码(代码下载地址:https://github.com/baloonwj/TeamTalk):

代码语言:javascript
复制
 1void CEventDispatch::StartDispatch(uint32_t wait_timeout)  
 2{  
 3    fd_set read_set, write_set, excep_set;  
 4    timeval timeout;  
 5    timeout.tv_sec = 0;
 6    // 10 millisecond    
 7    timeout.tv_usec = wait_timeout * 1000;
 8
 9    if(running)  
10        return;  
11    running = true;  
12
13    while (running)  
14   {  
15        _CheckTimer();  
16        _CheckLoop();  
17
18        if (!m_read_set.fd_count && 
19            !m_write_set.fd_count &&
20            !m_excep_set.fd_count)  
21        {  
22            Sleep(MIN_TIMER_DURATION);  
23            continue;  
24        }  
25
26        m_lock.lock();  
27        memcpy(&read_set, &m_read_set, 
28               sizeof(fd_set));  
29        memcpy(&write_set, &m_write_set, 
30               sizeof(fd_set));  
31        memcpy(&excep_set, &m_excep_set, 
32               sizeof(fd_set));  
33        m_lock.unlock();  
34
35        int nfds = select(0, &read_set, 
36                          &write_set,
37                          &excep_set,
38                          &timeout);  
39
40        if (nfds == SOCKET_ERROR)  
41        {  
42            log("select failed, error code: %d", GetLastError());  
43            Sleep(MIN_TIMER_DURATION);  
44            continue;           // select again  
45        }  
46
47        if (nfds == 0)  
48        {  
49            continue;  
50        }  
51
52        for (u_int i = 0; i < read_set.fd_count; i++)  
53        {  
54            //log("select return read count=%d\n", read_set.fd_count);  
55            SOCKET fd = read_set.fd_array[i];  
56            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);  
57            if (pSocket)  
58            {  
59                pSocket->OnRead();  
60                pSocket->ReleaseRef();  
61            }  
62        }  
63
64        for (u_int i = 0; i < write_set.fd_count; i++)  
65        {  
66            //log("select return write count=%d\n", write_set.fd_count);  
67            SOCKET fd = write_set.fd_array[i];  
68            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);  
69            if (pSocket)  
70            {  
71                pSocket->OnWrite();  
72                pSocket->ReleaseRef();  
73            }  
74        }  
75
76        for (u_int i = 0; i < excep_set.fd_count; i++)  
77        {  
78            //log("select return exception count=%d\n", excep_set.fd_count);  
79            SOCKET fd = excep_set.fd_array[i];  
80            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);  
81            if (pSocket)  
82            {  
83                pSocket->OnClose();  
84                pSocket->ReleaseRef();  
85            }  
86        }  
87
88    }  
89}

系列目录

第01篇 主线程与工作线程的分工

第02篇 Reactor模式

第03篇 一个服务器程序的架构介绍

第04篇 如何将socket设置为非阻塞模式

第05篇 如何编写高性能日志

第06篇 关于网络编程的一些实用技巧和细节

第07篇 开源一款即时通讯软件的源码

第08篇 高性能服务器架构设计总结1

第09篇 高性能服务器架构设计总结2

第10篇 高性能服务器架构设计总结3

第11篇 高性能服务器架构设计总结4

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-04-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 高性能服务器开发 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
即时通信 IM
即时通信 IM(Instant Messaging)基于腾讯二十余年的 IM 技术积累,支持Android、iOS、Mac、Windows、Web、H5、小程序平台且跨终端互通,低代码 UI 组件助您30分钟集成单聊、群聊、关系链、消息漫游、群组管理、资料管理、直播弹幕和内容审核等能力。适用于直播互动、电商带货、客服咨询、社交沟通、在线课程、企业办公、互动游戏、医疗健康等场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档