前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >进程间通信:同步双工管道

进程间通信:同步双工管道

作者头像
方亮
发布2019-01-16 10:02:56
1.5K0
发布2019-01-16 10:02:56
举报
文章被收录于专栏:方亮方亮

        因为工作需要,需要设计出一个双工的IPC。(转载请指明出处)在一番比较后,我发现管道是比较符合我们的需求的。但是我们需求要求管道的对方是可信任的,而在vista以下系统是没有GetNamedPipeClientProcessIdGetNamedPipeClientSessionIdGetNamedPipeServerProcessId

GetNamedPipeServerSessionId这样的函数,于是是否可信这个操作就要求由客户端和服务端两方互检来完成,至于互检的思路,我会在之后管道的加强版中给出思路和例子。而本文只是简单介绍一个同步双工管道。

        在工作中写的管道模型中,服务端每次被连接上,都会启动一个连接实例(线程)。于是如果存在多个客户端接入的情况下,将启动多个线程。这样的模型比较简单,但是效率存在问题。这些天我参考了微软的例子,重写了管道模型。服务端只启动一个线程,利用该线程的APC完成所有连接的读写操作。因为是同步双工,所以我设计的模型是不停的一问一答。当有消息要发向对方时,只需要向“问”列表中插入消息,底层会将这条消息发往对方;如果“问”表中不存数据,则发一条垃圾消息,对方在接受到这条消息后不做任何处理。这样的设计也就是为了维持管道畅通,不因一个环节卡住导致其他操作不可完成。

        对于管道模型,我设计成:传输层,数据层,逻辑层,应用层四层结构。其中传输层只负责管道连接和数据传输,不关心数据内容;数据层会将传输层所有取到的数据以管道句柄为依据进行分组,同时负责将各个连接要传给对方的数据汇总供传输层使用;逻辑层考虑加入验证逻辑,即验证对方是否为可信任,同时为应用层提供方便的调用支持,比如在逻辑层启动一个线程调用一个应用层设置的回调函数来处理接受到的消息,同时暴露一个发送数据的函数供应用层使用。这样应用层只要实现处理消息的回调、调用发送数据的接口即可。(工作中设计的管道模型就是这样子的。因为我准备重写一个更稳定和高效的管道,目前只大致写好了传输层代码。)

        服务端

代码语言:javascript
复制
#include "stdafx.h"
#include "PipeServerInstance.h"
#include <string>
#include <strsafe.h>

CPipeServerInstance::CPipeServerInstance()
{
    m_hPipe = NULL;
}

CPipeServerInstance::~CPipeServerInstance()
{

}

VOID CPipeServerInstance::StartService()
{
    HANDLE hConnectEvent = NULL; 
    
    // 创建一个连接等待事件
    hConnectEvent = CreateEvent( NULL, TRUE, TRUE, NULL );

    if ( NULL == hConnectEvent ) 
    {
        // 创建连接事件失败
        MYTRACE(L"CreateEvent failed with %d.\n", GetLastError()); 
        return;
    }

    OVERLAPPED oConnect;
    ::ZeroMemory( &oConnect, sizeof(OVERLAPPED) );
    oConnect.hEvent = hConnectEvent; 

    BOOL bPendingIO = FALSE;
    // 创建一个管道实例并等待客户端接入
    bPendingIO = CreateAndConnectInstance( &oConnect ); 

    // 等待事件的返回值
    DWORD dwWait = WAIT_TIMEOUT;

    while ( TRUE ) 
    { 
        // 等待一个客户端的接入,或者为了读写例程执行
        dwWait = WaitForSingleObjectEx( hConnectEvent, INFINITE, TRUE );

        switch ( dwWait ) 
        { 
        case WAIT_OBJECT_0: 
            // 一个客户端接入

            if ( FALSE == RunServerInstance( &oConnect, bPendingIO) )
            {
                return;
            }
            
            if ( FALSE == bPendingIO )
            {
                return;
            }
            
            break; 

        case WAIT_IO_COMPLETION: 
            // 等待事件是由读写完成例程触发的
            break; 

        default: 
            {
                // 错误
                MYTRACE(L"WaitForSingleObjectEx (%d)\n", GetLastError()); 
                return ;
            }
        } 
    } 
    return; 
}

