windows完成端口(五)

系列目录 windows完成端口(一) windows完成端口(二) windows完成端口(三) windows完成端口(四) windows完成端口(五) windows完成端口(六)

#include "StdAfx.h"  
#include "IOCPModel.h"  
#include "MainDlg.h"  

// 每一个处理器上产生多少个线程(为了最大限度的提升服务器性能,详见配套文档)  
#define WORKER_THREADS_PER_PROCESSOR 2  
// 同时投递的Accept请求的数量(这个要根据实际的情况灵活设置)  
#define MAX_POST_ACCEPT              10  
// 传递给Worker线程的退出信号  
#define EXIT_CODE                    NULL  


// 释放指针和句柄资源的宏  

// 释放指针宏  
#define RELEASE(x)
 {if(x != NULL ){delete x;x=NULL;}}  
// 释放句柄宏  
#define RELEASE_HANDLE(x)
{if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}  
// 释放Socket宏  
#define RELEASE_SOCKET(x)
{if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}  



CIOCPModel::CIOCPModel(void):  
                            m_nThreads(0),  
                            m_hShutdownEvent(NULL),  
                            m_hIOCompletionPort(NULL),  
                            m_phWorkerThreads(NULL),  
                            m_strIP(DEFAULT_IP),  
                            m_nPort(DEFAULT_PORT),  
                            m_pMain(NULL),  
                            m_lpfnAcceptEx( NULL ),  
                            m_pListenContext( NULL )  
{  
}  


CIOCPModel::~CIOCPModel(void)  
{  
    // 确保资源彻底释放  
    this->Stop();  
}  




///////////////////////////////////////////////////////////////////  
// 工作者线程:  为IOCP请求服务的工作者线程  
//         也就是每当完成端口上出现了完成数据包,就将之取出来进行处理的线程  
///////////////////////////////////////////////////////////////////  

DWORD WINAPI CIOCPModel::_WorkerThread(LPVOID lpParam)  
{      
    THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;  
    CIOCPModel* pIOCPModel = (CIOCPModel*)pParam->pIOCPModel;  
    int nThreadNo = (int)pParam->nThreadNo;  

    pIOCPModel->_ShowMessage(_T("工作者线程启动,ID: %d."),nThreadNo);  

    OVERLAPPED           *pOverlapped = NULL;  
    PER_SOCKET_CONTEXT   *pSocketContext = NULL;  
    DWORD                dwBytesTransfered = 0;  

    // 循环处理请求,知道接收到Shutdown信息为止  
    while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0))  
    {  
        BOOL bReturn = GetQueuedCompletionStatus(  
            pIOCPModel->m_hIOCompletionPort,  
            &dwBytesTransfered,  
            (PULONG_PTR)&pSocketContext,  
            &pOverlapped,  
            INFINITE);  

        // 如果收到的是退出标志,则直接退出  
        if ( EXIT_CODE==(DWORD)pSocketContext )  
        {  
            break;  
        }  

        // 判断是否出现了错误  
        if( !bReturn )    
        {    
            DWORD dwErr = GetLastError();  

            // 显示一下提示信息  
            if( !pIOCPModel->HandleError( pSocketContext,dwErr ) )  
            {  
                break;  
            }  

            continue;    
        }    
        else    
        {     
            // 读取传入的参数  
            PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped,
                                         PER_IO_CONTEXT,
                                         m_Overlapped);    

            // 判断是否有客户端断开了  
            if((0 == dwBytesTransfered) && ( RECV_POSTED==pIoContext->m_OpType ||
                                            SEND_POSTED==pIoContext->m_OpType))    
            {    
                pIOCPModel->_ShowMessage( _T("客户端 %s:%d 断开连接."),
                                              inet_ntoa(pSocketContext->m_ClientAddr.sin_addr), 
                                              ntohs(pSocketContext->m_ClientAddr.sin_port) );  

                // 释放掉对应的资源  
                pIOCPModel->_RemoveContext( pSocketContext );  

                continue;    
            }    
            else  
            {  
                switch( pIoContext->m_OpType )    
                {    
                     // Accept    
                case ACCEPT_POSTED:  
                    {   

                        // 为了增加代码可读性,这里用专门的_DoAccept函数进行处理连入请求  
                        pIOCPModel->_DoAccpet( pSocketContext, pIoContext );                       


                    }  
                    break;  

                    // RECV  
                case RECV_POSTED:  
                    {  
                        // 为了增加代码可读性,这里用专门的_DoRecv函数进行处理接收请求  
                        pIOCPModel->_DoRecv( pSocketContext,pIoContext );  
                    }  
                    break;  

                    // SEND  
                    // 这里略过不写了,要不代码太多了,不容易理解,Send操作相对来讲简单一些  
                case SEND_POSTED:  
                    {  

                    }  
                    break;  
                default:  
                    // 不应该执行到这里  
                    TRACE(_T("_WorkThread中的 pIoContext->m_OpType 参数异常.\n"));  
                    break;  
                } //switch  
            }//if  
        }//if  

    }//while  

    TRACE(_T("工作者线程 %d 号退出.\n"),nThreadNo);  

    // 释放线程参数  
    RELEASE(lpParam);     

    return 0;  
}  



