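This is the fourth article in the "Learning Open-Source Projects from Scratch" series. The previous one was "Learning Open-Source Projects from Scratch (Part 3): A Study of the CSBattleMgr Service Source Code". In this article we study LogServer, whose name translates roughly to "log server". So what work does this log server actually do?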
In Visual Studio, set LogServer as the startup project and press F5 to launch it. Once it starts successfully, it displays the output shown in the figure below:
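From the figure above, we can see that it does roughly three things:
1. Creates a listening port (port 1234)
2. Connects to the MySQL database
3. Initializes the log handler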
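Let's verify the details of these three steps. In Visual Studio, break into the program (Debug menu > Break All, shortcut Ctrl + Alt + Break), then open the Threads window to see all of the program's threads, as shown below: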
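All threads marked with a red flag are user threads, and we can inspect the call stack of each one. Let's start with the main thread at the top:
Switching to the main function, we can see that it is a loop: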
```cpp
int main()
{
    auto res = CLogHandler::GetInstance().Init();
    if (res)
    {
        while(true)
        {
            INetSessionMgr::GetInstance()->Update();
            Sleep(1);
        }
    }
    return 0;
}
```
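There are two parts here: an initialization step, and an Update call made on every pass through the loop. Let's set aside what they do for now, look at what the other threads do first, and then come back to this code.
Let's move on to the next thread:
Judging from the call stack, this thread was started with boost::thread. Its thread function is: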
```cpp
void Active::Run() {
    if (m_BeginInThreadCallback){
        m_BeginInThreadCallback();
    }
    while (true){
        Consume();
    }
}
```
Let's see what this thread function does. The interesting parts are m_BeginInThreadCallback and the Consume() function. Here is Consume():
```cpp
void Active::Consume(){
    boost::mutex::scoped_lock lock(m_IOMutex);
    while(m_Queue.empty()){
        m_ConditionVar.wait(lock);
    }
    m_SwapQueue.swap(m_Queue);
    lock.unlock();
    while(!m_SwapQueue.empty()){
        Buffer* pBuffer = m_SwapQueue.front();
        m_SwapQueue.pop();
        m_Callback(pBuffer);
        --m_PendingWorkNum;
        if (pBuffer){
            m_pBufferPool.ReleaseObejct(pBuffer);
        }
    }
}
```
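This code is easy to follow. While the queue shared by producers and consumer, m_Queue, is empty, the thread holds m_IOMutex and sleeps on the condition variable. Once m_Queue has data, the thread swaps the whole queue into a second queue, m_SwapQueue, releases the lock, and then processes the items in m_SwapQueue one by one: it invokes m_Callback on each Buffer and returns the Buffer to the pool. Because the lock is released before processing starts, producers are not blocked while the records are being handled.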
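To make the swap-queue idea concrete, here is a minimal sketch in standard C++, using std::thread, std::mutex and std::condition_variable in place of their Boost counterparts. The class name SwapQueueWorker and its Post()/Stop() methods are hypothetical, not the project's API, and the sketch leaves out the buffer pool and the m_PendingWorkNum bookkeeping; it only shows the wait, swap, unlock, process cycle.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Sketch of the swap-queue consumer pattern seen in Active::Consume.
// SwapQueueWorker, Post() and Stop() are hypothetical names.
class SwapQueueWorker {
public:
    explicit SwapQueueWorker(std::function<void(const std::string&)> cb)
        : callback_(std::move(cb)), thread_([this] { Run(); }) {}

    ~SwapQueueWorker() { Stop(); }

    // Producer side: may be called from any thread.
    void Post(std::string item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(item));
        }
        cv_.notify_one();
    }

    // Ask the consumer to finish the remaining items, then wait for it.
    void Stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (stopping_) return;
            stopping_ = true;
        }
        cv_.notify_one();
        thread_.join();
    }

private:
    void Run() {
        std::queue<std::string> local;  // plays the role of m_SwapQueue
        for (;;) {
            {
                std::unique_lock<std::mutex> lock(mutex_);
                // Sleep until there is work or a stop request.
                cv_.wait(lock, [this] { return !queue_.empty() || stopping_; });
                if (queue_.empty()) return;  // stop requested and nothing left
                local.swap(queue_);          // take the whole batch in O(1)
            }  // lock released: producers never wait on the callbacks
            while (!local.empty()) {
                callback_(local.front());
                local.pop();
            }
        }
    }

    // Declaration order matters: thread_ is constructed last because its
    // thread starts running Run() immediately.
    std::function<void(const std::string&)> callback_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::string> queue_;  // plays the role of m_Queue
    bool stopping_ = false;
    std::thread thread_;
};
```

Swapping the whole queue instead of popping one item at a time under the lock keeps the critical section down to a constant-time swap, so the threads producing log records are not held up by slow callbacks.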
Where is the thread that runs Active::Run created? Searching for the thread function, we find the following code:
```cpp
void Active::Start(){
    bool ifHvTimer = !m_ThreadTimer.IsEmpty();
    if (ifHvTimer){
        m_Thread = boost::thread(&Active::RunWithUpdate, this);
    }
    else{
        m_Thread = boost::thread(&Active::Run, this);
    }
    m_ThreadID = get_native_thread_id(m_Thread);
    char sThreadName[30];
    sprintf(sThreadName, "%s-%d", "Actor-Run", GetActorID());
    _SetThreadName(m_ThreadID, sThreadName);
}
```
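Set a breakpoint in this function and restart the program. The breakpoint is hit almost immediately; here is the call stack at that point:
The call stack shows that this thread is started in the constructor of a global variable, and that global variable is initialized during DllMain():
This DLL is built by the ELogging project:
In other words, this thread handles logging: producers generate log records, and this thread acts as the consumer that processes them.
Now let's look at the next thread: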
```cpp
void CConnectCtrl::OnExecute()
{
    while(!m_bTerminate)
    {
        _ProcRequests();
        _ProcEvents();
        //CCPSockMgr::Instance()->CheckDelayRelease();
        Sleep(1);
    }
}
```
This is also a loop. First, the _ProcRequests() function:
```cpp
void CConnectCtrl::_ProcRequests()
{
    while(m_dwSockCount < (UINT32)MAX_CONNECTION)
    {
        SConnReq* pstConnReq = (SConnReq*)m_oReqQueue.PopFront();
        if(NULL == pstConnReq)
        {
            break;
        }
        SOCKET hSock = socket(PF_INET, SOCK_STREAM, IPPROTO_IP);
        if(INVALID_SOCKET == hSock)
        {
            CRITICAL(_SDT("CConnectCtrl::_ProcRequests, socket failed, errno %d"), WSAGetLastError());
            CEventMgr::Instance()->PushConnErrEvt(WSAGetLastError(), pstConnReq->dwConnectorID);
            m_oFreeQueue.PushBack(pstConnReq);
            break;
        }
        //// 2009-04-02 cwy modify for general use
        if (pstConnReq->bNeedBind)
        {
            if ( false == BindAddress(hSock, pstConnReq->pszBindIP, pstConnReq->wBindPort) )
            {
                _OnSockError(hSock, pstConnReq);
                break;
            }
        }
        if (g_bNodelay)
        {
            const CHAR szOpt = 1;
            if (0 != ::setsockopt(hSock, IPPROTO_TCP, TCP_NODELAY, (char *)&szOpt, sizeof(char)))
            {
                WARN(_SDT("setsockopt for new socket on UpdateConetext failed, errno=%d"), ::WSAGetLastError());
            }
        }
        WSAEVENT hEvent = WSACreateEvent();
        if(WSA_INVALID_EVENT == hEvent)
        {
            _OnSockError(hSock, pstConnReq);
            break;
        }
        if(SOCKET_ERROR == WSAEventSelect(hSock, hEvent, FD_CONNECT))
        {
            _OnSockError(hSock, pstConnReq);
            WSACloseEvent(hEvent);
            break;
        }
        sockaddr_in stAddr = {0};
        stAddr.sin_family = AF_INET;
        stAddr.sin_addr.s_addr = pstConnReq->dwIP;
        stAddr.sin_port = htons(pstConnReq->wPort);
        if( SOCKET_ERROR == connect(hSock, (sockaddr*)&stAddr, sizeof(stAddr)) )
        {
            if(WSAEWOULDBLOCK != WSAGetLastError())
            {
                _OnSockError(hSock, pstConnReq);
                WSACloseEvent(hEvent);
                break;
            }
        }
        m_pProcReqArray[m_dwSockCount] = pstConnReq;
        m_pSockArray[m_dwSockCount] = hSock;
        m_pEventsArray[m_dwSockCount] = hEvent;
        ++m_dwSockCount;
    }
}
```
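The logic of this function is also fairly easy to follow: it pops requests from a queue (m_oReqQueue) and processes them, except that these requests carry connection information. For each request it creates a socket, optionally binds it and enables TCP_NODELAY, registers an event object for FD_CONNECT with WSAEventSelect, and calls connect(). Because WSAEventSelect automatically puts the socket into non-blocking mode, connect() normally returns immediately with WSAEWOULDBLOCK, which is why that error code is not treated as a failure. The request, socket and event are then stored in parallel arrays so that _ProcEvents can check on them later.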
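The request objects are recycled rather than allocated per connection: CConnectCtrl::Init (shown further down) preallocates MAX_CONNECTION * 2 SConnReq objects and parks them on m_oFreeQueue, and the connect thread returns them there once a request has finished. The single-threaded sketch below illustrates that free queue / request queue cycle. ConnReqPool and its methods are hypothetical; the project's queues must be safe to use across threads, and the code that pushes requests onto m_oReqQueue is not shown in this article.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for SConnReq, reduced to a few fields.
struct ConnReq {
    std::string ip;
    uint16_t port = 0;
    uint32_t connectorId = 0;
};

// Single-threaded sketch of the free-queue / request-queue cycle.
class ConnReqPool {
public:
    explicit ConnReqPool(std::size_t capacity) : storage_(capacity) {
        for (ConnReq& r : storage_) free_.push_back(&r);  // like filling m_oFreeQueue in Init()
    }

    // Requesting side: take an idle object, fill it in, queue it (like m_oReqQueue).
    bool Request(const std::string& ip, uint16_t port, uint32_t connectorId) {
        if (free_.empty()) return false;  // pool exhausted: refuse rather than allocate
        ConnReq* r = free_.front();
        free_.pop_front();
        r->ip = ip;
        r->port = port;
        r->connectorId = connectorId;
        pending_.push_back(r);
        return true;
    }

    // Connect-thread side: take at most `slots` requests, mirroring the
    // m_dwSockCount < MAX_CONNECTION bound in _ProcRequests.
    std::vector<ConnReq*> TakeBatch(std::size_t slots) {
        std::vector<ConnReq*> batch;
        while (batch.size() < slots && !pending_.empty()) {
            batch.push_back(pending_.front());
            pending_.pop_front();
        }
        return batch;
    }

    // After the connect succeeded or failed, recycle the object.
    void Release(ConnReq* r) { free_.push_back(r); }

    std::size_t FreeCount() const { return free_.size(); }
    std::size_t PendingCount() const { return pending_.size(); }

private:
    std::vector<ConnReq> storage_;  // allocated once, never resized, so pointers stay valid
    std::deque<ConnReq*> free_;
    std::deque<ConnReq*> pending_;
};
```

Preallocating a fixed number of request objects puts a hard cap on outstanding connect requests and avoids heap allocation on the connect path.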
Next, the second function called in the while loop, _ProcEvents:
```cpp
void CConnectCtrl::_ProcEvents()
{
    if(0 == m_dwSockCount)
    {
        return;
    }
    WSANETWORKEVENTS stNetworkEvents;
    WSAEVENT* pEvents;
    UINT32 dwCount;
    UINT32 dwIndex;
    UINT32 dwStart = 0;
    do
    {
        pEvents = &m_pEventsArray[dwStart];
        if(dwStart + WSA_MAXIMUM_WAIT_EVENTS > m_dwSockCount)
        {
            dwCount = m_dwSockCount - dwStart;
        }
        else
        {
            dwCount = WSA_MAXIMUM_WAIT_EVENTS;
        }
        dwIndex = WSAWaitForMultipleEvents(dwCount, pEvents, false, 0, false);
        if(WSA_WAIT_FAILED == dwIndex || WSA_WAIT_TIMEOUT == dwIndex)
        {
            dwStart += dwCount;
            continue;
        }
        dwIndex -= WSA_WAIT_EVENT_0;
        dwIndex += dwStart;
        ++dwStart;
        SDASSERT(m_pProcReqArray[dwIndex] != NULL && m_pSockArray[dwIndex] != INVALID_SOCKET && m_pEventsArray[dwIndex] != WSA_INVALID_EVENT);
        if(SOCKET_ERROR == WSAEnumNetworkEvents(m_pSockArray[dwIndex], m_pEventsArray[dwIndex], &stNetworkEvents))
        {
            if(WSAEWOULDBLOCK != WSAGetLastError())
            {
                CEventMgr::Instance()->PushConnErrEvt(WSAGetLastError(), m_pProcReqArray[dwIndex]->dwConnectorID);
                _CloseEvent(dwIndex);
            }
            continue;
        }
        if(stNetworkEvents.lNetworkEvents & FD_CONNECT)
        {
            if(stNetworkEvents.iErrorCode[FD_CONNECT_BIT] != 0)
            {
                CEventMgr::Instance()->PushConnErrEvt(stNetworkEvents.iErrorCode[FD_CONNECT_BIT], m_pProcReqArray[dwIndex]->dwConnectorID);
                _CloseEvent(dwIndex);
                continue;
            }
            //
            // Connection succeeded
            //
            SConnReq* pstReq = m_pProcReqArray[dwIndex];
            CConnData * pConnData = CConnDataMgr::Instance()->Alloc(pstReq->dwRecvBufSize, pstReq->dwSendBufSize);
            if (pConnData == NULL)
            {
                CRITICAL(_SDT("CConnectCtrl::_ProcEvents, create ConnData failed"));
                CEventMgr::Instance()->PushConnErrEvt(0, pstReq->dwConnectorID);
                _CloseEvent(dwIndex);
                continue;
            }
            CCPSock *poSock = &pConnData->sock;
            CUCConnection * poConnection = &pConnData->connection;
            poSock->SetSock(m_pSockArray[dwIndex]);
            m_oFreeQueue.PushBack(m_pProcReqArray[dwIndex]);
            WSACloseEvent(m_pEventsArray[dwIndex]);
            m_pProcReqArray[dwIndex] = NULL;
            m_pSockArray[dwIndex] = INVALID_SOCKET;
            m_pEventsArray[dwIndex] = WSA_INVALID_EVENT;
            sockaddr_in stAddr = {0};
            INT32 nAddrLen = sizeof(stAddr);
            getsockname(poSock->GetSock(), (sockaddr*)&stAddr, &nAddrLen);
            poConnection->SetAccept(false);
            poConnection->SetParentID(pstReq->dwConnectorID);
            poConnection->SetSession(pstReq->poSession);
            poConnection->SetLocalIP(stAddr.sin_addr.s_addr);
            poConnection->SetLocalPort(SDNtohs(stAddr.sin_port));
            poConnection->SetRemoteIP(pstReq->dwIP);
            poConnection->SetRemotePort(pstReq->wPort);
            //poConnection->SetCpSock(poSock);
            //poSock->SetConnection(poConnection);
            poSock->SetPacketParser(pstReq->poPacketParser);
            poSock->SetConnect(TRUE);
            //CEventMgr::Instance()->PushEstablishEvt(pConnData, false, pstReq->dwConnectorID);
            if(false == poSock->AssociateWithIocp())
            {
                poSock->Close();
            }
            else
            {
                if(false == poSock->PostRecv())
                {
                    poSock->Close();
                }
            }
        }
    }while(dwStart < m_dwSockCount);
    _CompressEvent();
}
```
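This function checks the outcome of the connections started by the previous function and handles each result. Because WSAWaitForMultipleEvents can wait on at most WSA_MAXIMUM_WAIT_EVENTS (64) event objects per call, the loop scans the event array in chunks of that size, with a timeout of 0 so that it only polls. If a connection succeeds, the socket is associated with the completion port (AssociateWithIocp) and an asynchronous receive is posted on it (PostRecv). I recommend studying this loop carefully: it is a very good example of asynchronous connection handling, it ties in with the completion port model, and it shows several important network programming details (such as the TCP_NODELAY option).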
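Winsock specifics aside, the same asynchronous connect pattern exists on POSIX systems, which may help Linux readers map this code onto familiar calls. The following is a sketch of that analogue, not a port of the project's code: the function name ConnectNonBlocking is hypothetical, and it handles a single socket with poll() instead of waiting on an array of event objects.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// POSIX sketch of the sequence split across _ProcRequests and _ProcEvents:
// start a non-blocking connect, wait for it to finish, read the result code.
// Returns the connected fd, or -1 on any failure.
int ConnectNonBlocking(const char* ip, uint16_t port, int timeoutMs) {
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1) return -1;

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return -1;

    // Same idea as the g_bNodelay branch; on POSIX the option value is an int.
    int one = 1;
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));

    // WSAEventSelect makes a Winsock socket non-blocking implicitly;
    // on POSIX this has to be done explicitly.
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        close(fd);
        return -1;
    }

    if (connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0)
        return fd;                  // connected immediately
    if (errno != EINPROGRESS) {     // POSIX counterpart of "not WSAEWOULDBLOCK"
        close(fd);
        return -1;
    }

    // Writability means the connect attempt has finished, roughly what
    // FD_CONNECT reports through the event object.
    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = POLLOUT;
    if (poll(&pfd, 1, timeoutMs) != 1) {  // timeout or poll failure
        close(fd);
        return -1;
    }

    // SO_ERROR carries the outcome, like iErrorCode[FD_CONNECT_BIT].
    int err = 0;
    socklen_t len = sizeof(err);
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) != 0 || err != 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```

The project's version scales this to many sockets at once, which is exactly why it keeps parallel arrays and scans them in chunks of 64.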
So where is this thread started? Searching for the OnExecute function name, we find the actual thread function:
```cpp
unsigned CConnectCtrl::ThreadFunc(LPVOID pParam)
{
    CConnectCtrl* poCtrl = (CConnectCtrl*)pParam;
    poCtrl->OnExecute();
    return 0;
}
```
Which in turn leads us to:
```cpp
bool CConnectCtrl::Init()
{
    INT32 nMaxRequest = MAX_CONNECTION * 2;
    m_pAllReqArray = new SConnReq[nMaxRequest];
    if(NULL == m_pAllReqArray)
    {
        return false;
    }
    if(false == m_oFreeQueue.Init(nMaxRequest+1))
    {
        return false;
    }
    if(false == m_oReqQueue.Init(nMaxRequest+1))
    {
        return false;
    }
    INT32 i;
    for(i = 0; i < nMaxRequest; i++)
    {
        m_oFreeQueue.PushBack(&m_pAllReqArray[i]);
    }
    m_pProcReqArray = new SConnReq*[MAX_CONNECTION];
    if(NULL == m_pProcReqArray)
    {
        CRITICAL(_SDT("CConnectCtrl::Init, new SConnReq*[%d] failed"), MAX_CONNECTION);
        return false;
    }
    m_pEventsArray = new WSAEVENT[MAX_CONNECTION];
    if(NULL == m_pEventsArray)
    {
        CRITICAL(_SDT("CConnectCtrl::Init, new WSAEVENT[%d] failed"), MAX_CONNECTION);
        return false;
    }
    m_pSockArray = new SOCKET[MAX_CONNECTION];
    if(NULL == m_pSockArray)
    {
        CRITICAL(_SDT("CConnectCtrl::Init, new SOCKET[%d] failed"), MAX_CONNECTION);
        return false;
    }
    for(i = 0; i < MAX_CONNECTION; i++)
    {
        m_pProcReqArray[i] = NULL;
        m_pEventsArray[i] = WSA_INVALID_EVENT;
        m_pSockArray[i] = INVALID_SOCKET;
    }
    m_dwSockCount = 0;
    m_bTerminate = false;
    UINT dwThreadID = 0;
    m_hThread = (HANDLE)_beginthreadex( NULL, // Security
        0,              // Stack size - use default
        ThreadFunc,     // Thread fn entry point
        (void*)this,    // Param for thread
        0,              // Init flag
        &dwThreadID);   // Receives the thread ID
    if(NULL == m_hThread)
    {
        CRITICAL(_SDT("CConnectCtrl::Init, _beginthreadex failed"));
        return false;
    }
    return true;
}
```
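Both CConnectCtrl::ThreadFunc and CIocpCtrl::ThreadFunc use the same idiom: _beginthreadex only accepts a plain function plus a void* argument, so the object passes `this` as that argument, and a static member function casts it back before calling OnExecute(). Below is a portable sketch of the idiom with pthread_create standing in for _beginthreadex; the Worker class and its members are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <pthread.h>

// Sketch of the static-trampoline idiom: the C thread API knows nothing
// about objects, so `this` travels through the void* parameter.
class Worker {
public:
    bool Start() {
        return pthread_create(&thread_, nullptr, &Worker::ThreadFunc, this) == 0;
    }
    void Join() { pthread_join(thread_, nullptr); }
    int iterations() const { return iterations_.load(); }

private:
    // Static member: has no `this`, so it can be passed as a plain function pointer.
    static void* ThreadFunc(void* param) {
        Worker* self = static_cast<Worker*>(param);  // recover the object
        self->OnExecute();
        return nullptr;
    }

    void OnExecute() {
        for (int i = 0; i < 5; ++i) ++iterations_;  // stands in for the real loop
    }

    pthread_t thread_{};
    std::atomic<int> iterations_{0};
};
```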
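Set a breakpoint at CConnectCtrl::Init(), restart the program, and look at the call stack:
In CUCODENetWin::_InitComponent() we see the initialization of the entire network communication framework: CConnDataMgr, CEventMgr, CConnectCtrl and CIocpCtrl are initialized in that order.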
```cpp
bool CUCODENetWin::_InitComponent()
{
    if (false == CConnDataMgr::Instance()->Init())
    {
        CRITICAL(_SDT("CUCODENetWin::_InitComponent, Init CConnDataMgr failed" ));
        return false;
    }
    if(false == CEventMgr::Instance()->Init(MAX_NET_EVENT))
    {
        CRITICAL(_SDT("CUCODENetWin::_InitComponent, Init CEventMgr %d failed"), MAX_NET_EVENT);
        return false;
    }
    if(false == CConnectCtrl::Instance()->Init())
    {
        CRITICAL(_SDT("CUCODENetWin::_InitComponent, Init CConnectCtrl failed"));
        return false;
    }
    if(false == CIocpCtrl::Instance()->Init())
    {
        CRITICAL(_SDT("CUCODENetWin::_InitComponent, Init CIocpCtrl failed"));
        return false;
    }
    return true;
}
```
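All of these initializations are triggered from CLogNetSessionMgr:
Tracing further up, we eventually reach the top-level code:
Here, at last, we have found where everything starts.
The last group of threads to cover are the four completion port (IOCP) worker threads, shown in the figure below:
All the interesting work happens in their thread function: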
```cpp
void CIocpCtrl::OnExecute()
{
    SPerHandleData* pstPerHandleData;
    SPerIoData* pstPerIoData;
    CCPSock* poSock;
    CCpListener* poListener;
    BOOL bRet;
    DWORD dwByteTrabsferred;
    while(true)
    {
        pstPerHandleData = NULL;
        pstPerIoData = NULL;
        dwByteTrabsferred = 0;
        bRet = GetQueuedCompletionStatus(
            m_hCompletionPort,
            &dwByteTrabsferred,
            (PULONG_PTR)&pstPerHandleData,
            (LPOVERLAPPED*)&pstPerIoData,
            INFINITE);
        // A NULL completion key means this thread should exit
        if(NULL == pstPerHandleData)
        {
            return;
        }
        // A client is requesting a new connection
        if(pstPerHandleData->bListen)
        {
            // for listen event
            poListener = (CCpListener*)pstPerHandleData->ptr;
            if(NULL != poListener && NULL != pstPerIoData)
            {
                poListener->OnAccept(bRet, pstPerIoData);
                //printf("Accpet Count:%d \n", InterlockedIncrement((LONG*)&m_acceptCount) );
            }
            else
            {
                SDASSERT(false);
            }
        }
        else
        {
            //for non-listen event
            poSock = (CCPSock*)pstPerHandleData->ptr;
            if ( NULL == poSock )
            {
                continue;
            }
            if( FALSE == bRet || NULL == pstPerIoData )
            {
                if (::WSAGetLastError()!=ERROR_IO_PENDING)
                {
                    INFO(_SDT("[%s:%d]CCPSock connID=%d error %d, close it"),
                        MSG_MARK, poSock->GetConnectionID(), ::WSAGetLastError());
                    poSock->OnClose();
                }
            }
            else
            {
                switch(pstPerIoData->nOp)
                {
                case IOCP_RECV:
                    {
                        poSock->DecPostRecv();
                        if (dwByteTrabsferred > 0)
                        {
                            poSock->OnRecv(dwByteTrabsferred);
                        }
                        else
                        {
                            INFO(_SDT("[%s:%d]CCPSock connID=%d error %d, close it, socket :%d "),
                                MSG_MARK, poSock->GetConnectionID(), ::WSAGetLastError(), poSock->GetSock());
                            poSock->OnClose();
                        }
                    }
                    break;
                case IOCP_SEND:
                    {
                        poSock->DecPostSend();
                        if (dwByteTrabsferred > 0)
                        {
                            poSock->OnSend(dwByteTrabsferred);
                        }
                        else
                        {
                            INFO(_SDT("[%s:%d]CCPSock connID=%d error %d, close it"),
                                MSG_MARK, poSock->GetConnectionID(), ::WSAGetLastError());
                            poSock->OnClose();
                        }
                    }
                    break;
                case IOCP_CLOSE:
                    {
                        poSock->OnClose(false);
                    }
                    break;
                default:
                    ;
                }
            }
        }
    }
}
```
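I have always felt that Linux server developers should learn the completion port model too, even if they never do Windows development. This is especially true at companies where the Linux server developers are expected to design the network communication layer for the client team.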
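For readers on Linux, the dispatch structure of CIocpCtrl::OnExecute can be imitated with an ordinary blocking queue. The sketch below is only a toy model, not IOCP: Post() plays the role of PostQueuedCompletionStatus, Get() that of GetQueuedCompletionStatus with INFINITE, and a null key is the exit signal, matching the NULL == pstPerHandleData check above (how the project's Uninit() produces that signal is not shown in this article). All names (Op, Sock, Completion, CompletionQueue, WorkerLoop) are hypothetical, and unlike a real completion port the toy performs no I/O.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

enum class Op { Recv, Send, Close };

struct Sock {  // hypothetical stand-in for CCPSock
    std::atomic<std::size_t> bytesIn{0}, bytesOut{0};
    std::atomic<int> closes{0};
};

struct Completion {
    Sock* key;          // like the completion key; nullptr means "exit"
    Op op;              // like pstPerIoData->nOp
    std::size_t bytes;  // like dwByteTrabsferred
};

// Toy completion queue: FIFO, blocking Get().
class CompletionQueue {
public:
    void Post(Completion c) {
        { std::lock_guard<std::mutex> lk(m_); q_.push_back(c); }
        cv_.notify_one();
    }
    Completion Get() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !q_.empty(); });
        Completion c = q_.front();
        q_.pop_front();
        return c;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<Completion> q_;
};

// Same shape as CIocpCtrl::OnExecute: block, dispatch on the op code,
// and treat a zero-byte recv/send as a closed connection.
void WorkerLoop(CompletionQueue& cq) {
    for (;;) {
        Completion c = cq.Get();
        if (c.key == nullptr) return;  // shutdown packet
        switch (c.op) {
        case Op::Recv:
            if (c.bytes > 0) c.key->bytesIn += c.bytes;
            else ++c.key->closes;
            break;
        case Op::Send:
            if (c.bytes > 0) c.key->bytesOut += c.bytes;
            else ++c.key->closes;
            break;
        case Op::Close:
            ++c.key->closes;
            break;
        }
    }
}
```

Posting one null-key packet per worker after the real work shuts the pool down cleanly, because every worker consumes exactly one exit packet and the FIFO order guarantees all earlier completions are dispatched first.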
Let's see where these four threads are started.
Using the same approach, a search first turns up:
```cpp
unsigned CIocpCtrl::ThreadFunc(LPVOID pParam)
{
    CIocpCtrl* poCtrl = (CIocpCtrl*)pParam;
    poCtrl->m_threadBufPool.CreateThreadBuffer();
    poCtrl->OnExecute();
    poCtrl->m_threadBufPool.ReleaseThreadBuffer();
    return 0;
}
```
Which in turn leads to:
```cpp
bool CIocpCtrl::Init()
{
    // Create the I/O completion port handle
    m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (m_hCompletionPort == NULL)
    {
        CRITICAL(_SDT("CIocpCtrl::Init, CreateIoCompletionPort failed, Error %d \n"), ::WSAGetLastError());
        return false;
    }
    // Get the number of CPU cores on this server
    SYSTEM_INFO stSysInfo;
    GetSystemInfo(&stSysInfo);
    m_nNumberOfWorkers = stSysInfo.dwNumberOfProcessors * THREAD_PER_CPU;
    if (g_nThreadNum > 0)
    {
        m_nNumberOfWorkers = g_nThreadNum;
    }
    m_WorkerArray = new HANDLE[m_nNumberOfWorkers];
    for (INT32 i = 0; i < m_nNumberOfWorkers; i++)
    {
        m_WorkerArray[i] = INVALID_HANDLE_VALUE;
    }
    // Create m_nNumberOfWorkers worker threads
    UINT dwThreadID = 0;
    for (INT32 j = 0; j < m_nNumberOfWorkers; j++)
    {
        m_WorkerArray[j] = (HANDLE)_beginthreadex( NULL, // Security
            0,              // Stack size - use default
            ThreadFunc,     // Thread fn entry point
            (void*)this,    // Param for thread
            0,              // Init flag
            &dwThreadID);   // Receives the thread ID
        if (NULL == m_WorkerArray[j])
        {
            m_nNumberOfWorkers = j;
            this->Uninit();
            CRITICAL(_SDT("CIocpCtrl::Init, Create Worker thread failed, Close Handler\n"));
            return false;
        }
    }
    return true;
}
```
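The worker count follows a simple rule: the number of processors times THREAD_PER_CPU, unless g_nThreadNum overrides it with a positive value. Here is a small portable sketch of that rule. std::thread::hardware_concurrency() replaces GetSystemInfo(), the function names are hypothetical, and since the value of THREAD_PER_CPU is not shown in this article it is passed in as a parameter. hardware_concurrency() may return 0 when the core count cannot be determined, so the sketch falls back to one core; that fallback is an assumption of the sketch, not something the source does.

```cpp
#include <cassert>
#include <thread>

// Mirrors the sizing rule in CIocpCtrl::Init: an explicit positive override
// wins; otherwise cores * threadsPerCpu.
int WorkerCount(unsigned cores, int threadsPerCpu, int overrideNum) {
    if (overrideNum > 0) return overrideNum;  // like g_nThreadNum > 0
    if (cores == 0) cores = 1;                // assumed fallback for "unknown"
    return static_cast<int>(cores) * threadsPerCpu;
}

// Same rule using the portable core-count query.
int DefaultWorkerCount(int threadsPerCpu, int overrideNum) {
    return WorkerCount(std::thread::hardware_concurrency(), threadsPerCpu, overrideNum);
}
```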
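Likewise, set a breakpoint at CIocpCtrl::Init(), rerun the program, and we get the following call stack:
We have already covered this part above, so we won't repeat it here:
From this analysis we now know LogServer's overall technical framework. Its business logic and technical details will be covered in later articles; for now, our goal is to quickly get familiar with the technical framework of every service.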