BOOL CPipeServerInstance::RunServerInstance( LPOVERLAPPED lpoConnect, BOOL& bPendingIO )
{
    DWORD cbRet = 0;

    // 如果一个操作被挂起,则获取这个连接的结果
    if ( FALSE != bPendingIO ) 
    { 
        if ( FALSE == GetOverlappedResult( m_hPipe, lpoConnect, &cbRet, FALSE ) ) 
        {
            MYTRACE(L"ConnectNamedPipe (%d)\n", GetLastError()); 
            return FALSE;
        }
    } 

    LPPIPEINST lpPipeInst; 
    // 分配全局固定内存空间用于保存读写数据
    lpPipeInst = (LPPIPEINST) GlobalAlloc( GPTR, sizeof(PIPEINST) ); 
    if ( NULL == lpPipeInst) 
    {
        MYTRACE(L"GlobalAlloc failed (%d)\n", GetLastError()); 
        return FALSE;
    }

    lpPipeInst->hPipeInst = m_hPipe; 
    lpPipeInst->lpPointer = this;

    DealMsg( lpPipeInst );
    WriteFileEx( 
        lpPipeInst->hPipeInst, 
        lpPipeInst->cbWrite, 
        lpPipeInst->dwWrite, 
        (LPOVERLAPPED) lpPipeInst, 
        (LPOVERLAPPED_COMPLETION_ROUTINE) CompletedWriteRoutine); 

    // 创建一个新的管道实例去等待下一个客户端接入
    bPendingIO = CreateAndConnectInstance( lpoConnect ); 

    return TRUE;
}

BOOL CPipeServerInstance::StopService()
{
    m_CriticalPipeHandle.Lock();
    for ( VECHANDLEITER it = m_VecPipeHandle.begin(); it != m_VecPipeHandle.end(); it++ )
    {
        DisconnectNamedPipe( *it );
    }
    m_CriticalPipeHandle.Unlock();
    return TRUE;
}

VOID CPipeServerInstance::DisconnectAndClose( LPPIPEINST lpPipeInst )
{
    if( NULL == lpPipeInst || NULL == lpPipeInst->hPipeInst )
    {
        return;
    }

    // 断开管道连接
    if ( FALSE == DisconnectNamedPipe( lpPipeInst->hPipeInst ) ) 
    {
        MYTRACE(L"DisconnectNamedPipe failed with %d.\n", GetLastError());
    }

    // 关闭管道
    CloseHandle( lpPipeInst->hPipeInst ); 

    // 释放掉全局分配的内存
    if ( NULL != lpPipeInst ) 
    {
        GlobalFree(lpPipeInst); 
    }
}

BOOL CPipeServerInstance::CreateAndConnectInstance( LPOVERLAPPED lpoOverlap )
{
    m_hPipe = CreateNamedPipe( 
        PIPENAME, 
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
        PIPE_UNLIMITED_INSTANCES,
        sizeof(PipeMsgStruct),
        sizeof(PipeMsgStruct),
        PIPE_TIMEOUT,
        NULL );

    if ( INVALID_HANDLE_VALUE == m_hPipe ) 
    {
        MYTRACE(L"CreateNamedPipe failed with %d.\n", GetLastError()); 
        return FALSE;
    }

    m_CriticalPipeHandle.Lock();
    m_VecPipeHandle.push_back( m_hPipe );
    m_CriticalPipeHandle.Unlock();

    // 启动一个新的连接等待客户端接入
    return ConnectToNewClient( m_hPipe, lpoOverlap ); 
}

BOOL CPipeServerInstance::ConnectToNewClient( HANDLE hPipe, LPOVERLAPPED lpo )
{
    BOOL fPendingIO = FALSE; 
    DWORD dwLastError = ERROR_SUCCESS;

    // 异步命名管道连接应该失败
    if ( FALSE != ConnectNamedPipe( hPipe, lpo ) ) 
    {    
        MYTRACE( L"ConnectNamedPipe failed with %d.\n", GetLastError() ); 
        return FALSE;
    }

    switch ( GetLastError() ) 
    { 
        // If the function fails, the return value is zero 
        // and GetLastError returns a value other than ERROR_IO_PENDING or ERROR_PIPE_CONNECTED.
    case ERROR_IO_PENDING: 
        {
            // 正在连接
            fPendingIO = TRUE; 
        }break; 

    case ERROR_PIPE_CONNECTED: 
        {
            // If a client connects before the function is called, 
            // the function returns zero and GetLastError returns ERROR_PIPE_CONNECTED. 
            // This can happen if a client connects in the interval 
            // between the call to CreateNamedPipe and the call to ConnectNamedPipe.
            // In this situation, there is a good connection between client and server, 
            // even though the function returns zero.

            // 如果客户端已经连接上,则设置事件
            if ( SetEvent(lpo->hEvent) ) 
            {
                break; 
            }
        }// 这个地方故意不break的,因为SetEvent失败了
 
    default: 
        {
            MYTRACE(L"ConnectNamedPipe failed with %d.\n", GetLastError());
        }
    } 

    return fPendingIO; 
}

