系列目录 windows完成端口(一) windows完成端口(二) windows完成端口(三) windows完成端口(四) windows完成端口(五) windows完成端口(六)
本人很多年前接触完成端口以来,期间学习和练习了很多次,本以为自己真正地理解了其原理,最近在看网狐的服务器端源码时又再一次拾起完成端口的知识,结果发现以前理解的其实很多偏差,有些理解的甚至都是错误的。网络上关于windows完成端口的介绍举不胜举,但大多数都是介绍怎么做,而不是为告诉读者为什么这么做。看了很多遍小猪的讲解:http://blog.csdn.net/piggyxp/article/details/6922277,终于有些顿悟。为自己也为别人,在这里做个备忘。 这篇文章将从为什么这么做的角度来解释完成端口的一些重难点。
使用完成端口一般按以下步骤(这里以网络服务器接受客户端连接并与客户端进行网络通信为例):
//步骤1:创建完成端口
//步骤2:创建侦听socket并将侦听socket绑定到完成端口上
//步骤3:设置侦听
步骤1代码:
m_hIOCompletionPort =
CreateIoCompletionPort(INVALID_HANDLE_VALUE,
NULL, 0, 0 );
步骤2代码:
//创建侦听socket
m_pListenContext->m_Socket = WSASocket(AF_INET,
SOCK_STREAM, 0,
NULL, 0, WSA_FLAG_OVERLAPPED);
// 将Listen Socket绑定至完成端口中
if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket,
m_hIOCompletionPort,(DWORD)m_pListenContext, 0))
{
return false;
}
return true;
注意,必须使用WSASocket函数,并设置标识位WSA_FLAG_OVERLAPPED。
步骤3代码:
// 服务器地址信息,用于绑定Socket
struct sockaddr_in ServerAddress;
// 填充地址信息
ZeroMemory((char *)&ServerAddress,
sizeof(ServerAddress));
ServerAddress.sin_family = AF_INET;
// 这里可以绑定任何可用的IP地址,或者绑定一个指定的IP地址
ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
/*ServerAddress.sin_addr.s_addr =
inet_addr(CStringA(m_strIP).GetString());*/
ServerAddress.sin_port = htons(m_nPort);
// 绑定地址和端口
if (SOCKET_ERROR == bind(m_pListenContext->m_Socket,
(struct sockaddr *) &ServerAddress,
sizeof(ServerAddress)))
return false;
// 开始进行监听
if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,
SOMAXCONN))
return false;
return true;
1
以上步骤都是完成端口约定俗成的套路,现在接下来的问题是如何接受客户端连接?
不管是使用select还是epoll这里模型无非都是检测到侦听socket可读,然后在调用accept函数接受连接,这样存在一个问题,就是侦听socket只有一个,所以调用accept函数接受连接的逻辑也只能有一个(一般不会在多线程里面对同一个socket进行同一种操作)。但是如果是这样的话,如果同一时间有大量的连接来了,可能就要逐个接受连接了,相当于一群人排队进入一个门里面,那有没有更好的方法呢?有,windows提供了一个AcceptEx函数,在创建完侦听函数之后,调用这个函数,那么将来在完成端口的工作线程里面如果有接受新连接动作,则无需调用accept或者AcceptEx,操作系统自动帮你接受新连接,等在工作线程里面得到通知的时候,连接已经建立,而且新的客户端socket也已经创建好。注意:这是完成端口的另外一个优势,如果使用accept,不仅需要使用accept接受新连接,同时需要在连接现场建立一个socket,而使用AcceptEx,这两个步骤都不需要了。AcceptEx函数签名如下:
BOOL AcceptEx(
_In_ SOCKET sListenSocket,
_In_ SOCKET sAcceptSocket,
_In_ PVOID lpOutputBuffer,
_In_ DWORD dwReceiveDataLength,
_In_ DWORD dwLocalAddressLength,
_In_ DWORD dwRemoteAddressLength,
_Out_ LPDWORD lpdwBytesReceived,
_In_ LPOVERLAPPED lpOverlapped
);
注意看第二个参数sAcceptSocket,这个socket我们在初始化的时候需要准备好,将来新连接成功以后,可以直接使用这个socket表示客户端连接。但是你可能又会问,我初始化阶段需要准备多少个这样的socket呢?毕竟不可能多个连接使用同一个sAcceptSocket。的确如此,所以一般初始化的时候准备一批客户端socket,等工作线程有新连接成功后,表明开始准备的某个客户端socket已经被使用了,这个时候我们可以继续补充一个。相当于,我们预先准备五个容器,在使用过程中每次使用一个,我们就立刻补充一个。当然,这个AcceptEx这个函数不仅准备了接受连接操作,同时也准备了连接的两端的地址缓冲区和对端发来的第一组数据缓冲区,将来有新连接成功以后,操作系统通知我们的时候,操作系统不仅帮我门接收好了连接,还将连接两端的地址和对端发过来的第一组数据填到我们指定的缓冲区了。当然msdn上说使用这个函数最好不要直接使用,而是通过相应API获取该函数的指针,再调用之(https://msdn.microsoft.com/en-us/library/windows/desktop/ms737524(v=vs.85).aspx): Note The function pointer for the AcceptEx function must be obtained at run time by making a call to the WSAIoctl function with the SIO_GET_EXTENSION_FUNCTION_POINTER opcode specified. The input buffer passed to the WSAIoctl function must contain WSAID_ACCEPTEX, a globally unique identifier (GUID) whose value identifies the AcceptEx extension function. On success, the output returned by the WSAIoctl function contains a pointer to the AcceptEx function. The WSAID_ACCEPTEX GUID is defined in the Mswsock.h header file.
代码应该写成这样:
// 使用AcceptEx函数,
//因为这个是属于WinSock2规范之外的微软另外提供的扩展函数
// 所以需要额外获取一下函数的指针,
// 获取AcceptEx函数指针
DWORD dwBytes = 0;
if(SOCKET_ERROR == WSAIoctl(
m_pListenContext->m_Socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&dwBytes,
NULL,
NULL))
{
this->_ShowMessage(_T("WSAIoctl 未能获取AcceptEx函数指针。
错误代码: %d\n"),WSAGetLastError());
return false;
}
当然,WSAIoctl函数第一个参数只要填写任意一个有效的socket就可以了。
NO1. 写过网络通信程序的人都知道,尤其是服务器端程序,我们不能直接调用send和recv这类函数进行数据收发,因为当tcp窗口太小时,数据发不出去,send会阻塞线程,同理,如果当前网络缓冲区没有数据,调用recv也会阻塞线程。这是入门级的做法。
NO2. 既然上述做法不好,那我就换成主动检测数据是否可以收发,当数据可以收发的时候,再调用send或者recv函数进行收发。这就是常用的IO复用函数的用途,如select函数、linux下的poll函数。这是中级做法。
NO3. 使用IO复用技术主动检测数据是否可读可写,也存在问题。如果检测到了数据可读或可写,那这种检测就是值得的;但是反之检测不到呢?那也是白白地浪费时间的。如果有一种方法,我不需要主动去检测,我只需要预先做一个部署,当有数据可读或者可写时,操作系统能通知我就好了,而不是每次都是我自己去主动检测。有,这就是linux下的epoll模型和windows下的WSAAsyncSelect和完成端口模型。这是高级做法。
NO4. 但是无论是epoll模型还是WSAAsyncSelect模型,虽然操作系统会告诉我们什么时候数据可读或者可写,但是当数据可读或者可写时,还是需要我们自己去调用send或者recv函数做实际的收发数据工作。那有没有一种模型,不仅能通知我们数据可读和可写,甚至当数据可读或者可写时,连数据的收发工作也帮我们做好了?有,这就是windows的完成端口模型。
这就是标题所说的完成端口将IO操作从手动变为自动,完成端口将数据的可读与可写检测操作和收发数据操作这两项工作改为操作系统代劳,等系统完成之后会通知我们的,而我们只需要在这之前做一些相应的部署(初始化工作)就可以了。 那么需要做那些初始化工作呢?这里我们以收发网络数据为例。 对于收数据,我们只需要准备好,存放数据的缓冲区就可以了:
// 初始化变量
DWORD dwFlags = 0;
DWORD dwBytes = 0;
WSABUF *p_wbuf = &pIoContext->m_wsaBuf;
OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
pIoContext->ResetBuffer();
pIoContext->m_OpType = RECV_POSTED;
// 初始化完成后,,投递WSARecv请求
int nBytesRecv = WSARecv( pIoContext->m_sockAccept,
p_wbuf, 1, &dwBytes, &dwFlags,
p_ol, NULL );
// 如果返回值错误,并且错误的代码并非是Pending的话,
//那就说明这个重叠请求失败了
if ((SOCKET_ERROR == nBytesRecv) &&
(WSA_IO_PENDING != WSAGetLastError()))
{
this->_ShowMessage(_T("投递第一个WSARecv失败!"));
return false;
}
WSARecv函数会立刻返回,不会阻塞,如果返回时数据已经收成功了,那我们准备的缓冲区m_wsaBuf中存放的就是我们收到的数据;否则WASRecv会返回-1(SOCKET_ERROR),此时错误码如果是WSA_IO_PENDING表示收数据暂且还没完成,这样你需要等待后续通知。所以从某种意义上来说WSARecv函数并不是收取数据,而更像是安排让操作系统收数据的设置。
同理,对于发数据,我们也只要准备好需要发送的数据即可:
// 初始化变量
DWORD dwFlags = 0;
DWORD dwBytes = 0;
WSABUF *p_wbuf = &pIoContext->m_wsaBuf;
OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
pIoContext->ResetBuffer();
pIoContext->m_OpType = SEND_POSTED;
// 初始化完成后,,投递WSARecv请求
int nBytesSend = WSASend pIoContext->m_sockAccept,
p_wbuf, 1, &dwBytes, &dwFlags,
p_ol, NULL );
// 如果返回值错误,并且错误的代码并非是Pending的话,
//那就说明这个重叠请求失败了
if ((SOCKET_ERROR == nBytesSend) &&
(WSA_IO_PENDING != WSAGetLastError()))
{
this->_ShowMessage(_T("发送数据失败!"));
return false;
}
发数据的代码基本上和收数据一模一样。
2
上面介绍了一些不成体系的代码片段,那么我们应该怎么把上面介绍的代码组织成一个整体呢?完成端口模型,需要初始化步骤中还需要建立一些工作线程,这些工作线程就是用来处理各种操作系统的通知的,比如有新客户端连接成功了、数据收好了、数据发送好了等等。创建工作线程以及准备新连接到来时需要的一些容器的代码(上文介绍过了,如一些acceptSocket、两端地址缓冲区、第一份收到的数据缓冲区): 创建工作线程:
DWORD nThreadID;
for (int i = 0; i < m_nThreads; i++)
{
THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
pThreadParams->nThreadNo = i+1;
m_phWorkerThreads[i] = ::CreateThread(0, 0,
_WorkerThread,
(void *)pThreadParams,
0, &nThreadID);
}
调用AcceptEx为将来接受新连接准备:
// 为AcceptEx 准备参数,然后投递AcceptEx I/O请求
for( int i=0;i<MAX_POST_ACCEPT;i++ )
{
// 新建一个IO_CONTEXT
PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();
if( false==this->_PostAccept( pAcceptIoContext ) )
{
m_pListenContext->RemoveContext(pAcceptIoContext);
return false;
}
}
//////////////////////////////////////////////////////////////////
// 投递Accept请求
bool CIOCPModel::_PostAccept( PER_IO_CONTEXT* pAcceptIoContext )
{
ASSERT( INVALID_SOCKET!=m_pListenContext->m_Socket );
// 准备参数
DWORD dwBytes = 0;
pAcceptIoContext->m_OpType = ACCEPT_POSTED;
WSABUF *p_wbuf = &pAcceptIoContext->m_wsaBuf;
OVERLAPPED *p_ol = &pAcceptIoContext->m_Overlapped;
// 为以后新连入的客户端先准备好Socket( 这个是与传统accept最大的区别 )
pAcceptIoContext->m_sockAccept = WSASocket(AF_INET,
SOCK_STREAM,
IPPROTO_TCP,
NULL, 0, WSA_FLAG_OVERLAPPED);
if( INVALID_SOCKET==pAcceptIoContext->m_sockAccept )
{
_ShowMessage(_T("创建用于Accept的Socket失败!错误代码: %d"),
WSAGetLastError());
return false;
}
// 投递AcceptEx
if(FALSE == m_lpfnAcceptEx( m_pListenContext->m_Socket,
pAcceptIoContext->m_sockAccept,
p_wbuf->buf,
p_wbuf->len - ((sizeof(SOCKADDR_IN)+16)*2),
sizeof(SOCKADDR_IN)+16,
sizeof(SOCKADDR_IN)+16,
&dwBytes, p_ol))
{
if(WSA_IO_PENDING != WSAGetLastError())
{
_ShowMessage(_T("投递 AcceptEx 请求失败,错误代码: %d"),
WSAGetLastError());
return false;
}
}
return true;
}
这里我开始准备了MAX_POST_ACCEPT=10个socket。
而工作线程的线程函数应该看起来是这个样子:
DWORD ThreadFunction()
{
//使用GetQueuedCompletionStatus函数检测事件类型
if (事件类型 == 有新客户端连成功)
{
//做一些操作1,比如显示一个新连接信息
}
else if (事件类型 == 收到了一份数据)
{
//做一些操作2,比如解析数据
}
else if (事件类型 == 数据发送成功了)
{
//做一些操作3,比如显示一条数据发送成功信息
}
}
在没有事件发生时,函数GetQueuedCompletionStatus()会让工作线程挂起,不然不会占用cpu时间片。
但是不知道你有没有发现线程函数存在以下问题:
HANDLE WINAPI CreateIoCompletionPort(
_In_ HANDLE FileHandle,
_In_opt_ HANDLE ExistingCompletionPort,
_In_ ULONG_PTR CompletionKey,
_In_ DWORD NumberOfConcurrentThreads
);
GetQueuedCompletionStatus函数签名如下:
BOOL WINAPI GetQueuedCompletionStatus(
_In_ HANDLE CompletionPort,
_Out_ LPDWORD lpNumberOfBytes,
_Out_ PULONG_PTR lpCompletionKey,
_Out_ LPOVERLAPPED *lpOverlapped,
_In_ DWORD dwMilliseconds
);
看到没有,GetQueuedCompletionStatus正好也有一个参数叫CompletionPort,而且还是一个输出参数。没错!这两个其实就是同一个指针。这样如果我在绑定socket到完成端口句柄时使用一块内存的指针作为CompletionKey的值,该内存含有该socket的信息,这样我在工作线程中收到事件通知时就能取出这个CompletionKey来得到这个socket句柄了,这样我就知道到底是哪个socket上的事件了。伪码如下:
struct SOME_STRUCT
{
SOCKET s;
//可以再定义一些其它信息一起携带
};
//对于侦听socket
SOME_STRUCT someStruct1;
someStruct1.s = ListenSocket;
CreateIoCompletionPort( ListenSocket, m_hIOCompletionPort,(DWORD)&someStruct1, 0);
//对于普通客户端连接socket
SOME_STRUCT someStruct2;
someStruct2.s = acceptSocket;
CreateIoCompletionPort( acceptSocket, m_hIOCompletionPort,(DWORD)&someStruct2, 0);
其实这个SOME_STRUCT因为是每一个socket有一份,所以它有个名字叫“Per Socket Data”。
线程函数里面就应该写成这个样子:
DWORD ThreadFunction()
{
OVERLAPPED *pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
BOOL bReturn = GetQueuedCompletionStatus(m_hIOCompletionPort,
&dwBytesTransfered,
(PULONG_PTR)&pSocketContext,
&pOverlapped, INFINITE);
if (((SOME_STRUCT*)pSocketContext)->s == 侦听socket句柄)
{
//新连接接收成功,做一些操作
}
//普通客户端socket收发数据
else
{
if (事件类型 == 收到了一份数据)
{
//做一些操作2,比如解析数据
}
else if (事件类型 == 数据发送成功了)
{
//做一些操作3,比如显示一条数据发送成功信息
}
}
}
3
现在另外一个问题就是,如何判断是数据发送成功还是收到了数据?前面已经说过,对于每一次的收发数据,都需要调用WSASend或WSARecv函数进行准备,而这两个函数需要一个OVERLAPPED结构体,反正传得是这个结构体的指针,我们可以根据指针对象的伸缩特性,在这个OVERLAPPED结构体后面再增加一些字段来标识我们是收数据动作还是发数据动作。而这个扩展的OVERLAPPED结构体,因为是针对每一次IO操作的,所以叫“Per IO Data”。因此这个数据结构的第一个字段必须是一个OVERLAPPED结构体:
typedef struct _PER_IO_CONTEXT
{
OVERLAPPED m_Overlapped; // 每一个重叠网络操作的重叠结构(针对每一个Socket的每一个操作,都要有一个)
SOCKET m_sockAccept; // 这个网络操作所使用的Socket
WSABUF m_wsaBuf; // WSA类型的缓冲区,用于给重叠操作传参数的
char m_szBuffer[MAX_BUFFER_LEN]; // 这个是WSABUF里具体存字符的缓冲区
OPERATION_TYPE m_OpType; // 标识网络操作的类型(对应上面的枚举)
};
我们首先将SOME_STRUCT改名成它应该叫的名字,即_PER_SOCKET_CONTEXT:
typedef struct _PER_SOCKET_CONTEXT
{
SOCKET m_Socket;
// 每一个客户端连接的Socket
SOCKADDR_IN m_ClientAddr;
// 客户端的地址
CArray<_PER_IO_CONTEXT*> m_arrayIoContext;
// 客户端网络操作的上下文数据,
};
我们再次观察GetQueuedCompletionStatus的函数签名会发现,其第三个参数正好就是一个OVERLAPPED结构指针,至此我们在工作线程里面不仅可以知道是哪个socket的事件,同时能通过OVERLAPPED*后面的字段知道是收数据还是发数据:
DWORD ThreadFunction()
{
OVERLAPPED *pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
BOOL bReturn = GetQueuedCompletionStatus(m_hIOCompletionPort,
&dwBytesTransfered,
(PULONG_PTR)&pSocketContext,
&pOverlapped, INFINITE);
if (((SOME_STRUCT*)pSocketContext)->s == 侦听socket句柄)
{
//新连接接收成功,做一些操作
}
//普通客户端socket收发数据
else
{
//通过pOverlapped结构得到pIOContext
PER_IO_CONTEXT* pIOContext = (PER_IO_CONTEXT*)pOverlapped;
if (pIOContext->Type == 收)
{
//做一些操作2,比如解析数据
}
else if (pIOContext->Type == 发)
{
//做一些操作3,比如显示一条数据发送成功信息
}
}
}
小结构体指针转换成大结构体指针操作:PER_IO_CONTEXT* pIOContext = (PER_IO_CONTEXT*)pOverlapped; 微软直接帮我们定义了一个宏CONTAINING_RECORD来操作:
//
// Calculate the address of the base of the structure given its type, and an
// address of a field within the structure.
//
#define CONTAINING_RECORD(address, type, field) ((type *)( \
(PCHAR)(address) - \
(ULONG_PTR)(&((type *)0)->field)))
所以上述代码也可以写成:
PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped,
PER_IO_CONTEXT,
m_Overlapped);
由于公众号文章字数有限,您可以接着阅读下一篇:《windows完成端口(二)》 系列目录 windows完成端口(一) windows完成端口(二) windows完成端口(三) windows完成端口(四) windows完成端口(五) windows完成端口(六)