Series Contents
Part 01: The Division of Labor Between the Main Thread and Worker Threads
Part 02: The Reactor Pattern
Part 03: An Introduction to the Architecture of a Server Program
Part 04: How to Set a socket to Non-Blocking Mode
Part 05: How to Write High-Performance Logging
Part 06: Practical Tips and Details of Network Programming
Part 07: Open-Sourcing the Code of an Instant Messaging Application
Part 08: High-Performance Server Architecture Design Summary 1
Part 09: High-Performance Server Architecture Design Summary 2
Part 10: High-Performance Server Architecture Design Summary 3
Part 11: High-Performance Server Architecture Design Summary 4
Having said all that, let us use the network framework design of the flamingo server program as an example to verify the theory introduced above. flamingo's network framework is based on Chen Shuo's muduo library, ported to C++11 and with a number of bugs fixed; many thanks to the original author, Chen Shuo. The flamingo source code can be downloaded here: https://github.com/baloonwj/flamingo. If you cannot access GitHub, you can get it from CSDN instead: http://download.csdn.net/detail/analogous_love/9805797.
The while loop of the core thread function described above is located in eventloop.cpp:
void EventLoop::loop()
{
    assert(!looping_);
    assertInLoopThread();
    looping_ = true;
    quit_ = false;
    // FIXME: what if someone calls quit() before loop() ?
    LOG_TRACE << "EventLoop " << this << " start looping";

    while (!quit_)
    {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        ++iteration_;
        if (Logger::logLevel() <= Logger::TRACE)
        {
            printActiveChannels();
        }
        // TODO sort channel by priority
        eventHandling_ = true;
        for (ChannelList::iterator it = activeChannels_.begin();
             it != activeChannels_.end(); ++it)
        {
            currentActiveChannel_ = *it;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;
        eventHandling_ = false;
        doPendingFunctors();

        if (frameFunctor_)
        {
            frameFunctor_();
        }
    }

    LOG_TRACE << "EventLoop " << this << " stop looping";
    looping_ = false;
}
poller_->poll uses epoll to demultiplex network events, and the loop then processes the events it returned. Each client socket corresponds to one connection, i.e. one TcpConnection object plus one Channel object. currentActiveChannel_->handleEvent(pollReturnTime_) invokes the appropriate handler depending on whether the event is readable, writable, or an error; these handlers are all callbacks that were registered during program initialization:
void Channel::handleEvent(Timestamp receiveTime)
{
    std::shared_ptr<void> guard;
    if (tied_)
    {
        guard = tie_.lock();
        if (guard)
        {
            handleEventWithGuard(receiveTime);
        }
    }
    else
    {
        handleEventWithGuard(receiveTime);
    }
}

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
    eventHandling_ = true;
    LOG_TRACE << reventsToString();
    if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
    {
        if (logHup_)
        {
            LOG_WARN << "Channel::handle_event() POLLHUP";
        }
        if (closeCallback_) closeCallback_();
    }

    if (revents_ & POLLNVAL)
    {
        LOG_WARN << "Channel::handle_event() POLLNVAL";
    }

    if (revents_ & (POLLERR | POLLNVAL))
    {
        if (errorCallback_) errorCallback_();
    }
    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    {
        // For the listening socket, readCallback_ points to Acceptor::handleRead
        // For a client socket, this calls TcpConnection::handleRead
        if (readCallback_) readCallback_(receiveTime);
    }
    if (revents_ & POLLOUT)
    {
        // For a socket that is still connecting,
        // writeCallback_ points to Connector::handleWrite()
        if (writeCallback_) writeCallback_();
    }
    eventHandling_ = false;
}
Of course, this makes use of the "polymorphism" of the Channel object. For the listening socket, the read callback is Acceptor::handleRead:
void Acceptor::handleRead()
{
    loop_->assertInLoopThread();
    InetAddress peerAddr;
    //FIXME loop until no more
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        // string hostport = peerAddr.toIpPort();
        // LOG_TRACE << "Accepts of " << hostport;
        // newConnectionCallback_ actually points to:
        // TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
        if (newConnectionCallback_)
        {
            newConnectionCallback_(connfd, peerAddr);
        }
        else
        {
            sockets::close(connfd);
        }
    }
    else
    {
        LOG_SYSERR << "in Acceptor::handleRead";
        // Read the section named "The special problem of
        // accept()ing when you can't" in libev's doc.
        // By Marc Lehmann, author of libev.
        if (errno == EMFILE)
        {
            ::close(idleFd_);
            idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
            ::close(idleFd_);
            idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
        }
    }
}
The business-logic processing inside the main loop corresponds to:
doPendingFunctors();

if (frameFunctor_)
{
    frameFunctor_();
}


void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (size_t i = 0; i < functors.size(); ++i)
    {
        functors[i]();
    }
    callingPendingFunctors_ = false;
}
Here, business logic is added by adding function objects that represent tasks to execute. The added tasks are stored in the member variable pendingFunctors_, which is a vector of function objects; executing them simply means calling each one in turn. The code above first swaps the functors out of the member variable pendingFunctors_ into a stack-local variable and then operates on that local copy, which keeps the critical section small and reduces lock contention.
Because the member variable pendingFunctors_ is also used when tasks are added, which involves access from multiple threads, it has to be protected by a lock. Tasks are added here:
void EventLoop::queueInLoop(const Functor& cb)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.push_back(cb);
    }

    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}
frameFunctor_ is even simpler: it is just a single function object that gets set once. There is one small trick here, though. When a task is added, a wakeup mechanism is used so that it can be executed right away: writing a few bytes into a dedicated fd wakes up epoll and makes it return immediately, even though no other socket has pending events, so the task that was just added gets executed on the next pass of the loop.
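To illustrate this wakeup trick, here is a minimal sketch using eventfd (muduo on Linux implements wakeup() this way; the free functions below are a simplified assumption for illustration, not flamingo's exact code):

#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>

// Created once when the EventLoop is constructed and registered with epoll
// for read events, just like any other Channel.
int createWakeupFd()
{
    return ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
}

// Called by queueInLoop() from another thread: writing 8 bytes makes the
// wakeup fd readable, so epoll_wait() returns immediately.
void wakeup(int wakeupFd)
{
    uint64_t one = 1;
    ssize_t n = ::write(wakeupFd, &one, sizeof(one));
    (void)n; // in real code, check that n == sizeof(one)
}

// The read callback of the wakeup channel simply drains the counter so the
// fd does not stay readable forever.
void handleWakeupRead(int wakeupFd)
{
    uint64_t one = 0;
    ssize_t n = ::read(wakeupFd, &one, sizeof(one));
    (void)n;
}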
Let us look at the data-receiving logic:
void TcpConnection::handleRead(Timestamp receiveTime)
{
    loop_->assertInLoopThread();
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0)
    {
        /* messageCallback_ points to
           CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn,
                               Buffer* pBuffer,
                               Timestamp receiveTime) */
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    }
    else if (n == 0)
    {
        handleClose();
    }
    else
    {
        errno = savedErrno;
        LOG_SYSERR << "TcpConnection::handleRead";
        handleError();
    }
}
The received data is put into the receive buffer, and later we unpack it from there:
void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime)
{
    while (true)
    {
        // Not enough data for a packet header yet
        if (pBuffer->readableBytes() < (size_t)sizeof(msg))
        {
            LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()=" << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg);
            return;
        }

        // Not enough data for a whole packet yet
        msg header;
        memcpy(&header, pBuffer->peek(), sizeof(msg));
        if (pBuffer->readableBytes() < (size_t)header.packagesize + sizeof(msg))
            return;

        pBuffer->retrieve(sizeof(msg));
        std::string inbuf;
        inbuf.append(pBuffer->peek(), header.packagesize);
        pBuffer->retrieve(header.packagesize);
        if (!Process(conn, inbuf.c_str(), inbuf.length()))
        {
            LOG_WARN << "Process error, close TcpConnection";
            conn->forceClose();
        }
    } // end while-loop
}
We first check whether the data in the receive buffer is large enough to hold a packet header. If it is, we then check whether it also contains the full packet body whose size the header specifies. If both checks pass, the packet is handed to the Process function for handling.
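The code above assumes a fixed-size header struct named msg with a packagesize field that gives the length of the body. Its actual definition is not shown here; a plausible minimal layout, for illustration only, might look like this (every field other than packagesize is hypothetical, and a real protocol would also pin down the byte order of these fields):

#include <cstdint>

#pragma pack(push, 1)   // no padding, so sizeof(msg) matches the bytes on the wire
struct msg
{
    int32_t packagesize; // length of the packet body that follows this header
    int32_t cmd;         // hypothetical command / message-type field
    int32_t seq;         // hypothetical sequence-number field
};
#pragma pack(pop)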
Now let us look at the data-sending logic:
void TcpConnection::sendInLoop(const void* data, size_t len)
{
    loop_->assertInLoopThread();
    ssize_t nwrote = 0;
    size_t remaining = len;
    bool faultError = false;
    if (state_ == kDisconnected)
    {
        LOG_WARN << "disconnected, give up writing";
        return;
    }
    // if nothing is in the output queue, try writing directly
    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
    {
        nwrote = sockets::write(channel_->fd(), data, len);
        if (nwrote >= 0)
        {
            remaining = len - nwrote;
            if (remaining == 0 && writeCompleteCallback_)
            {
                loop_->queueInLoop(std::bind(writeCompleteCallback_,
                                             shared_from_this()));
            }
        }
        else // nwrote < 0
        {
            nwrote = 0;
            if (errno != EWOULDBLOCK)
            {
                LOG_SYSERR << "TcpConnection::sendInLoop";
                // FIXME: any others?
                if (errno == EPIPE || errno == ECONNRESET)
                {
                    faultError = true;
                }
            }
        }
    }

    assert(remaining <= len);
    if (!faultError && remaining > 0)
    {
        size_t oldLen = outputBuffer_.readableBytes();
        if (oldLen + remaining >= highWaterMark_
            && oldLen < highWaterMark_
            && highWaterMarkCallback_)
        {
            loop_->queueInLoop(std::bind(highWaterMarkCallback_,
                                         shared_from_this(),
                                         oldLen + remaining));
        }
        outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
        if (!channel_->isWriting())
        {
            channel_->enableWriting();
        }
    }
}
If the amount of unsent data, remaining, is greater than 0, channel_->enableWriting() is called to start listening for writable events. The writable event is handled as follows:
void TcpConnection::handleWrite()
{
    loop_->assertInLoopThread();
    if (channel_->isWriting())
    {
        ssize_t n = sockets::write(channel_->fd(),
                                   outputBuffer_.peek(),
                                   outputBuffer_.readableBytes());
        if (n > 0)
        {
            outputBuffer_.retrieve(n);
            if (outputBuffer_.readableBytes() == 0)
            {
                channel_->disableWriting();
                if (writeCompleteCallback_)
                {
                    loop_->queueInLoop(std::bind(writeCompleteCallback_,
                                                 shared_from_this()));
                }
                if (state_ == kDisconnecting)
                {
                    shutdownInLoop();
                }
            }
        }
        else
        {
            LOG_SYSERR << "TcpConnection::handleWrite";
            // if (state_ == kDisconnecting)
            // {
            //     shutdownInLoop();
            // }
        }
    }
    else
    {
        LOG_TRACE << "Connection fd = " << channel_->fd()
                  << " is down, no more writing";
    }
}
Once all buffered data has been sent, channel_->disableWriting() is called to stop listening for writable events. This matters because a socket's send buffer is usually not full: if the writable event were left registered, the level-triggered poller would report it on every iteration and the loop would spin for nothing.
Many readers may have been wanting to ask: didn't the article say that unpacking the data and handling the logic is business code rather than network-communication code, yet here the two seem to be mixed together? In fact they are not. The actual business handling happens inside callbacks provided through the framework layer, and what those callbacks do is defined by the framework's user, i.e. the business layer itself.
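To make this concrete, here is a rough sketch of how a business layer typically wires its own handlers into such a framework through callbacks. The setConnectionCallback/setMessageCallback names follow muduo's TcpServer API, which flamingo inherits; the ChatServer class itself is purely illustrative, not flamingo's actual code:

// Includes depend on how the framework is laid out; with stock muduo they
// would be <muduo/net/TcpServer.h>, <muduo/net/EventLoop.h>, etc.
#include <functional>

// Hypothetical business-layer server: all protocol and business handling
// lives in the callbacks it registers; the network framework itself never
// needs to know anything about the business logic.
class ChatServer
{
public:
    ChatServer(EventLoop* loop, const InetAddress& listenAddr)
        : server_(loop, listenAddr, "ChatServer")
    {
        // Called by the framework when a connection is established or closed.
        server_.setConnectionCallback(
            std::bind(&ChatServer::onConnection, this, std::placeholders::_1));
        // Called by the framework whenever data arrives; this is where the
        // ClientSession::OnRead-style unpacking shown above is plugged in.
        server_.setMessageCallback(
            std::bind(&ChatServer::onMessage, this,
                      std::placeholders::_1, std::placeholders::_2,
                      std::placeholders::_3));
    }

    void start() { server_.start(); }

private:
    void onConnection(const TcpConnectionPtr& conn);
    void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp receiveTime);

    TcpServer server_;
};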
To sum up, it really all comes down to one loop inside one thread function. If you don't believe it, take another look at the code of a trading-system project I once worked on:
void CEventDispatcher::Run()
{
    m_bShouldRun = true;
    while (m_bShouldRun)
    {
        DispatchIOs();
        SyncTime();
        CheckTimer();
        DispatchEvents();
    }
}


void CEpollReactor::DispatchIOs()
{
    DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT;
    if (HandleOtherTask())
    {
        dwSelectTimeOut = 0;
    }

    struct epoll_event ev;
    CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin();
    for (; itor != m_mapEventHandlerId.end(); itor++)
    {
        CEventHandler* pEventHandler = (CEventHandler*)(*itor).first;
        if (pEventHandler == NULL)
        {
            continue;
        }
        ev.data.ptr = pEventHandler;
        ev.events = 0;
        int nReadID, nWriteID;
        pEventHandler->GetIds(&nReadID, &nWriteID);
        if (nReadID > 0)
        {
            ev.events |= EPOLLIN;
        }
        if (nWriteID > 0)
        {
            ev.events |= EPOLLOUT;
        }

        epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev);
    }

    struct epoll_event events[EPOLL_MAX_EVENTS];

    int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS,
                          dwSelectTimeOut / 1000);

    for (int i = 0; i < nfds; i++)
    {
        struct epoll_event& evref = events[i];
        CEventHandler* pEventHandler = (CEventHandler*)evref.data.ptr;
        if ((evref.events & EPOLLIN) != 0 &&
            m_mapEventHandlerId.find(pEventHandler) != m_mapEventHandlerId.end())
        {
            pEventHandler->HandleInput();
        }
        if ((evref.events & EPOLLOUT) != 0 &&
            m_mapEventHandlerId.find(pEventHandler) != m_mapEventHandlerId.end())
        {
            pEventHandler->HandleOutput();
        }
    }
}


void CEventDispatcher::DispatchEvents()
{
    CEvent event;
    CSyncEvent* pSyncEvent;
    while (m_queueEvent.PeekEvent(event))
    {
        int nRetval;

        if (event.pEventHandler != NULL)
        {
            nRetval = event.pEventHandler->HandleEvent(event.nEventID,
                                                       event.dwParam,
                                                       event.pParam);
        }
        else
        {
            nRetval = HandleEvent(event.nEventID,
                                  event.dwParam,
                                  event.pParam);
        }

        if (event.pAdd != NULL) // synchronous message
        {
            pSyncEvent = (CSyncEvent*)event.pAdd;
            pSyncEvent->nRetval = nRetval;
            pSyncEvent->sem.UnLock();
        }
    }
}
Now look at the source code of TeamTalk, open-sourced by Mogujie (download: https://github.com/baloonwj/TeamTalk):
void CEventDispatch::StartDispatch(uint32_t wait_timeout)
{
    fd_set read_set, write_set, excep_set;
    timeval timeout;
    timeout.tv_sec = 0;
    // 10 millisecond
    timeout.tv_usec = wait_timeout * 1000;

    if (running)
        return;
    running = true;

    while (running)
    {
        _CheckTimer();
        _CheckLoop();

        if (!m_read_set.fd_count &&
            !m_write_set.fd_count &&
            !m_excep_set.fd_count)
        {
            Sleep(MIN_TIMER_DURATION);
            continue;
        }

        m_lock.lock();
        memcpy(&read_set, &m_read_set, sizeof(fd_set));
        memcpy(&write_set, &m_write_set, sizeof(fd_set));
        memcpy(&excep_set, &m_excep_set, sizeof(fd_set));
        m_lock.unlock();

        int nfds = select(0, &read_set, &write_set, &excep_set, &timeout);

        if (nfds == SOCKET_ERROR)
        {
            log("select failed, error code: %d", GetLastError());
            Sleep(MIN_TIMER_DURATION);
            continue; // select again
        }

        if (nfds == 0)
        {
            continue;
        }

        for (u_int i = 0; i < read_set.fd_count; i++)
        {
            //log("select return read count=%d\n", read_set.fd_count);
            SOCKET fd = read_set.fd_array[i];
            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
            if (pSocket)
            {
                pSocket->OnRead();
                pSocket->ReleaseRef();
            }
        }

        for (u_int i = 0; i < write_set.fd_count; i++)
        {
            //log("select return write count=%d\n", write_set.fd_count);
            SOCKET fd = write_set.fd_array[i];
            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
            if (pSocket)
            {
                pSocket->OnWrite();
                pSocket->ReleaseRef();
            }
        }

        for (u_int i = 0; i < excep_set.fd_count; i++)
        {
            //log("select return exception count=%d\n", excep_set.fd_count);
            SOCKET fd = excep_set.fd_array[i];
            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
            if (pSocket)
            {
                pSocket->OnClose();
                pSocket->ReleaseRef();
            }
        }
    }
}
This article was originally shared on the WeChat public account 高性能服务器开发 (easyserverdev); author: 张小方.
Originally published: 2018-04-24.