VOID WINAPI CPipeServerInstance::CompletedWriteRoutine( DWORD dwErr, DWORD cbWritten,
                                                       LPOVERLAPPED lpOverLap )
{
    LPPIPEINST lpPipeInst = NULL; 
    BOOL fRead = FALSE; 

    // 因为lpOverLap的内存是固定的,而其又是LPPIPEINST类型的第一个元素
    // 于是,这样就可以获得之前分配的LPPIPEINST类型的对象
    lpPipeInst = (LPPIPEINST) lpOverLap; 

    // 已经异步写完,于是再异步读
    if ( ( 0 == dwErr ) && ( cbWritten == lpPipeInst->dwWrite ) ) 
        fRead = ReadFileEx( 
        lpPipeInst->hPipeInst, 
        lpPipeInst->cbRead, 
        PIPEMSGLENGTH, 
        (LPOVERLAPPED) lpPipeInst, 
        (LPOVERLAPPED_COMPLETION_ROUTINE) CompletedReadRoutine); 

    // 如果写失败了,就断开连接
    CPipeServerInstance* pThis = static_cast<CPipeServerInstance*> (lpPipeInst->lpPointer);
    if ( FALSE == fRead && NULL != pThis ) 
    {
        pThis->DisconnectAndClose( lpPipeInst ); 
    }
}

VOID WINAPI CPipeServerInstance::CompletedReadRoutine( DWORD dwErr, DWORD cbBytesRead, 
                                                      LPOVERLAPPED lpOverLap )
{
    LPPIPEINST lpPipeInst = NULL; 
    BOOL fWrite = FALSE; 

    // 因为lpOverLap的内存是固定的,而其又是LPPIPEINST类型的第一个元素
    // 于是,这样就可以获得之前分配的LPPIPEINST类型的对象
    lpPipeInst = (LPPIPEINST) lpOverLap; 

    CPipeServerInstance* pThis = static_cast<CPipeServerInstance*> (lpPipeInst->lpPointer);

    // 已经异步读完,于是再异步写
    if ( 0 == dwErr && 0 != cbBytesRead && NULL != pThis) 
    { 
        pThis->DealMsg( lpPipeInst );
        
        fWrite = WriteFileEx( 
            lpPipeInst->hPipeInst, 
            lpPipeInst->cbWrite, 
            lpPipeInst->dwWrite, 
            (LPOVERLAPPED) lpPipeInst, 
            (LPOVERLAPPED_COMPLETION_ROUTINE) CompletedWriteRoutine); 
    }

    // 如果读失败了,就断开连接
    if ( FALSE == fWrite && NULL != pThis ) 
    {
        pThis->DisconnectAndClose(lpPipeInst); 
    }
}

VOID CPipeServerInstance::DealMsg( LPPIPEINST pipe )
{
    MYTRACE( L"[%p] %s\n", pipe->hPipeInst, pipe->cbRead);
    std::wstring strCmd = L"Default answer from server";
    StringCchCopy( (WCHAR*)pipe->cbWrite, PIPEMSGLENGTH, strCmd.c_str() );
    pipe->dwWrite = ( lstrlen( strCmd.c_str() ) + 1 ) * sizeof(TCHAR);
}

        客户端

代码语言:javascript
复制
#include "stdafx.h"
#include "PipeClientInstance.h"
#include <string>
#include <strsafe.h>

CPipeClientInstance::CPipeClientInstance()
{
    m_hPipe = NULL;
    m_hStopEvent = NULL;
}

CPipeClientInstance::~CPipeClientInstance()
{

}

BOOL CPipeClientInstance::StartClient()
{
    LPPIPEINST lpPipeInst = NULL;  

    // 创建一个退出事件
    m_hStopEvent = CreateEvent( NULL, FALSE, FALSE, NULL );

    if ( NULL == m_hStopEvent ) 
    {
        // 创建退出事件失败
        MYTRACE(L"CreateEvent failed with %d.\n", GetLastError()); 
        return FALSE;
    }

    // 连接服务器
    if ( FALSE == ConnectToServer() )
    {
        return FALSE;
    }

    // 消息循环
    if ( FALSE == MsgLoop( lpPipeInst ) )
    {
        return FALSE;
    }

    // 关闭管道清除内存
    DisconnectAndClose( lpPipeInst );

    return TRUE; 
}

