系列目录
第01篇 主线程与工作线程的分工
第02篇 Reactor模式
第03篇 一个服务器程序的架构介绍
第04篇 如何将socket设置为非阻塞模式
第05篇 如何编写高性能日志
第06篇 关于网络编程的一些实用技巧和细节
第07篇 开源一款即时通讯软件的源码
第08篇 高性能服务器架构设计总结1
第09篇 高性能服务器架构设计总结2
第10篇 高性能服务器架构设计总结3
第11篇 高性能服务器架构设计总结4
说了这么多,我们来以flamingo的服务器程序的网络框架设计为例来验证上述介绍的理论。flamingo的网络框架是基于陈硕的muduo库,改成C++11的版本,并修改了一些bug。在此感谢原作者陈硕。flamingo的源码可以在这里下载:https://github.com/baloonwj/flamingo,打不开github的可以移步csdn:http://download.csdn.net/detail/analogous_love/9805797。
上文介绍的核心线程函数的while循环位于eventloop.cpp中:
// EventLoop::loop() -- the per-thread event loop described above.
// Each iteration: (1) wait on the poller (epoll) for ready channels,
// (2) dispatch their events, (3) run tasks queued from other threads,
// (4) run the optional per-iteration business hook.
1void EventLoop::loop()
2{
3 assert(!looping_);
4 assertInLoopThread();
5 looping_ = true;
6 quit_ = false;
7 // FIXME: what if someone calls quit() before loop() ?
8 LOG_TRACE << "EventLoop " << this << " start looping";
9
10 while (!quit_)
11 {
12 activeChannels_.clear();
// Demultiplex: blocks for up to kPollTimeMs and fills
// activeChannels_ with the channels that have pending events.
13 pollReturnTime_ = poller_->poll(kPollTimeMs,
14 &activeChannels_);
15 ++iteration_;
16 if (Logger::logLevel() <= Logger::TRACE)
17 {
18 printActiveChannels();
19 }
20 // TODO sort channel by priority
21 eventHandling_ = true;
// Dispatch: hand each ready channel its events.
22 for (ChannelList::iterator it = activeChannels_.begin();
23 it != activeChannels_.end(); ++it)
24 {
25 currentActiveChannel_ = *it;
26 currentActiveChannel_->handleEvent(pollReturnTime_);
27 }
28 currentActiveChannel_ = NULL;
29 eventHandling_ = false;
// Run the tasks other threads queued via queueInLoop().
30 doPendingFunctors();
31
// Optional user-installed hook executed once per loop iteration.
32 if (frameFunctor_)
33 {
34 frameFunctor_();
35 }
36 }
37
38 LOG_TRACE << "EventLoop " << this << " stop looping";
39 looping_ = false;
40}
poller_->poll利用epoll分离网络事件,然后接着处理分离出来的网络事件,每一个客户端socket对应一个连接,即一个TcpConnection和Channel通道对象。currentActiveChannel_->handleEvent(pollReturnTime_)根据是可读、可写、出错事件来调用对应的处理函数,这些函数都是回调函数,程序初始化阶段设置进来的:
// Channel::handleEvent -- entry point called from EventLoop::loop().
// If the channel is tied to an owner object, promote the weak_ptr
// first so the owner cannot be destroyed while its events are being
// dispatched.
1void Channel::handleEvent(Timestamp receiveTime)
2{
3 std::shared_ptr<void> guard;
4 if (tied_)
5 {
6 guard = tie_.lock();
7 if (guard)
8 {
9 handleEventWithGuard(receiveTime);
10 }
11 }
12 else
13 {
14 handleEventWithGuard(receiveTime);
15 }
16}
17
// Map the revents_ bits reported by the poller onto the callbacks
// that were installed during initialization (close/error/read/write).
18void Channel::handleEventWithGuard(Timestamp receiveTime)
19{
20 eventHandling_ = true;
21 LOG_TRACE << reventsToString();
// Peer hung up and there is nothing left to read.
22 if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
23 {
24 if (logHup_)
25 {
26 LOG_WARN << "Channel::handle_event() POLLHUP";
27 }
28 if (closeCallback_) closeCallback_();
29 }
30
// POLLNVAL: the fd is not open -- indicates a logic error somewhere.
31 if (revents_ & POLLNVAL)
32 {
33 LOG_WARN << "Channel::handle_event() POLLNVAL";
34 }
35
36 if (revents_ & (POLLERR | POLLNVAL))
37 {
38 if (errorCallback_) errorCallback_();
39 }
40 if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
41 {
42 // For a listening socket, readCallback_ points to Acceptor::handleRead;
43 // for a client socket it calls TcpConnection::handleRead.
44 if (readCallback_) readCallback_(receiveTime);
45 }
46 if (revents_ & POLLOUT)
47 {
48 // For a socket that is still connecting,
49 // writeCallback_ points to Connector::handleWrite().
50 if (writeCallback_) writeCallback_();
51 }
52 eventHandling_ = false;
53}
当然,这里利用了Channel对象的“多态性”
// Acceptor::handleRead -- invoked (via Channel) when the listening
// socket becomes readable, i.e. a new connection is waiting.
1void Acceptor::handleRead()
2{
3 loop_->assertInLoopThread();
4 InetAddress peerAddr;
5 //FIXME loop until no more
6 int connfd = acceptSocket_.accept(&peerAddr);
7 if (connfd >= 0)
8 {
9 // string hostport = peerAddr.toIpPort();
10 // LOG_TRACE << "Accepts of " << hostport;
11 //newConnectionCallback_ actually points to:
12 //TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
13 if (newConnectionCallback_)
14 {
15 newConnectionCallback_(connfd, peerAddr);
16 }
17 else
18 {
// No handler installed -- drop the connection immediately.
19 sockets::close(connfd);
20 }
21 }
22 else
23 {
24 LOG_SYSERR << "in Acceptor::handleRead";
25 // Read the section named "The special problem of
26 // accept()ing when you can't" in libev's doc.
27 // By Marc Lehmann, author of libev.
// EMFILE: the process is out of file descriptors. Close the
// reserved idle fd, accept (and immediately close) the pending
// connection so epoll stops reporting it, then re-reserve the
// idle fd for the next time this happens.
28 if (errno == EMFILE)
29 {
30 ::close(idleFd_);
31 idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
32 ::close(idleFd_);
33 idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
34 }
35 }
36}
主循环里面的业务逻辑处理对应:
// The business-logic part of the main loop shown above:
1doPendingFunctors();
2
3if (frameFunctor_)
4{
5 frameFunctor_();
6}
7
8
// Run the tasks that other threads queued via queueInLoop().
9void EventLoop::doPendingFunctors()
10{
11 std::vector<Functor> functors;
12 callingPendingFunctors_ = true;
13
// Swap the pending list into a local under the lock, then run the
// tasks outside of it: the critical section stays tiny, and a task
// may safely queue further tasks without deadlocking.
14 {
15 std::unique_lock<std::mutex> lock(mutex_);
16 functors.swap(pendingFunctors_);
17 }
18
19 for (size_t i = 0; i < functors.size(); ++i)
20 {
21 functors[i]();
22 }
23 callingPendingFunctors_ = false;
24}
这里增加业务逻辑是增加执行任务的函数指针的,增加的任务保存在成员变量pendingFunctors_中,这个变量是一个函数指针数组(vector对象),执行的时候,调用每个函数就可以了。上面的代码先利用一个栈变量将成员变量pendingFunctors_里面的函数指针换过来,接下来对这个栈变量进行操作就可以了,这样减少了锁的粒度。
因为成员变量pendingFunctors_在增加任务的时候也会被用到,涉及到多个线程操作,所以要加锁,增加任务的地方是:
// Queue a task to be executed in the loop thread; callable from any
// thread, hence the lock around pendingFunctors_.
1void EventLoop::queueInLoop(const Functor& cb)
2{
3 {
4 std::unique_lock<std::mutex> lock(mutex_);
5 pendingFunctors_.push_back(cb);
6 }
7
// Wake the poller so the task runs promptly: needed when called
// from another thread, or when the loop thread is currently inside
// doPendingFunctors() and has already swapped the list out.
8 if (!isInLoopThread() || callingPendingFunctors_)
9 {
10 wakeup();
11 }
12}
而frameFunctor_就更简单了,就是通过设置一个函数指针就可以了。当然这里有个技巧性的东西,即增加任务的时候,为了能够立即执行,使用唤醒机制,通过往一个fd里面写入简单的几个字节,来唤醒epoll,使其立刻返回,因为此时没有其它的socket有事件,这样接下来就执行刚才添加的任务了。
我们看一下数据收取的逻辑:
// TcpConnection::handleRead -- readable event on a client socket.
// Reads whatever is available into inputBuffer_ and notifies the
// business layer; n == 0 means the peer closed the connection.
1void TcpConnection::handleRead(Timestamp receiveTime)
2{
3 loop_->assertInLoopThread();
4 int savedErrno = 0;
5 ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
6 if (n > 0)
7 {
8 /*messageCallback_ points to
9 CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn,
10 Buffer* pBuffer,
11 Timestamp receiveTime)*/
12 messageCallback_(shared_from_this(),
13 &inputBuffer_, receiveTime);
14 }
15 else if (n == 0)
16 {
// Peer closed its end of the connection.
17 handleClose();
18 }
19 else
20 {
// readFd() saved errno for us; restore it before logging.
21 errno = savedErrno;
22 LOG_SYSERR << "TcpConnection::handleRead";
23 handleError();
24 }
25}
将收到的数据放到接收缓冲区里面,将来我们来解包:
1void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime)
2{
3 while (true)
4 {
5 //不够一个包头大小
6 if (pBuffer->readableBytes() < (size_t)sizeof(msg))
7 {
8 LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()=" << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg);
9 return;
10 }
11
12 //不够一个整包大小
13 msg header;
14 memcpy(&header, pBuffer->peek(), sizeof(msg));
15 if (pBuffer->readableBytes() < (size_t)header.packagesize + sizeof(msg))
16 return;
17
18 pBuffer->retrieve(sizeof(msg));
19 std::string inbuf;
20 inbuf.append(pBuffer->peek(), header.packagesize);
21 pBuffer->retrieve(header.packagesize);
22 if (!Process(conn, inbuf.c_str(), inbuf.length()))
23 {
24 LOG_WARN << "Process error, close TcpConnection";
25 conn->forceClose();
26 }
27 }// end while-loop
28
29}
先判断接收缓冲区里面的数据是否够一个包头大小,如果够再判断够不够包头指定的包体大小,如果还是够的话,接着在Process函数里面处理该包。
再看看发送数据的逻辑:
// TcpConnection::sendInLoop -- send `len` bytes starting at `data`.
// Must run in the loop thread. Tries a direct write first; whatever
// the kernel did not accept is appended to outputBuffer_ and POLLOUT
// is enabled so handleWrite() finishes the job later.
1void TcpConnection::sendInLoop(const void* data,
2 size_t len)
3{
4 loop_->assertInLoopThread();
5 ssize_t nwrote = 0;
6 size_t remaining = len;
7 bool faultError = false;
8 if (state_ == kDisconnected)
9 {
10 LOG_WARN << "disconnected, give up writing";
11 return;
12 }
13 // if no thing in output queue, try writing directly
14 if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
15 {
16 nwrote = sockets::write(channel_->fd(), data, len);
17 if (nwrote >= 0)
18 {
19 remaining = len - nwrote;
20 if (remaining == 0 && writeCompleteCallback_)
21 {
22 loop_->queueInLoop(std::bind(writeCompleteCallback_,
23 shared_from_this()));
24 }
25 }
26 else // nwrote < 0
27 {
28 nwrote = 0;
// EWOULDBLOCK just means the kernel send buffer is full;
// anything else is a real error.
29 if (errno != EWOULDBLOCK)
30 {
31 LOG_SYSERR << "TcpConnection::sendInLoop";
32 // FIXME: any others?
33 if (errno == EPIPE || errno == ECONNRESET)
34 {
35 faultError = true;
36 }
37 }
38 }
39 }
40
41 assert(remaining <= len);
42 if (!faultError && remaining > 0)
43 {
44 size_t oldLen = outputBuffer_.readableBytes();
// Fire the high-water-mark callback exactly once, when the buffered
// amount crosses the threshold (oldLen was below, new total is not).
45 if (oldLen + remaining >= highWaterMark_
46 && oldLen < highWaterMark_
47 && highWaterMarkCallback_)
48 {
49 loop_->queueInLoop(std::bind(highWaterMarkCallback_,
50 shared_from_this(),
51 oldLen + remaining));
52 }
// Buffer the unsent tail and start watching for writability.
53 outputBuffer_.append(static_cast<const char*>(data)+nwrote,
54 remaining);
55 if (!channel_->isWriting())
56 {
57 channel_->enableWriting();
58 }
59 }
60}
如果剩余的数据remaining大于0,则调用channel_->enableWriting();开始监听可写事件,可写事件处理如下:
// TcpConnection::handleWrite -- POLLOUT fired: flush as much of
// outputBuffer_ as the kernel will accept. Once the buffer drains,
// stop watching POLLOUT (a level-triggered poller would otherwise
// spin) and finish a pending shutdown if one was requested.
1void TcpConnection::handleWrite()
2{
3 loop_->assertInLoopThread();
4 if (channel_->isWriting())
5 {
6 ssize_t n = sockets::write(channel_->fd(),
7 outputBuffer_.peek(),
8 outputBuffer_.readableBytes());
9 if (n > 0)
10 {
11 outputBuffer_.retrieve(n);
12 if (outputBuffer_.readableBytes() == 0)
13 {
// All buffered data flushed: unsubscribe from POLLOUT.
14 channel_->disableWriting();
15 if (writeCompleteCallback_)
16 {
17 loop_->queueInLoop(std::bind(writeCompleteCallback_,
18 shared_from_this()));
19 }
// shutdown() was requested while data was still pending;
// it is safe to complete it now.
20 if (state_ == kDisconnecting)
21 {
22 shutdownInLoop();
23 }
24 }
25 }
26 else
27 {
28 LOG_SYSERR << "TcpConnection::handleWrite";
29 // if (state_ == kDisconnecting)
30 // {
31 // shutdownInLoop();
32 // }
33 }
34 }
35 else
36 {
37 LOG_TRACE << "Connection fd = " << channel_->fd()
38 << " is down, no more writing";
39 }
40}
如果发送完数据以后调用channel_->disableWriting();移除监听可写事件。
很多读者可能一直想问,文中不是说解包数据并处理逻辑是业务代码而非网络通信的代码,你这里貌似都混在一起了,其实没有,这里实际的业务逻辑都是在框架层提供的回调函数里面处理的,具体怎么处理,由框架使用者——业务层自己定义。
总结起来,实际上就是一个线程函数里一个loop那么点事情,不信你再看我曾经工作上的一个交易系统项目代码:
1void CEventDispatcher::Run()
2{
3 m_bShouldRun = true;
4 while(m_bShouldRun)
5 {
6 DispatchIOs();
7 SyncTime();
8 CheckTimer();
9 DispatchEvents();
10 }
11}
12
13
14void CEpollReactor::DispatchIOs()
15{
16 DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT;
17 if (HandleOtherTask())
18 {
19 dwSelectTimeOut = 0;
20 }
21
22 struct epoll_event ev;
23 CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin();
24 for(; itor!=m_mapEventHandlerId.end(); itor++)
25 {
26 CEventHandler *pEventHandler = (CEventHandler *)(*itor).first;
27 if(pEventHandler == NULL){
28 continue;
29 }
30 ev.data.ptr = pEventHandler;
31 ev.events = 0;
32 int nReadID, nWriteID;
33 pEventHandler->GetIds(&nReadID, &nWriteID);
34 if (nReadID > 0)
35 {
36 ev.events |= EPOLLIN;
37 }
38 if (nWriteID > 0)
39 {
40 ev.events |= EPOLLOUT;
41 }
42
43 epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev);
44 }
45
46 struct epoll_event events[EPOLL_MAX_EVENTS];
47
48 int nfds = epoll_wait(m_fdEpoll, events,
49 EPOLL_MAX_EVENTS,
50 dwSelectTimeOut/1000);
51
52 for (int i=0; i<nfds; i++)
53 {
54 struct epoll_event &evref = events[i];
55 CEventHandler *pEventHandler = (CEventHandler *)evref.data.ptr;
56 if ((evref.events|EPOLLIN)!=0 &&
57 m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())
58 {
59 pEventHandler->HandleInput();
60 }
61 if ((evref.events|EPOLLOUT)!=0 &&
62 m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())
63 {
64 pEventHandler->HandleOutput();
65 }
66 }
67}
68
69
70void CEventDispatcher::DispatchEvents()
71{
72 CEvent event;
73 CSyncEvent *pSyncEvent;
74 while(m_queueEvent.PeekEvent(event))
75 {
76 int nRetval;
77
78 if(event.pEventHandler != NULL)
79 {
80 nRetval = event.pEventHandler->HandleEvent(event.nEventID,
81 event.dwParam,
82 event.pParam);
83 }
84 else
85 {
86 nRetval = HandleEvent(event.nEventID,
87 event.dwParam,
88 event.pParam);
89 }
90
91 if(event.pAdd != NULL) //同步消息
92 {
93 pSyncEvent=(CSyncEvent *)event.pAdd;
94 pSyncEvent->nRetval = nRetval;
95 pSyncEvent->sem.UnLock();
96 }
97 }
98}
再看看蘑菇街开源的TeamTalk的源码(代码下载地址:https://github.com/baloonwj/TeamTalk):
// CEventDispatch::StartDispatch -- TeamTalk's select()-based loop
// (Windows flavor: fd_count/fd_array, Sleep, GetLastError).
// Each pass: fire timers/loop callbacks, snapshot the fd sets under
// the lock, select(), then dispatch read/write/exception events.
1void CEventDispatch::StartDispatch(uint32_t wait_timeout)
2{
3 fd_set read_set, write_set, excep_set;
4 timeval timeout;
5 timeout.tv_sec = 0;
6 // 10 millisecond
7 timeout.tv_usec = wait_timeout * 1000;
8
// Guard against being started twice.
9 if(running)
10 return;
11 running = true;
12
13 while (running)
14 {
15 _CheckTimer();
16 _CheckLoop();
17
// Nothing registered yet: sleep briefly instead of spinning.
18 if (!m_read_set.fd_count &&
19 !m_write_set.fd_count &&
20 !m_excep_set.fd_count)
21 {
22 Sleep(MIN_TIMER_DURATION);
23 continue;
24 }
25
// select() mutates its fd sets, so work on copies; the master
// sets are shared with other threads, hence the lock.
26 m_lock.lock();
27 memcpy(&read_set, &m_read_set,
28 sizeof(fd_set));
29 memcpy(&write_set, &m_write_set,
30 sizeof(fd_set));
31 memcpy(&excep_set, &m_excep_set,
32 sizeof(fd_set));
33 m_lock.unlock();
34
35 int nfds = select(0, &read_set,
36 &write_set,
37 &excep_set,
38 &timeout);
39
40 if (nfds == SOCKET_ERROR)
41 {
42 log("select failed, error code: %d", GetLastError());
43 Sleep(MIN_TIMER_DURATION);
44 continue; // select again
45 }
46
// Timeout with no ready sockets.
47 if (nfds == 0)
48 {
49 continue;
50 }
51
52 for (u_int i = 0; i < read_set.fd_count; i++)
53 {
54 //log("select return read count=%d\n", read_set.fd_count);
55 SOCKET fd = read_set.fd_array[i];
56 CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
57 if (pSocket)
58 {
59 pSocket->OnRead();
60 pSocket->ReleaseRef();
61 }
62 }
63
64 for (u_int i = 0; i < write_set.fd_count; i++)
65 {
66 //log("select return write count=%d\n", write_set.fd_count);
67 SOCKET fd = write_set.fd_array[i];
68 CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
69 if (pSocket)
70 {
71 pSocket->OnWrite();
72 pSocket->ReleaseRef();
73 }
74 }
75
76 for (u_int i = 0; i < excep_set.fd_count; i++)
77 {
78 //log("select return exception count=%d\n", excep_set.fd_count);
79 SOCKET fd = excep_set.fd_array[i];
80 CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
81 if (pSocket)
82 {
83 pSocket->OnClose();
84 pSocket->ReleaseRef();
85 }
86 }
87
88 }
89}
系列目录
第01篇 主线程与工作线程的分工
第02篇 Reactor模式
第03篇 一个服务器程序的架构介绍
第04篇 如何将socket设置为非阻塞模式
第05篇 如何编写高性能日志
第06篇 关于网络编程的一些实用技巧和细节
第07篇 开源一款即时通讯软件的源码
第08篇 高性能服务器架构设计总结1
第09篇 高性能服务器架构设计总结2
第10篇 高性能服务器架构设计总结3
第11篇 高性能服务器架构设计总结4