前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >windows完成端口(五)

windows完成端口(五)

作者头像
范蠡
发布2018-04-24 15:12:05
1.8K0
发布2018-04-24 15:12:05
举报

系列目录 windows完成端口(一) windows完成端口(二) windows完成端口(三) windows完成端口(四) windows完成端口(五) windows完成端口(六)

代码语言:javascript
复制
#include "StdAfx.h"  
#include "IOCPModel.h"  
#include "MainDlg.h"  

// 每一个处理器上产生多少个线程(为了最大限度的提升服务器性能,详见配套文档)  
#define WORKER_THREADS_PER_PROCESSOR 2  
// 同时投递的Accept请求的数量(这个要根据实际的情况灵活设置)  
#define MAX_POST_ACCEPT              10  
// 传递给Worker线程的退出信号  
#define EXIT_CODE                    NULL  


// 释放指针和句柄资源的宏  

// 释放指针宏  
#define RELEASE(x)
 {if(x != NULL ){delete x;x=NULL;}}  
// 释放句柄宏  
#define RELEASE_HANDLE(x)
{if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}  
// 释放Socket宏  
#define RELEASE_SOCKET(x)
{if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}  



CIOCPModel::CIOCPModel(void):  
                            m_nThreads(0),  
                            m_hShutdownEvent(NULL),  
                            m_hIOCompletionPort(NULL),  
                            m_phWorkerThreads(NULL),  
                            m_strIP(DEFAULT_IP),  
                            m_nPort(DEFAULT_PORT),  
                            m_pMain(NULL),  
                            m_lpfnAcceptEx( NULL ),  
                            m_pListenContext( NULL )  
{  
}  


CIOCPModel::~CIOCPModel(void)  
{  
    // 确保资源彻底释放  
    this->Stop();  
}  




///////////////////////////////////////////////////////////////////  
// 工作者线程:  为IOCP请求服务的工作者线程  
//         也就是每当完成端口上出现了完成数据包,就将之取出来进行处理的线程  
///////////////////////////////////////////////////////////////////  

DWORD WINAPI CIOCPModel::_WorkerThread(LPVOID lpParam)  
{      
    THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;  
    CIOCPModel* pIOCPModel = (CIOCPModel*)pParam->pIOCPModel;  
    int nThreadNo = (int)pParam->nThreadNo;  

    pIOCPModel->_ShowMessage(_T("工作者线程启动,ID: %d."),nThreadNo);  

    OVERLAPPED           *pOverlapped = NULL;  
    PER_SOCKET_CONTEXT   *pSocketContext = NULL;  
    DWORD                dwBytesTransfered = 0;  

    // 循环处理请求,知道接收到Shutdown信息为止  
    while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0))  
    {  
        BOOL bReturn = GetQueuedCompletionStatus(  
            pIOCPModel->m_hIOCompletionPort,  
            &dwBytesTransfered,  
            (PULONG_PTR)&pSocketContext,  
            &pOverlapped,  
            INFINITE);  

        // 如果收到的是退出标志,则直接退出  
        if ( EXIT_CODE==(DWORD)pSocketContext )  
        {  
            break;  
        }  

        // 判断是否出现了错误  
        if( !bReturn )    
        {    
            DWORD dwErr = GetLastError();  

            // 显示一下提示信息  
            if( !pIOCPModel->HandleError( pSocketContext,dwErr ) )  
            {  
                break;  
            }  

            continue;    
        }    
        else    
        {     
            // 读取传入的参数  
            PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped,
                                         PER_IO_CONTEXT,
                                         m_Overlapped);    

            // 判断是否有客户端断开了  
            if((0 == dwBytesTransfered) && ( RECV_POSTED==pIoContext->m_OpType ||
                                            SEND_POSTED==pIoContext->m_OpType))    
            {    
                pIOCPModel->_ShowMessage( _T("客户端 %s:%d 断开连接."),
                                              inet_ntoa(pSocketContext->m_ClientAddr.sin_addr), 
                                              ntohs(pSocketContext->m_ClientAddr.sin_port) );  

                // 释放掉对应的资源  
                pIOCPModel->_RemoveContext( pSocketContext );  

                continue;    
            }    
            else  
            {  
                switch( pIoContext->m_OpType )    
                {    
                     // Accept    
                case ACCEPT_POSTED:  
                    {   

                        // 为了增加代码可读性,这里用专门的_DoAccept函数进行处理连入请求  
                        pIOCPModel->_DoAccpet( pSocketContext, pIoContext );                       


                    }  
                    break;  

                    // RECV  
                case RECV_POSTED:  
                    {  
                        // 为了增加代码可读性,这里用专门的_DoRecv函数进行处理接收请求  
                        pIOCPModel->_DoRecv( pSocketContext,pIoContext );  
                    }  
                    break;  

                    // SEND  
                    // 这里略过不写了,要不代码太多了,不容易理解,Send操作相对来讲简单一些  
                case SEND_POSTED:  
                    {  

                    }  
                    break;  
                default:  
                    // 不应该执行到这里  
                    TRACE(_T("_WorkThread中的 pIoContext->m_OpType 参数异常.\n"));  
                    break;  
                } //switch  
            }//if  
        }//if  

    }//while  

    TRACE(_T("工作者线程 %d 号退出.\n"),nThreadNo);  

    // 释放线程参数  
    RELEASE(lpParam);     

    return 0;  
}  