BOOL CPipeClientInstance::ConnectToServer()
{
    while ( WAIT_TIMEOUT == WaitForSingleObjectEx( m_hStopEvent, 10, TRUE ) ) 
    { 
        // 异步的方式打开一个已经存在的管道
        m_hPipe = CreateFile( PIPENAME, GENERIC_READ | GENERIC_WRITE, 
            0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);

        if ( INVALID_HANDLE_VALUE != m_hPipe )
        {
            // 打开管道成功
            break; 
        }

        if ( ERROR_PIPE_BUSY != GetLastError() ) 
        {
            // 除了发生ERROR_PIPE_BUSY错误,否则其他错误都认为打开失败
            MYTRACE( L"Could not open pipe. GLE=%d\n", GetLastError() ); 
            return FALSE;
        }

        // 所有的管道处于“忙”状态,所以等待20秒
        if ( FALSE == WaitNamedPipe( PIPENAME, 20000 ) ) 
        { 
            MYTRACE(L"Could not open pipe: 20 second wait timed out."); 
            return FALSE;
        }
    } 

    return TRUE;
}

BOOL CPipeClientInstance::MsgLoop( LPPIPEINST& lpPipeInst )
{
    DWORD dwMode = PIPE_READMODE_BYTE; 
    BOOL bSuccess = SetNamedPipeHandleState( m_hPipe, &dwMode, NULL, NULL );

    if ( FALSE == bSuccess )
    {
        return FALSE;
    }

    // 分配全局固定内存空间用于保存读写数据
    lpPipeInst = (LPPIPEINST) GlobalAlloc( GPTR, sizeof(PIPEINST) ); 
    if ( NULL == lpPipeInst) 
    {
        MYTRACE(L"GlobalAlloc failed (%d)\n", GetLastError()); 
        return FALSE;
    }

    lpPipeInst->hPipeInst = m_hPipe; 
    lpPipeInst->lpPointer = this;

    lpPipeInst->dwWrite = 0;

    m_CriticalSectionQuit.Lock();
    lpPipeInst->bSuccess = TRUE;
    m_CriticalSectionQuit.Unlock();

    CompletedWriteRoutine( 0, 0, (LPOVERLAPPED) lpPipeInst ); 

    DWORD dwWait = WAIT_TIMEOUT;
    bSuccess = FALSE;
    BOOL bExit = FALSE;

    do 
    {
        bSuccess = FALSE;
        // 等待退出事件,同时也让读写完成例程执行
        dwWait = WaitForSingleObjectEx( m_hStopEvent, INFINITE, TRUE );
        switch( dwWait )
        {
        case WAIT_OBJECT_0:
            {
                // 等到了终止事件,退出循环
                bExit = TRUE;
            }break;
        case WAIT_IO_COMPLETION:
            {
                m_CriticalSectionQuit.Lock();
                if ( FALSE != lpPipeInst->bSuccess )
                {
                    // 读写完成,继续循环
                    bSuccess = TRUE;
                }
                else
                {
                    bExit = TRUE;
                }
                m_CriticalSectionQuit.Unlock();
            }break;
        default:
            {
                // 其他类型导致退出,认为是出错,退出循环
                bExit = TRUE;
            }
        }

        if ( FALSE != bExit )
        {
            break;
        }

    } while( FALSE != bSuccess );

    return TRUE;
}


BOOL CPipeClientInstance::StopClient()
{
    if ( NULL != m_hStopEvent )
    {
        ::SetEvent( m_hStopEvent );
    }

    return TRUE;
}

VOID CPipeClientInstance::DisconnectAndClose( LPPIPEINST lpPipeInst )
{
    if ( NULL != lpPipeInst->hPipeInst )
    {
        // 关闭管道
        CloseHandle( lpPipeInst->hPipeInst ); 
    }

    // 释放掉全局分配的内存
    if ( NULL != lpPipeInst ) 
    {
        GlobalFree(lpPipeInst); 
    }
}