//====================================================================================  
//  
//                  系统初始化和终止  
//  
//====================================================================================  




////////////////////////////////////////////////////////////////////  
// 初始化WinSock 2.2  
bool CIOCPModel::LoadSocketLib()  
{      
    WSADATA wsaData;  
    int nResult;  
    nResult = WSAStartup(MAKEWORD(2,2), &wsaData);  
    // 错误(一般都不可能出现)  
    if (NO_ERROR != nResult)  
    {  
        this->_ShowMessage(_T("初始化WinSock 2.2失败!\n"));  
        return false;   
    }  

    return true;  
}  

//////////////////////////////////////////////////////////////////  
//  启动服务器  
bool CIOCPModel::Start()  
{  
    // 初始化线程互斥量  
    InitializeCriticalSection(&m_csContextList);  

    // 建立系统退出的事件通知  
    m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);  

    // 初始化IOCP  
    if (false == _InitializeIOCP())  
    {  
        this->_ShowMessage(_T("初始化IOCP失败!\n"));  
        return false;  
    }  
    else  
    {  
        this->_ShowMessage(_T("\nIOCP初始化完毕\n."));  
    }  

    // 初始化Socket  
    if( false==_InitializeListenSocket() )  
    {  
        this->_ShowMessage(_T("Listen Socket初始化失败!\n"));  
        this->_DeInitialize();  
        return false;  
    }  
    else  
    {  
        this->_ShowMessage(_T("Listen Socket初始化完毕."));  
    }  

    this->_ShowMessage(_T("系统准备就绪,等候连接....\n"));  

    return true;  
}  


////////////////////////////////////////////////////////////////////  
//  开始发送系统退出消息,退出完成端口和线程资源  
void CIOCPModel::Stop()  
{  
    if( m_pListenContext!=NULL && m_pListenContext->m_Socket!=INVALID_SOCKET )  
    {  
        // 激活关闭消息通知  
        SetEvent(m_hShutdownEvent);  

        for (int i = 0; i < m_nThreads; i++)  
        {  
            // 通知所有的完成端口操作退出  
            PostQueuedCompletionStatus(m_hIOCompletionPort, 
                                        0, (DWORD)EXIT_CODE,
                                        NULL);  
        }  

        // 等待所有的客户端资源退出  
        WaitForMultipleObjects(m_nThreads, 
                               m_phWorkerThreads, 
                               TRUE, INFINITE);  

        // 清除客户端列表信息  
        this->_ClearContextList();  

        // 释放其他资源  
        this->_DeInitialize();  

        this->_ShowMessage(_T("停止监听\n"));  
    }     
}  