//====================================================================================  
//  
//                  系统初始化和终止  
//  
//====================================================================================  




////////////////////////////////////////////////////////////////////  
// 初始化WinSock 2.2  
bool CIOCPModel::LoadSocketLib()  
{      
    WSADATA wsaData;  
    int nResult;  
    nResult = WSAStartup(MAKEWORD(2,2), &wsaData);  
    // 错误(一般都不可能出现)  
    if (NO_ERROR != nResult)  
    {  
        this->_ShowMessage(_T("初始化WinSock 2.2失败!\n"));  
        return false;   
    }  

    return true;  
}  

//////////////////////////////////////////////////////////////////  
//  启动服务器  
bool CIOCPModel::Start()  
{  
    // 初始化线程互斥量  
    InitializeCriticalSection(&m_csContextList);  

    // 建立系统退出的事件通知  
    m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);  

    // 初始化IOCP  
    if (false == _InitializeIOCP())  
    {  
        this->_ShowMessage(_T("初始化IOCP失败!\n"));  
        return false;  
    }  
    else  
    {  
        this->_ShowMessage(_T("\nIOCP初始化完毕\n."));  
    }  

    // 初始化Socket  
    if( false==_InitializeListenSocket() )  
    {  
        this->_ShowMessage(_T("Listen Socket初始化失败!\n"));  
        this->_DeInitialize();  
        return false;  
    }  
    else  
    {  
        this->_ShowMessage(_T("Listen Socket初始化完毕."));  
    }  

    this->_ShowMessage(_T("系统准备就绪,等候连接....\n"));  

    return true;  
}  


////////////////////////////////////////////////////////////////////  
//  开始发送系统退出消息,退出完成端口和线程资源  
void CIOCPModel::Stop()  
{  
    if( m_pListenContext!=NULL && m_pListenContext->m_Socket!=INVALID_SOCKET )  
    {  
        // 激活关闭消息通知  
        SetEvent(m_hShutdownEvent);  

        for (int i = 0; i < m_nThreads; i++)  
        {  
            // 通知所有的完成端口操作退出  
            PostQueuedCompletionStatus(m_hIOCompletionPort, 
                                        0, (DWORD)EXIT_CODE,
                                        NULL);  
        }  

        // 等待所有的客户端资源退出  
        WaitForMultipleObjects(m_nThreads, 
                               m_phWorkerThreads, 
                               TRUE, INFINITE);  

        // 清除客户端列表信息  
        this->_ClearContextList();  

        // 释放其他资源  
        this->_DeInitialize();  

        this->_ShowMessage(_T("停止监听\n"));  
    }     
}  


////////////////////////////////  
// 初始化完成端口  
bool CIOCPModel::_InitializeIOCP()  
{  
    // 建立第一个完成端口  
    m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 
                                                 NULL, 0, 0 );  

    if ( NULL == m_hIOCompletionPort)  
    {  
        this->_ShowMessage(_T("建立完成端口失败!错误代码: %d!\n"),
                             WSAGetLastError());  
        return false;  
    }  

    // 根据本机中的处理器数量,建立对应的线程数  
    m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors();  

    // 为工作者线程初始化句柄  
    m_phWorkerThreads = new HANDLE[m_nThreads];  

    // 根据计算出来的数量建立工作者线程  
    DWORD nThreadID;  
    for (int i = 0; i < m_nThreads; i++)  
    {  
        THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;  
        pThreadParams->pIOCPModel = this;  
        pThreadParams->nThreadNo  = i+1;  
        m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread,
                                              (void *)pThreadParams,
                                               0, &nThreadID);  
    }  

    TRACE(" 建立 _WorkerThread %d 个.\n", m_nThreads );  

    return true;  
}  


