系列目录 windows完成端口(一) windows完成端口(二) windows完成端口(三) windows完成端口(四) windows完成端口(五) windows完成端口(六)
#include "StdAfx.h"
#include "IOCPModel.h"
#include "MainDlg.h"
// 每一个处理器上产生多少个线程(为了最大限度的提升服务器性能,详见配套文档)
#define WORKER_THREADS_PER_PROCESSOR 2
// 同时投递的Accept请求的数量(这个要根据实际的情况灵活设置)
#define MAX_POST_ACCEPT 10
// 传递给Worker线程的退出信号
#define EXIT_CODE NULL
// 释放指针和句柄资源的宏
// 释放指针宏
#define RELEASE(x)
{if(x != NULL ){delete x;x=NULL;}}
// 释放句柄宏
#define RELEASE_HANDLE(x)
{if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}
// 释放Socket宏
#define RELEASE_SOCKET(x)
{if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}
CIOCPModel::CIOCPModel(void):
m_nThreads(0),
m_hShutdownEvent(NULL),
m_hIOCompletionPort(NULL),
m_phWorkerThreads(NULL),
m_strIP(DEFAULT_IP),
m_nPort(DEFAULT_PORT),
m_pMain(NULL),
m_lpfnAcceptEx( NULL ),
m_pListenContext( NULL )
{
}
CIOCPModel::~CIOCPModel(void)
{
// 确保资源彻底释放
this->Stop();
}
///////////////////////////////////////////////////////////////////
// 工作者线程: 为IOCP请求服务的工作者线程
// 也就是每当完成端口上出现了完成数据包,就将之取出来进行处理的线程
///////////////////////////////////////////////////////////////////
DWORD WINAPI CIOCPModel::_WorkerThread(LPVOID lpParam)
{
THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;
CIOCPModel* pIOCPModel = (CIOCPModel*)pParam->pIOCPModel;
int nThreadNo = (int)pParam->nThreadNo;
pIOCPModel->_ShowMessage(_T("工作者线程启动,ID: %d."),nThreadNo);
OVERLAPPED *pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
// 循环处理请求,知道接收到Shutdown信息为止
while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0))
{
BOOL bReturn = GetQueuedCompletionStatus(
pIOCPModel->m_hIOCompletionPort,
&dwBytesTransfered,
(PULONG_PTR)&pSocketContext,
&pOverlapped,
INFINITE);
// 如果收到的是退出标志,则直接退出
if ( EXIT_CODE==(DWORD)pSocketContext )
{
break;
}
// 判断是否出现了错误
if( !bReturn )
{
DWORD dwErr = GetLastError();
// 显示一下提示信息
if( !pIOCPModel->HandleError( pSocketContext,dwErr ) )
{
break;
}
continue;
}
else
{
// 读取传入的参数
PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped,
PER_IO_CONTEXT,
m_Overlapped);
// 判断是否有客户端断开了
if((0 == dwBytesTransfered) && ( RECV_POSTED==pIoContext->m_OpType ||
SEND_POSTED==pIoContext->m_OpType))
{
pIOCPModel->_ShowMessage( _T("客户端 %s:%d 断开连接."),
inet_ntoa(pSocketContext->m_ClientAddr.sin_addr),
ntohs(pSocketContext->m_ClientAddr.sin_port) );
// 释放掉对应的资源
pIOCPModel->_RemoveContext( pSocketContext );
continue;
}
else
{
switch( pIoContext->m_OpType )
{
// Accept
case ACCEPT_POSTED:
{
// 为了增加代码可读性,这里用专门的_DoAccept函数进行处理连入请求
pIOCPModel->_DoAccpet( pSocketContext, pIoContext );
}
break;
// RECV
case RECV_POSTED:
{
// 为了增加代码可读性,这里用专门的_DoRecv函数进行处理接收请求
pIOCPModel->_DoRecv( pSocketContext,pIoContext );
}
break;
// SEND
// 这里略过不写了,要不代码太多了,不容易理解,Send操作相对来讲简单一些
case SEND_POSTED:
{
}
break;
default:
// 不应该执行到这里
TRACE(_T("_WorkThread中的 pIoContext->m_OpType 参数异常.\n"));
break;
} //switch
}//if
}//if
}//while
TRACE(_T("工作者线程 %d 号退出.\n"),nThreadNo);
// 释放线程参数
RELEASE(lpParam);
return 0;
}
//====================================================================================
//
// 系统初始化和终止
//
//====================================================================================
////////////////////////////////////////////////////////////////////
// 初始化WinSock 2.2
bool CIOCPModel::LoadSocketLib()
{
WSADATA wsaData;
int nResult;
nResult = WSAStartup(MAKEWORD(2,2), &wsaData);
// 错误(一般都不可能出现)
if (NO_ERROR != nResult)
{
this->_ShowMessage(_T("初始化WinSock 2.2失败!\n"));
return false;
}
return true;
}
//////////////////////////////////////////////////////////////////
// 启动服务器
bool CIOCPModel::Start()
{
// 初始化线程互斥量
InitializeCriticalSection(&m_csContextList);
// 建立系统退出的事件通知
m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
// 初始化IOCP
if (false == _InitializeIOCP())
{
this->_ShowMessage(_T("初始化IOCP失败!\n"));
return false;
}
else
{
this->_ShowMessage(_T("\nIOCP初始化完毕\n."));
}
// 初始化Socket
if( false==_InitializeListenSocket() )
{
this->_ShowMessage(_T("Listen Socket初始化失败!\n"));
this->_DeInitialize();
return false;
}
else
{
this->_ShowMessage(_T("Listen Socket初始化完毕."));
}
this->_ShowMessage(_T("系统准备就绪,等候连接....\n"));
return true;
}
////////////////////////////////////////////////////////////////////
// 开始发送系统退出消息,退出完成端口和线程资源
void CIOCPModel::Stop()
{
if( m_pListenContext!=NULL && m_pListenContext->m_Socket!=INVALID_SOCKET )
{
// 激活关闭消息通知
SetEvent(m_hShutdownEvent);
for (int i = 0; i < m_nThreads; i++)
{
// 通知所有的完成端口操作退出
PostQueuedCompletionStatus(m_hIOCompletionPort,
0, (DWORD)EXIT_CODE,
NULL);
}
// 等待所有的客户端资源退出
WaitForMultipleObjects(m_nThreads,
m_phWorkerThreads,
TRUE, INFINITE);
// 清除客户端列表信息
this->_ClearContextList();
// 释放其他资源
this->_DeInitialize();
this->_ShowMessage(_T("停止监听\n"));
}
}
////////////////////////////////
// 初始化完成端口
bool CIOCPModel::_InitializeIOCP()
{
// 建立第一个完成端口
m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
NULL, 0, 0 );
if ( NULL == m_hIOCompletionPort)
{
this->_ShowMessage(_T("建立完成端口失败!错误代码: %d!\n"),
WSAGetLastError());
return false;
}
// 根据本机中的处理器数量,建立对应的线程数
m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors();
// 为工作者线程初始化句柄
m_phWorkerThreads = new HANDLE[m_nThreads];
// 根据计算出来的数量建立工作者线程
DWORD nThreadID;
for (int i = 0; i < m_nThreads; i++)
{
THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
pThreadParams->pIOCPModel = this;
pThreadParams->nThreadNo = i+1;
m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread,
(void *)pThreadParams,
0, &nThreadID);
}
TRACE(" 建立 _WorkerThread %d 个.\n", m_nThreads );
return true;
}
/////////////////////////////////////////////////////////////////
// 初始化Socket
bool CIOCPModel::_InitializeListenSocket()
{
// AcceptEx 和 GetAcceptExSockaddrs 的GUID,用于导出函数指针
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
// 服务器地址信息,用于绑定Socket
struct sockaddr_in ServerAddress;
// 生成用于监听的Socket的信息
m_pListenContext = new PER_SOCKET_CONTEXT;
// 需要使用重叠IO,必须得使用WSASocket来建立Socket,才可以支持重叠IO操作
m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM,
0, NULL, 0,
WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == m_pListenContext->m_Socket)
{
this->_ShowMessage(_T("初始化Socket失败,错误代码: %d.\n"),
WSAGetLastError());
return false;
}
else
{
TRACE(_T("WSASocket() 完成.\n"));
}
// 将Listen Socket绑定至完成端口中
if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket,
m_hIOCompletionPort,
(DWORD)m_pListenContext, 0))
{
this->_ShowMessage(_T("绑定 Listen Socket至完成端口失败!错误代码: %d/n"),
WSAGetLastError());
RELEASE_SOCKET( m_pListenContext->m_Socket );
return false;
}
else
{
TRACE(_T("Listen Socket绑定完成端口 完成.\n"));
}
// 填充地址信息
ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
ServerAddress.sin_family = AF_INET;
// 这里可以绑定任何可用的IP地址,或者绑定一个指定的IP地址
ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
//ServerAddress.sin_addr.s_addr = inet_addr(CStringA(m_strIP).GetString());
ServerAddress.sin_port = htons(m_nPort);
// 绑定地址和端口
if (SOCKET_ERROR == bind(m_pListenContext->m_Socket,
(struct sockaddr *) &ServerAddress,
sizeof(ServerAddress)))
{
this->_ShowMessage(_T("bind()函数执行错误.\n"));
return false;
}
else
{
TRACE(_T("bind() 完成.\n"));
}
// 开始进行监听
if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN))
{
this->_ShowMessage(_T("Listen()函数执行出现错误.\n"));
return false;
}
else
{
TRACE(_T("Listen() 完成.\n"));
}
// 使用AcceptEx函数,因为这个是属于WinSock2规范之外的微软另外提供的扩展函数
// 所以需要额外获取一下函数的指针,
// 获取AcceptEx函数指针
DWORD dwBytes = 0;
if(SOCKET_ERROR == WSAIoctl(
m_pListenContext->m_Socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&dwBytes,
NULL,
NULL))
{
this->_ShowMessage(_T("WSAIoctl 未能获取AcceptEx函数指针。错误代码: %d\n"),
WSAGetLastError());
this->_DeInitialize();
return false;
}
// 获取GetAcceptExSockAddrs函数指针,也是同理
if(SOCKET_ERROR == WSAIoctl(
m_pListenContext->m_Socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockAddrs,
sizeof(GuidGetAcceptExSockAddrs),
&m_lpfnGetAcceptExSockAddrs,
sizeof(m_lpfnGetAcceptExSockAddrs),
&dwBytes,
NULL,
NULL))
{
this->_ShowMessage(_T("WSAIoctl 未能获取GuidGetAcceptExSockAddrs函数指针。错误代码: %d\n"),
WSAGetLastError());
this->_DeInitialize();
return false;
}
// 为AcceptEx 准备参数,然后投递AcceptEx I/O请求
for( int i=0;i<MAX_POST_ACCEPT;i++ )
{
// 新建一个IO_CONTEXT
PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();
if( false==this->_PostAccept( pAcceptIoContext ) )
{
m_pListenContext->RemoveContext(pAcceptIoContext);
return false;
}
}
this->_ShowMessage( _T("投递 %d 个AcceptEx请求完毕"),MAX_POST_ACCEPT );
return true;
}
////////////////////////////////////////////////////////////
由于公众号文章字数有限,您可以接着阅读下一篇:《windows完成端口(六)》 系列目录 windows完成端口(一) windows完成端口(二) windows完成端口(三) windows完成端口(四) windows完成端口(五) windows完成端口(六)