客户IO处理,是在工作线程,_WorkerThreadProc中完成的
函数,在完成端口上调用GetQueuedCompletionStatus函数等待IO完成,并调用自定义函数HandleIO来处理IO,具体代码如下:
DOWRD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)
{
#ifdef _DEBUG
::OutputDebugString("Worker Thread startup...\n");
#endif //_DEBUG
CIOCPServer *pThis = (CIOCPServer*)lpParam;
CIOCPBuffer *pBuffer;
DWORD dwKey;
DWORD dwTrans;
LPOVERLAPPED lpol;
while(TRUE)
{
//在关联到此完成端口的所有套接字上等待IO完成
BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,&dwTrans,(LPDWORD)&dwKey,(LPOVERLAPPED)&lpol,WSA_INFINITE);
if(dwTrans == -1)
{
#ifdef _DEBUG
::OutputDebugString("Worker Thread startup...\n");
#endif //_DEBUG
::ExitThread(0);
}
pBuffer=CONTAINING_RECORD(lpol,CIOCPBuffer,ol);
int nError = NO_ERROR;
if(!bOK)
{
SOCKET s;
if(pBuffer->nOperation == OP_ACCEPT)
{
s=pThis->m_sListen;
}
else
{
if(dwKey == 0)
break;
s=((CIOCPContext*)dwKey)->s;
}
DWORD dwFlags = 0;
if(!::WSAGetOverlappedResult(s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags))
{
nError = ::WSAGetLastError();
}
}
pThis->HandleIO(dwKey,pBuffer,dwTrans,nError);
}
#ifdef _DEBUG
::OutputDebugString("Worker Thread out...\n");
#endif //_DEBUG
return 0;
}
SendText成员函数用于在连接上发送数据,执行时先申请一个缓冲区对象,把用户将要发送的数据复制到里面,然后调用postSend成员函数投递这个缓冲区对象
BOOL CIOCPServer::SendText(CIOCPContext *pContext,char *pszText,int nLen)
{
CIOCPBuffer *pBuffer = AllocateBuffer(nLen);
if(pBuffer != NULL)
{
memcpy(pBuffer->buff,pszText,nLen);
return PostSend(pContext,pBuffer);
}
return FALSE;
}
下面的HandleIO函数是关键,
处理完成的IO,投递新的IO请求,释放完成的缓冲区对象,关闭客户上下文对象
下面是主要的实现代码:
void CIOCPServer::HandleIO(DWORD dwKey,CIOCPBuffer *pBuffer,DOWRD dwTrans,int nError)
{
CIOCPContext *pContext = (CIOCPContext*)dwKey;
#ifdef _DEBUG
::OutputDebugString("HandleIO startup..\n");
#endif //_DEBUG
//减少套接字未决IO计数
if(pContext!=NULL)
{
::EnterCriticalSection(&pContext->Lock);
if(pBuffer->nOperation == OP_READ)
pContext->nOutstandingRecv--;
else if(pBuffer->nOperation == OP_WRITE)
pContext->nOutstandingSend--;
::LeaveCriticalSection(&pContext->Lock);
//检查套接字是否已经打开
if(pContext->bClosing)
{
#ifdef _DEBUG
::OutputDebugString("HandleIO startup..\n");
#endif //_DEBUG
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
//释放已关闭套接字的未决IO
ReleaseBuffer(pBuffer);
return;
}
}
else
{
RemovePendingAccept(pBuffer);
}
//检查套接字上发生的错误,然后直接关闭套接字
if(nError!=NO_ERROR)
{
if(pBuffer->nOperation != OP_ACCEPT)
{
OnConnectionError(pContext,pBuffer,nError);
CloseAConnection(pContext);
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
#ifdef _DEBUG
::OutputDebugString("HandleIO startup..\n");
#endif //_DEBUG
}
else//在监听套接字上发生错误
{
if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
#ifdef _DEBUG
::OutputDebugString("HandleIO startup..\n");
#endif //_DEBUG
}
ReleaseBuffer(pBuffer);
return;
}
//开始处理
if(pBuffer->nOperation == OP_ACCEPT)
{
if(dwTrans == 0)
{
#ifdef _DEBUG
::OutputDebugString("HandleIO startup..\n");
#endif //_DEBUG
if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}
else
{
//为接收新连接的申请客户上下文对象
CIOCPContext *pClient = AllocateContext(pBuffer->sClient);
if(pClient != NULL)
{
if(AddAConnection(pCliebt))
{
//取得用户地址
int nLocalLen,nRmoteLen;
m_lpfnGetAcceptExSockaddrs(
pBuffer->buff,
pBuffer->nLen-((sizeof(sockaddr_in)+16)*2),
sizeof(sockaddr_in)+16,
sizeof(sockaddr_in)+16,
(SOCKADDR **)&pLocalAddr,
&nLocalLen,
(SOCKADDR **)&pRemoteAddr,
&nRmoteLen);
memcpy(&pClient->addrLocal,pLocalAddr,nLocalLen);
memcpy(&pClient->addrRemote,pRemoteAddr,nRmoteLen);
//关联新连接到完成端口对象
::CreateIoCompletionPort((HANDLE)pClient->s,m_hCompletion,(DWORD)pClient,0);
//通知用户
pBuffer->nLen = dwTrans;
OnConnectionEstablished(pClient,pBuffer);
//向新连接投递Read请求
for(int i=0;i<5;i++)
{
CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
if(p != NULL)
{
if(!PostRecv(pClient,p))
{
CloseAConnection(pClient);
break;
}
}
}
}
else
{
CloseAConnection(pClient);
ReleaseContext(pClient);
}
}
else
{
//资源不足,关闭与客户的连接即可
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}
//Accept请求完成,释放IO缓冲区
ReleaseBuffer(pBuffer);
//通知监听线程继续再投递一个Accept请求
::InterlockedDecrement(&m_nRepostCount);
::SetEvent(m_hRepostEvent);
}
else if(pBuffer->nOperation == OP_READ)
{
if(dwTrans == 0)
{
//先通知用户
pBuffer->nLen = 0;
OnConnectionClosing(pContext,pBuffer);
//再关闭连接
CloseAConnection(pContext);
//释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
ReleaseBuffer(pBuffer);
}
else
{
pBuffer->nLen = dwTrans;
//按照IO投递的顺序读取接收到的数据
CIOCPBuffer *p = GetNextReadBuffer(pContext,pBuffer);
while(p!=NULL)
{
OnReadCompleted(pContext,p);
//增加要读的序列号的值
::InterlockedDecrement((LONG*)pContext->nCurrentReadSequence);
//释放IO
ReleaseBuffer(p);
p = GetNextReadBuffer(pContext,NULL);
}
//继续投递一个新的请求
pBuffer = AllocateBuffer(BUFFER_SIZE);
if(pBuffer==NULL || !PostRecv(pContext,pBuffer))
{
CloseAConnection(pContext);
}
}
}
else if(pBuffer->nOperation == OP_WRITE)
{
if(dwTrans == 0)
{
//先通知用户
pBuffer->nLen = 0;
OnConnectionClosing(pContext,pBuffer);
//再关闭连接
CloseAConnection(pContext);
//释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
ReleaseBuffer(pBuffer);
}
else
{
//写操作完成,通知用户
pBuffer->nLen = dwTrans;
OnWriteCompleted(pContext,pBuffer);
//释放SendText函数申请缓冲区
ReleaseBuffer(pBuffer);
}
}
}