/////////////////////////////////////////////////////////////////  
// 初始化Socket  
bool CIOCPModel::_InitializeListenSocket()  
{  
    // AcceptEx 和 GetAcceptExSockaddrs 的GUID,用于导出函数指针  
    GUID GuidAcceptEx = WSAID_ACCEPTEX;    
    GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;   

    // 服务器地址信息,用于绑定Socket  
    struct sockaddr_in ServerAddress;  

    // 生成用于监听的Socket的信息  
    m_pListenContext = new PER_SOCKET_CONTEXT;  

    // 需要使用重叠IO,必须得使用WSASocket来建立Socket,才可以支持重叠IO操作  
    m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM,
                                            0, NULL, 0,
                                            WSA_FLAG_OVERLAPPED);  
    if (INVALID_SOCKET == m_pListenContext->m_Socket)   
    {  
        this->_ShowMessage(_T("初始化Socket失败,错误代码: %d.\n"),
                               WSAGetLastError());  
        return false;  
    }  
    else  
    {  
        TRACE(_T("WSASocket() 完成.\n"));  
    }  

    // 将Listen Socket绑定至完成端口中  
    if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket, 
                                         m_hIOCompletionPort,
                                        (DWORD)m_pListenContext, 0))    
    {    
        this->_ShowMessage(_T("绑定 Listen Socket至完成端口失败!错误代码: %d/n"),
                              WSAGetLastError());  
        RELEASE_SOCKET( m_pListenContext->m_Socket );  
        return false;  
    }  
    else  
    {  
        TRACE(_T("Listen Socket绑定完成端口 完成.\n"));  
    }  

    // 填充地址信息  
    ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));  
    ServerAddress.sin_family = AF_INET;  
    // 这里可以绑定任何可用的IP地址,或者绑定一个指定的IP地址   
    ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);                        
    //ServerAddress.sin_addr.s_addr = inet_addr(CStringA(m_strIP).GetString());           
    ServerAddress.sin_port = htons(m_nPort);                            

    // 绑定地址和端口  
    if (SOCKET_ERROR == bind(m_pListenContext->m_Socket,
                            (struct sockaddr *) &ServerAddress, 
                            sizeof(ServerAddress)))   
    {  
        this->_ShowMessage(_T("bind()函数执行错误.\n"));  
        return false;  
    }  
    else  
    {  
        TRACE(_T("bind() 完成.\n"));  
    }  

    // 开始进行监听  
    if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN))  
    {  
        this->_ShowMessage(_T("Listen()函数执行出现错误.\n"));  
        return false;  
    }  
    else  
    {  
        TRACE(_T("Listen() 完成.\n"));  
    }  

    // 使用AcceptEx函数,因为这个是属于WinSock2规范之外的微软另外提供的扩展函数  
    // 所以需要额外获取一下函数的指针,  
    // 获取AcceptEx函数指针  
    DWORD dwBytes = 0;    
    if(SOCKET_ERROR == WSAIoctl(  
        m_pListenContext->m_Socket,   
        SIO_GET_EXTENSION_FUNCTION_POINTER,   
        &GuidAcceptEx,   
        sizeof(GuidAcceptEx),   
        &m_lpfnAcceptEx,   
        sizeof(m_lpfnAcceptEx),   
        &dwBytes,   
        NULL,   
        NULL))    
    {    
        this->_ShowMessage(_T("WSAIoctl 未能获取AcceptEx函数指针。错误代码: %d\n"),
                           WSAGetLastError());  
        this->_DeInitialize();  
        return false;    
    }    

    // 获取GetAcceptExSockAddrs函数指针,也是同理  
    if(SOCKET_ERROR == WSAIoctl(  
        m_pListenContext->m_Socket,   
        SIO_GET_EXTENSION_FUNCTION_POINTER,   
        &GuidGetAcceptExSockAddrs,  
        sizeof(GuidGetAcceptExSockAddrs),   
        &m_lpfnGetAcceptExSockAddrs,   
        sizeof(m_lpfnGetAcceptExSockAddrs),     
        &dwBytes,   
        NULL,   
        NULL))    
    {    
        this->_ShowMessage(_T("WSAIoctl 未能获取GuidGetAcceptExSockAddrs函数指针。错误代码: %d\n"), 
                               WSAGetLastError());  
        this->_DeInitialize();  
        return false;   
    }    


    // 为AcceptEx 准备参数,然后投递AcceptEx I/O请求  
    for( int i=0;i<MAX_POST_ACCEPT;i++ )  
    {  
        // 新建一个IO_CONTEXT  
        PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();  

        if( false==this->_PostAccept( pAcceptIoContext ) )  
        {  
            m_pListenContext->RemoveContext(pAcceptIoContext);  
            return false;  
        }  
    }  

    this->_ShowMessage( _T("投递 %d 个AcceptEx请求完毕"),MAX_POST_ACCEPT );  

    return true;  
}  

////////////////////////////////////////////////////////////   

由于公众号文章字数有限,您可以接着阅读下一篇:《windows完成端口(六)》 系列目录 windows完成端口(一) windows完成端口(二) windows完成端口(三) windows完成端口(四) windows完成端口(五) windows完成端口(六)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-04-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 高性能服务器开发 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档