Having said all this, let's use the network framework design of the flamingo server program as an example to verify the theory introduced above. flamingo's network framework is based on Chen Shuo's muduo library, ported to C++11 with a number of bugs fixed. My thanks go to the original author, Chen Shuo. The flamingo source code can be downloaded here: https://github.com/baloonwj/flamingo; if you cannot access GitHub, you can get it from CSDN instead: http://download.csdn.net/detail/analogous_love/9805797.
The while loop of the core thread function described above is located in eventloop.cpp:
void EventLoop::loop()
{
    assert(!looping_);
    assertInLoopThread();
    looping_ = true;
    quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
    LOG_TRACE << "EventLoop " << this << " start looping";

    while (!quit_)
    {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        ++iteration_;
        if (Logger::logLevel() <= Logger::TRACE)
        {
            printActiveChannels();
        }
        // TODO sort channel by priority
        eventHandling_ = true;
        for (ChannelList::iterator it = activeChannels_.begin();
             it != activeChannels_.end(); ++it)
        {
            currentActiveChannel_ = *it;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;
        eventHandling_ = false;
        doPendingFunctors();

        if (frameFunctor_)
        {
            frameFunctor_();
        }
    }

    LOG_TRACE << "EventLoop " << this << " stop looping";
    looping_ = false;
}
poller_->poll uses epoll to demultiplex network events, and the demultiplexed events are then handled one by one. Each client socket corresponds to one connection, that is, one TcpConnection and one Channel object. currentActiveChannel_->handleEvent(pollReturnTime_) invokes the appropriate handler depending on whether the event is readable, writable, or an error; these handlers are all callback functions set up during program initialization:
void Channel::handleEvent(Timestamp receiveTime)
{
    std::shared_ptr<void> guard;
    if (tied_)
    {
        guard = tie_.lock();
        if (guard)
        {
            handleEventWithGuard(receiveTime);
        }
    }
    else
    {
        handleEventWithGuard(receiveTime);
    }
}

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
    eventHandling_ = true;
    LOG_TRACE << reventsToString();
    if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
    {
        if (logHup_)
        {
            LOG_WARN << "Channel::handle_event() POLLHUP";
        }
        if (closeCallback_) closeCallback_();
    }

    if (revents_ & POLLNVAL)
    {
        LOG_WARN << "Channel::handle_event() POLLNVAL";
    }

    if (revents_ & (POLLERR | POLLNVAL))
    {
        if (errorCallback_) errorCallback_();
    }

    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    {
        // For a listening socket, readCallback_ points to Acceptor::handleRead;
        // for a client socket, it calls TcpConnection::handleRead
        if (readCallback_) readCallback_(receiveTime);
    }
    if (revents_ & POLLOUT)
    {
        // For a socket that is still in the connecting state, writeCallback_ points to Connector::handleWrite()
        if (writeCallback_) writeCallback_();
    }
    eventHandling_ = false;
}
Of course, this takes advantage of the "polymorphism" of the Channel object: for an ordinary socket, a readable event invokes the callback that was set in advance, but for a listening socket, the Acceptor object's handleRead() is called to accept the new connection:
void Acceptor::handleRead()
{
    loop_->assertInLoopThread();
    InetAddress peerAddr;
    //FIXME loop until no more
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        // string hostport = peerAddr.toIpPort();
        // LOG_TRACE << "Accepts of " << hostport;
        // newConnectionCallback_ actually points to TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
        if (newConnectionCallback_)
        {
            newConnectionCallback_(connfd, peerAddr);
        }
        else
        {
            sockets::close(connfd);
        }
    }
    else
    {
        LOG_SYSERR << "in Acceptor::handleRead";
        // Read the section named "The special problem of
        // accept()ing when you can't" in libev's doc.
        // By Marc Lehmann, author of libev.
        if (errno == EMFILE)
        {
            ::close(idleFd_);
            idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
            ::close(idleFd_);
            idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
        }
    }
}
The business-logic handling in the main loop corresponds to:
doPendingFunctors();

if (frameFunctor_)
{
    frameFunctor_();
}
void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (size_t i = 0; i < functors.size(); ++i)
    {
        functors[i]();
    }
    callingPendingFunctors_ = false;
}
Business logic is added here by adding function objects (tasks) to be executed; the added tasks are stored in the member variable pendingFunctors_, which is a vector of function objects, and executing them simply means calling each function in turn. The code above first swaps the function objects out of pendingFunctors_ into a stack variable and then operates only on that stack variable, which reduces the granularity of the lock. Since pendingFunctors_ is also accessed when tasks are added, and multiple threads are involved, it has to be protected by a lock. Tasks are added here:
void EventLoop::queueInLoop(const Functor& cb)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.push_back(cb);
    }

    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}
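For illustration, here is a hypothetical sketch of how another thread might hand work back to the I/O thread through queueInLoop. The function name onTaskFinished and the TcpConnection::send overload used here are assumptions, not flamingo's actual API:

// Runs in a worker thread; the lambda is appended to pendingFunctors_ and
// later executed by doPendingFunctors() in the loop thread.
void onTaskFinished(EventLoop* loop,
                    const std::shared_ptr<TcpConnection>& conn,
                    const std::string& response)
{
    loop->queueInLoop([conn, response]() {
        // executed in the I/O thread on the next loop iteration
        conn->send(response.c_str(), response.length());  // assumed send(const void*, size_t) overload
    });
}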
frameFunctor_ is even simpler: you just set a function pointer. There is one trick here, though: when a task is added and needs to run immediately, a wakeup mechanism is used. Writing a few bytes to a special fd wakes up epoll and makes it return at once (since no other socket may have events at that moment), so the newly added task gets executed right away.
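As a rough sketch of that wakeup mechanism (muduo implements it with an eventfd on Linux; the free functions below are illustrative, not flamingo's actual member functions):

#include <sys/eventfd.h>
#include <unistd.h>
#include <stdint.h>

// Created once per EventLoop and registered with the poller for readable
// events, just like any other Channel.
int createWakeupFd()
{
    return ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
}

// Called by queueInLoop() from another thread: writing 8 bytes makes the fd
// readable, so epoll_wait returns immediately even if no socket has events.
void wakeup(int wakeupFd)
{
    uint64_t one = 1;
    ::write(wakeupFd, &one, sizeof one);
}

// The read callback of the wakeup channel simply drains the counter so the
// fd does not stay readable forever.
void handleWakeupRead(int wakeupFd)
{
    uint64_t one = 0;
    ::read(wakeupFd, &one, sizeof one);
}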
Let's look at the data-receiving logic:
void TcpConnection::handleRead(Timestamp receiveTime)
{
    loop_->assertInLoopThread();
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0)
    {
        // messageCallback_ points to CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receiveTime)
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    }
    else if (n == 0)
    {
        handleClose();
    }
    else
    {
        errno = savedErrno;
        LOG_SYSERR << "TcpConnection::handleRead";
        handleError();
    }
}
The received data is put into the receive buffer; later we unpack it:
void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime)
{
    while (true)
    {
        // not enough data for a packet header yet
        if (pBuffer->readableBytes() < (size_t)sizeof(msg))
        {
            LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()=" << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg);
            return;
        }

        msg header;
        memcpy(&header, pBuffer->peek(), sizeof(msg));
        // not enough data for a whole packet yet
        if (pBuffer->readableBytes() < (size_t)header.packagesize + sizeof(msg))
            return;

        pBuffer->retrieve(sizeof(msg));
        std::string inbuf;
        inbuf.append(pBuffer->peek(), header.packagesize);
        pBuffer->retrieve(header.packagesize);
        if (!Process(conn, inbuf.c_str(), inbuf.length()))
        {
            LOG_WARN << "Process error, close TcpConnection";
            conn->forceClose();
        }
    } // end while-loop
}
First we check whether the data in the receive buffer is large enough for a packet header; if so, we then check whether it is large enough for the packet body size specified in the header; if it is, the packet is then handled in the Process function.
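The msg header type itself is not shown above; a plausible layout looks something like the following. Only packagesize appears in the code above; the second field and the exact integer widths are assumptions for illustration:

#include <stdint.h>

#pragma pack(push, 1)      // no padding, so the header can be memcpy'd straight off the wire
struct msg
{
    int32_t packagesize;   // length of the packet body that follows this header
    int32_t cmd;           // hypothetical command/message-type field
};
#pragma pack(pop)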
Now let's look at the data-sending logic:
void TcpConnection::sendInLoop(const void* data, size_t len)
{
    loop_->assertInLoopThread();
    ssize_t nwrote = 0;
    size_t remaining = len;
    bool faultError = false;
    if (state_ == kDisconnected)
    {
        LOG_WARN << "disconnected, give up writing";
        return;
    }

    // if no thing in output queue, try writing directly
    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
    {
        nwrote = sockets::write(channel_->fd(), data, len);
        if (nwrote >= 0)
        {
            remaining = len - nwrote;
            if (remaining == 0 && writeCompleteCallback_)
            {
                loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
            }
        }
        else // nwrote < 0
        {
            nwrote = 0;
            if (errno != EWOULDBLOCK)
            {
                LOG_SYSERR << "TcpConnection::sendInLoop";
                if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
                {
                    faultError = true;
                }
            }
        }
    }

    assert(remaining <= len);
    if (!faultError && remaining > 0)
    {
        size_t oldLen = outputBuffer_.readableBytes();
        if (oldLen + remaining >= highWaterMark_
            && oldLen < highWaterMark_
            && highWaterMarkCallback_)
        {
            loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
        }
        outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
        if (!channel_->isWriting())
        {
            channel_->enableWriting();
        }
    }
}
If the remaining data (remaining) is greater than 0, channel_->enableWriting() is called to start monitoring writable events. The writable event is handled as follows:
void TcpConnection::handleWrite()
{
    loop_->assertInLoopThread();
    if (channel_->isWriting())
    {
        ssize_t n = sockets::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes());
        if (n > 0)
        {
            outputBuffer_.retrieve(n);
            if (outputBuffer_.readableBytes() == 0)
            {
                channel_->disableWriting();
                if (writeCompleteCallback_)
                {
                    loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
                }
                if (state_ == kDisconnecting)
                {
                    shutdownInLoop();
                }
            }
        }
        else
        {
            LOG_SYSERR << "TcpConnection::handleWrite";
            // if (state_ == kDisconnecting)
            // {
            //     shutdownInLoop();
            // }
        }
    }
    else
    {
        LOG_TRACE << "Connection fd = " << channel_->fd() << " is down, no more writing";
    }
}
Once all the data has been sent, channel_->disableWriting() is called to stop monitoring writable events.
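Under an epoll-based Poller, enableWriting()/disableWriting() ultimately just add or remove EPOLLOUT from the fd's interest set and re-register it. A minimal sketch of that idea follows; this is not flamingo's actual Poller code, and the function and parameter names are illustrative:

#include <sys/epoll.h>
#include <string.h>

// Re-register fd with the events we currently care about. EPOLLOUT is only
// requested while there is still unsent data in the output buffer; otherwise
// a level-triggered epoll would report the socket writable on every iteration.
void updateInterest(int epollfd, int fd, bool wantRead, bool wantWrite)
{
    struct epoll_event ev;
    memset(&ev, 0, sizeof ev);
    ev.data.fd = fd;
    if (wantRead)
        ev.events |= EPOLLIN | EPOLLPRI;
    if (wantWrite)
        ev.events |= EPOLLOUT;
    ::epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);  // assumes fd was added with EPOLL_CTL_ADD earlier
}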
Many readers may have been wanting to ask: didn't the article say that unpacking and processing data is business code rather than network-communication code, yet here they seem to be mixed together? In fact they are not. The actual business processing here all happens inside callback functions provided by the framework layer; how exactly a packet is handled is defined by the framework's user, the business layer itself.
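To make that concrete, here is a rough sketch of how a business layer typically plugs into such a framework by registering callbacks during initialization. setConnectionCallback/setMessageCallback are the muduo-style setters the framework exposes; the IMServer/onConnection names and the way the session object is created are assumptions for illustration:

// Business-layer connection handler, registered with the framework e.g. via
// server_->setConnectionCallback(std::bind(&IMServer::onConnection, this, std::placeholders::_1));
void IMServer::onConnection(const std::shared_ptr<TcpConnection>& conn)
{
    if (conn->connected())
    {
        // Create a session object for this connection and route all readable
        // data into it -- this is how ClientSession::OnRead shown earlier
        // ends up as the messageCallback_ of the TcpConnection.
        auto session = std::make_shared<ClientSession>(conn);
        conn->setMessageCallback(
            [session](const std::shared_ptr<TcpConnection>& c, Buffer* buf, Timestamp time) {
                session->OnRead(c, buf, time);
            });
    }
}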
To sum up, it really comes down to nothing more than a loop inside a thread function. If you don't believe it, take a look at the code of a trading system project I once worked on:
void CEventDispatcher::Run()
{
    m_bShouldRun = true;
    while (m_bShouldRun)
    {
        DispatchIOs();
        SyncTime();
        CheckTimer();
        DispatchEvents();
    }
}
void CEpollReactor::DispatchIOs()
{
    DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT;
    if (HandleOtherTask())
    {
        dwSelectTimeOut = 0;
    }

    struct epoll_event ev;
    CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin();
    for (; itor != m_mapEventHandlerId.end(); itor++)
    {
        CEventHandler *pEventHandler = (CEventHandler *)(*itor).first;
        if (pEventHandler == NULL)
        {
            continue;
        }
        ev.data.ptr = pEventHandler;
        ev.events = 0;
        int nReadID, nWriteID;
        pEventHandler->GetIds(&nReadID, &nWriteID);
        if (nReadID > 0)
        {
            ev.events |= EPOLLIN;
        }
        if (nWriteID > 0)
        {
            ev.events |= EPOLLOUT;
        }
        epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev);
    }

    struct epoll_event events[EPOLL_MAX_EVENTS];
    int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeOut / 1000);
    for (int i = 0; i < nfds; i++)
    {
        struct epoll_event &evref = events[i];
        CEventHandler *pEventHandler = (CEventHandler *)evref.data.ptr;
        if ((evref.events & EPOLLIN) != 0 && m_mapEventHandlerId.find(pEventHandler) != m_mapEventHandlerId.end())
        {
            pEventHandler->HandleInput();
        }
        if ((evref.events & EPOLLOUT) != 0 && m_mapEventHandlerId.find(pEventHandler) != m_mapEventHandlerId.end())
        {
            pEventHandler->HandleOutput();
        }
    }
}
void CEventDispatcher::DispatchEvents()
{
    CEvent event;
    CSyncEvent *pSyncEvent;
    while (m_queueEvent.PeekEvent(event))
    {
        int nRetval;

        if (event.pEventHandler != NULL)
        {
            nRetval = event.pEventHandler->HandleEvent(event.nEventID, event.dwParam, event.pParam);
        }
        else
        {
            nRetval = HandleEvent(event.nEventID, event.dwParam, event.pParam);
        }

        if (event.pAdd != NULL)  // synchronous message
        {
            pSyncEvent = (CSyncEvent *)event.pAdd;
            pSyncEvent->nRetval = nRetval;
            pSyncEvent->sem.UnLock();
        }
    }
}
Now take a look at the source code of TeamTalk, open-sourced by Mogujie (download address: https://github.com/baloonwj/TeamTalk):
void CEventDispatch::StartDispatch(uint32_t wait_timeout)
{
    fd_set read_set, write_set, excep_set;
    timeval timeout;
    timeout.tv_sec = 0;
    timeout.tv_usec = wait_timeout * 1000;  // 10 millisecond

    if (running)
        return;
    running = true;

    while (running)
    {
        _CheckTimer();
        _CheckLoop();

        if (!m_read_set.fd_count && !m_write_set.fd_count && !m_excep_set.fd_count)
        {
            Sleep(MIN_TIMER_DURATION);
            continue;
        }

        m_lock.lock();
        memcpy(&read_set, &m_read_set, sizeof(fd_set));
        memcpy(&write_set, &m_write_set, sizeof(fd_set));
        memcpy(&excep_set, &m_excep_set, sizeof(fd_set));
        m_lock.unlock();

        int nfds = select(0, &read_set, &write_set, &excep_set, &timeout);

        if (nfds == SOCKET_ERROR)
        {
            log("select failed, error code: %d", GetLastError());
            Sleep(MIN_TIMER_DURATION);
            continue;  // select again
        }

        if (nfds == 0)
        {
            continue;
        }

        for (u_int i = 0; i < read_set.fd_count; i++)
        {
            //log("select return read count=%d\n", read_set.fd_count);
            SOCKET fd = read_set.fd_array[i];
            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
            if (pSocket)
            {
                pSocket->OnRead();
                pSocket->ReleaseRef();
            }
        }

        for (u_int i = 0; i < write_set.fd_count; i++)
        {
            //log("select return write count=%d\n", write_set.fd_count);
            SOCKET fd = write_set.fd_array[i];
            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
            if (pSocket)
            {
                pSocket->OnWrite();
                pSocket->ReleaseRef();
            }
        }

        for (u_int i = 0; i < excep_set.fd_count; i++)
        {
            //log("select return exception count=%d\n", excep_set.fd_count);
            SOCKET fd = excep_set.fd_array[i];
            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
            if (pSocket)
            {
                pSocket->OnClose();
                pSocket->ReleaseRef();
            }
        }
    }
}
Due to the character limit on WeChat public account articles, this article is not finished here; the next installment is "Server-Side Programming Notes (8): High-Performance Server Architecture Design Summary 3, Using the flamingo Server Code as an Example".