VOID WINAPI CPipeClientInstance::CompletedWriteRoutine( DWORD dwErr, DWORD cbWritten,
                                                       LPOVERLAPPED lpOverLap )
{
    LPPIPEINST lpPipeInst = NULL; 
    BOOL fRead = FALSE; 

    // 因为lpOverLap的内存是固定的,而其又是LPPIPEINST类型的第一个元素
    // 于是,这样就可以获得之前分配的LPPIPEINST类型的对象
    lpPipeInst = (LPPIPEINST) lpOverLap; 

    // 已经异步写完,于是再异步读
    if ( ( 0 == dwErr ) && ( cbWritten == lpPipeInst->dwWrite ) ) 
        fRead = ReadFileEx( 
        lpPipeInst->hPipeInst, 
        lpPipeInst->cbRead, 
        PIPEMSGLENGTH, 
        (LPOVERLAPPED) lpPipeInst, 
        (LPOVERLAPPED_COMPLETION_ROUTINE) CompletedReadRoutine); 

    // 如果写失败了,就断开连接
    CPipeClientInstance* pThis = static_cast<CPipeClientInstance*> ( lpPipeInst->lpPointer );
    if ( FALSE == fRead && NULL != pThis ) 
    {
        pThis->NotifyExit( lpPipeInst ); 
    }
}

VOID WINAPI CPipeClientInstance::CompletedReadRoutine( DWORD dwErr, DWORD cbBytesRead, 
                                                      LPOVERLAPPED lpOverLap )
{
    LPPIPEINST lpPipeInst = NULL; 
    BOOL fWrite = FALSE; 

    // 因为lpOverLap的内存是固定的,而其又是LPPIPEINST类型的第一个元素
    // 于是,这样就可以获得之前分配的LPPIPEINST类型的对象
    lpPipeInst = (LPPIPEINST) lpOverLap; 

    CPipeClientInstance* pThis = static_cast<CPipeClientInstance*> (lpPipeInst->lpPointer);

    // 已经异步读完,于是再异步写
    if ( 0 == dwErr && 0 != cbBytesRead && NULL != pThis) 
    { 
        pThis->DealMsg( lpPipeInst );
        
        fWrite = WriteFileEx( 
            lpPipeInst->hPipeInst, 
            lpPipeInst->cbWrite, 
            lpPipeInst->dwWrite, 
            (LPOVERLAPPED) lpPipeInst, 
            (LPOVERLAPPED_COMPLETION_ROUTINE) CompletedWriteRoutine); 
    }

    // 如果读失败了,就断开连接
    if ( FALSE == fWrite && NULL != pThis ) 
    {
        pThis->NotifyExit(lpPipeInst); 
    }
}

VOID CPipeClientInstance::DealMsg( LPPIPEINST pipe )
{
    MYTRACE( L"[%p] %s\n", pipe->hPipeInst, pipe->cbRead );
    std::wstring strCmd = L"Default answer from client";
    StringCchCopy( (WCHAR*)pipe->cbWrite, PIPEMSGLENGTH, strCmd.c_str() );
    pipe->dwWrite = ( lstrlen( strCmd.c_str() ) + 1 ) * sizeof(TCHAR);
}

VOID CPipeClientInstance::NotifyExit( LPPIPEINST lpPipeInst )
{
    m_CriticalSectionQuit.Lock();
    lpPipeInst->bSuccess = FALSE;
    m_CriticalSectionQuit.Unlock();
}

        这个代码中的一些值得注意的设计:

  1. 在写完成例程中调用异步读,在读完成例程中调用异步写,从而实现同步双工。(特别注意不要在完成例程中的异步操作后WaitforXXEX,否则会出现严重的递归问题,最后内存耗尽,程序挂掉)
  2. 对每一个接入,都分配一个不可移动的内存,其第一个元素设置成OVERLAPPED结构对象,同时让这个结构对象就是异步操作和完成例程中都会使用的那个参数。如异步操作
代码语言:javascript
复制
BOOL WINAPI ReadFileEx(
  __in       HANDLE hFile,
  __out_opt  LPVOID lpBuffer,
  __in       DWORD nNumberOfBytesToRead,
  __inout    LPOVERLAPPED lpOverlapped,
  __in_opt   LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);

        完成例程

代码语言:javascript
复制
VOID CALLBACK FileIOCompletionRoutine(
  __in     DWORD dwErrorCode,
  __in     DWORD dwNumberOfBytesTransfered,
  __inout  LPOVERLAPPED lpOverlapped
);

        这样设计,就可以达到一个很重要的目的:在完成例程中获取“读/写”的数据。         对应的工程地址是:CommunicatePipe工程

(转载请指明出处)

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2011年11月22日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档