////////////////////////////////  
// 初始化完成端口  
bool CIOCPModel::_InitializeIOCP()  
{  
    // 建立第一个完成端口  
    m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 
                                                 NULL, 0, 0 );  

    if ( NULL == m_hIOCompletionPort)  
    {  
        this->_ShowMessage(_T("建立完成端口失败!错误代码: %d!\n"),
                             WSAGetLastError());  
        return false;  
    }  

    // 根据本机中的处理器数量,建立对应的线程数  
    m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors();  

    // 为工作者线程初始化句柄  
    m_phWorkerThreads = new HANDLE[m_nThreads];  

    // 根据计算出来的数量建立工作者线程  
    DWORD nThreadID;  
    for (int i = 0; i < m_nThreads; i++)  
    {  
        THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;  
        pThreadParams->pIOCPModel = this;  
        pThreadParams->nThreadNo  = i+1;  
        m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread,
                                              (void *)pThreadParams,
                                               0, &nThreadID);  
    }  

    TRACE(" 建立 _WorkerThread %d 个.\n", m_nThreads );  

    return true;  
}  


/////////////////////////////////////////////////////////////////  
// 初始化Socket  
bool CIOCPModel::_InitializeListenSocket()  
{  
    // AcceptEx 和 GetAcceptExSockaddrs 的GUID,用于导出函数指针  
    GUID GuidAcceptEx = WSAID_ACCEPTEX;    
    GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;   

    // 服务器地址信息,用于绑定Socket  
    struct sockaddr_in ServerAddress;  

    // 生成用于监听的Socket的信息  
    m_pListenContext = new PER_SOCKET_CONTEXT;  

    // 需要使用重叠IO,必须得使用WSASocket来建立Socket,才可以支持重叠IO操作  
    m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM,
                                            0, NULL, 0,
                                            WSA_FLAG_OVERLAPPED);  
    if (INVALID_SOCKET == m_pListenContext->m_Socket)   
    {  
        this->_ShowMessage(_T("初始化Socket失败,错误代码: %d.\n"),
                               WSAGetLastError());  
        return false;  
    }  
    else  
    {  
        TRACE(_T("WSASocket() 完成.\n"));  
    }  

    // 将Listen Socket绑定至完成端口中  
    if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket, 
                                         m_hIOCompletionPort,
                                        (DWORD)m_pListenContext, 0))    
    {    
        this->_ShowMessage(_T("绑定 Listen Socket至完成端口失败!错误代码: %d/n"),
                              WSAGetLastError());  
        RELEASE_SOCKET( m_pListenContext->m_Socket );  
        return false;  
    }  
    else  
    {  
        TRACE(_T("Listen Socket绑定完成端口 完成.\n"));  
    }  

    // 填充地址信息  
    ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));  
    ServerAddress.sin_family = AF_INET;  
    // 这里可以绑定任何可用的IP地址,或者绑定一个指定的IP地址   
    ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);                        
    //ServerAddress.sin_addr.s_addr = inet_addr(CStringA(m_strIP).GetString());           
    ServerAddress.sin_port = htons(m_nPort);                            

    // 绑定地址和端口  
    if (SOCKET_ERROR == bind(m_pListenContext->m_Socket,
                            (struct sockaddr *) &ServerAddress, 
                            sizeof(ServerAddress)))   
    {  
        this->_ShowMessage(_T("bind()函数执行错误.\n"));  
        return false;  
    }  
    else  
    {  
        TRACE(_T("bind() 完成.\n"));  
    }  

    // 开始进行监听  
    if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN))  
    {  
        this->_ShowMessage(_T("Listen()函数执行出现错误.\n"));  
        return false;  
    }  
    else  
    {  
        TRACE(_T("Listen() 完成.\n"));  
    }  

    // 使用AcceptEx函数,因为这个是属于WinSock2规范之外的微软另外提供的扩展函数  
    // 所以需要额外获取一下函数的指针,  
    // 获取AcceptEx函数指针  
    DWORD dwBytes = 0;    
    if(SOCKET_ERROR == WSAIoctl(  
        m_pListenContext->m_Socket,   
        SIO_GET_EXTENSION_FUNCTION_POINTER,   
        &GuidAcceptEx,   
        sizeof(GuidAcceptEx),   
        &m_lpfnAcceptEx,   
        sizeof(m_lpfnAcceptEx),   
        &dwBytes,   
        NULL,   
        NULL))    
    {    
        this->_ShowMessage(_T("WSAIoctl 未能获取AcceptEx函数指针。错误代码: %d\n"),
                           WSAGetLastError());  
        this->_DeInitialize();  
        return false;    
    }    

    // 获取GetAcceptExSockAddrs函数指针,也是同理  
    if(SOCKET_ERROR == WSAIoctl(  
        m_pListenContext->m_Socket,   
        SIO_GET_EXTENSION_FUNCTION_POINTER,   
        &GuidGetAcceptExSockAddrs,  
        sizeof(GuidGetAcceptExSockAddrs),   
        &m_lpfnGetAcceptExSockAddrs,   
        sizeof(m_lpfnGetAcceptExSockAddrs),     
        &dwBytes,   
        NULL,   
        NULL))    
    {    
        this->_ShowMessage(_T("WSAIoctl 未能获取GuidGetAcceptExSockAddrs函数指针。错误代码: %d\n"), 
                               WSAGetLastError());  
        this->_DeInitialize();  
        return false;   
    }    


    // 为AcceptEx 准备参数,然后投递AcceptEx I/O请求  
    for( int i=0;i<MAX_POST_ACCEPT;i++ )  
    {  
        // 新建一个IO_CONTEXT  
        PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();  

        if( false==this->_PostAccept( pAcceptIoContext ) )  
        {  
            m_pListenContext->RemoveContext(pAcceptIoContext);  
            return false;  
        }  
    }  

    this->_ShowMessage( _T("投递 %d 个AcceptEx请求完毕"),MAX_POST_ACCEPT );  

    return true;  
}  

////////////////////////////////////////////////////////////   

由于公众号文章字数有限,您可以接着阅读下一篇:《windows完成端口(六)》 系列目录 windows完成端口(一) windows完成端口(二) windows完成端口(三) windows完成端口(四) windows完成端口(五) windows完成端口(六)

原文发布于微信公众号 - 高性能服务器开发(easyserverdev)

原文发表时间:2018-04-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏刘望舒

Android PMS处理APK的复制

在上一篇文章Android包管理机制之PackageInstaller安装APK中,我们学习了PackageInstaller是如何安装APK的,最后会将APK...

2065
来自专栏高性能服务器开发

从零学习开源项目系列(四)LogServer源码探究

这是从零学习开源项目的第四篇,上一篇是《从零学习开源项目系列(三) CSBattleMgr服务源码研究》,这篇文章我们一起来学习LogServer,中文意思可能...

2462
来自专栏Android知识点总结

Android原生下载(上篇)基本逻辑+断点续传

1281
来自专栏Android 研究

APK安装流程详解7——PackageManagerService的启动流程(上)

我们看到在SystemServer无参构造函数里面就是初始化mFactoryTestMode

1601
来自专栏KK的小酒馆

SQlite数据库简介Android网络与数据存储

SQLite看名字就知道是个数据库,Android专门为移动端内置了此种轻量级工具,并且为了方便在Java语言中进行数据库操作,编写了SQLiteOpenHel...

1403
来自专栏本立2道生

Win32对话框程序(1)

之前学C语言是一直都是在控制台下面操作的,面对的都是黑框框,严重的打击了学习的兴趣。后来在TC下进行C语言课程设计,做了图形界面编程,但都是点线面画的…… 

1611
来自专栏NetCore

[原创]Fluent NHibernate之旅(四)-- 关系(上)

经过了前面三篇的介绍,相信大家对Fluent NHibernate已经有一定的了解了,在我们学习中,Fluent 也已经进入了RTM版本。这次的版本发布离RC版...

2146
来自专栏Java3y

Hibernate【映射】知识要点

前言 前面的我们使用的是一个表的操作,但我们实际的开发中不可能只使用一个表的…因此,本博文主要讲解关联映射 集合映射 需求分析:当用户购买商品,用户可能有多个地...

3127
来自专栏知识分享

4-MSP430定时器_定时器中断

一开始没写好就上传了,,,,,,,,这次来个全的 自己学MSP430是为了写一篇关于PID的文章,需要430在proteus上做仿真,一则认为在自动控制算法上P...

3506
来自专栏王大锤

再谈RunLoop

1323

扫码关注云+社区

领取腾讯云代金券