前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >后台并发模型改进经验分享

后台并发模型改进经验分享

原创
作者头像
三棵老松
发布2019-08-26 13:02:35
5110
发布2019-08-26 13:02:35
举报

常见的多线程并发模型

异步IO多线程并发模型通常由监听线程组+工作线程组构成,监听线程负责接收新连接,然后把新连接转给工作线程。

这种模式的缺点:以“横向分工”方式来划分线程组的模式会造成线程或CPU负载不均衡,当处理大量TCP长连接时工作线程负载偏重,当应对大量短连接时监听线程负载偏重,为使线程和cpu负载不成为瓶颈,需要规则好监听线程和工作线程的平衡。

常见的多进程并发模型

进程并发模型,最熟悉的就是nginx了,效率很高。

这种模型是这样的:父进程负责监听端口,创建监听文件描述符(socket),fork创建多个子进程(工作进程),所有子进程都共享父进程的监听socket,接收新连接并处理。这种模型很好地解决了负载不均衡问题,并发很高。

但这种模式并不适合需要频繁开启/关闭监听端口的场景,熟悉linux系统进程关系的同学肯定知道,文件描述是属于进程空间的,父进程新创建的文件描述符对已存在的子进程来说是不可用的。因此,在这个模型中,每新开启一个监听端口,父进程为了使子进程能使用新创建的文件描述符,它就要通知子进子程退出,同时重新fork新的子进程,由于父进程先创建文件描述符后创建子进程,此时父进程的文件描述符对子进程是可用的。

并发模型的改进

此改进方案适合多线程和多进程,为了方便描述,暂定为多进程。

本模型类似nginx,各子进程是平等的,并不存在“横向分工”,但解决了频繁fork子进程问题。其过程是:父进程把新创建的监听文件描术符的“内存”副本发送给各子进程,子进程收到这份“内存”副本再重新恢复成自身进程空间的文件描述符,也就是说父进程发送的不是socket这个值 ,而这它对应的“内存”数据,通过这种方式很好地避免了重新fork子进程问题。

注意:只有多进程才需地发送socket内存副本,多线程直接发送socket值就行了。

怎样发送socket内存副本?这个过程是内核完成的,我们只使用sendmsg和recvmsg这两个接口就行了,这两个接口定义在系统头文件include/sys/socket.h,网上也有很多使用sendmsg和recvmsg的例子。

各进程需要创建一对socketpair通信通道,用作进程通信

代码语言:javascript
复制
    int fds[2] = {0};
    int ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, fds);
    if (-1 == ret) {

        LOG_E("Create unix socket failed: " << strerror(errno));
        return EXCE_FAILE;
    }

    m_rfd = fds[0];
    m_wfd = fds[1];

    // 设置非阻塞
    if ((-1 == fcntl(m_rfd, F_SETFL, O_NONBLOCK))/* || (-1 == fcntl(m_wfd, F_SETFL, O_NONBLOCK))*/) {
        LOG_E("Unix socket, failed to call fcntl(O_NONBLOCK): " << strerror(errno));
        close(m_rfd);
        close(m_wfd);
        return EXCE_FAILE;
    }

父进程发送socket副本:

代码语言:javascript
复制
int Task::sendMsg(int fd, void *ptr, size_t nbytes, int sendfd, int flag) {
    struct msghdr msg;  
    struct iovec iov[1];  
    union {
      struct cmsghdr cm;  
      char control[CMSG_SPACE(sizeof(int))];  
    } control_un;

    msg.msg_control = 0;  
    msg.msg_controllen = 0;  

    if (sendfd > 0) {
        msg.msg_control = control_un.control;  
        msg.msg_controllen = sizeof(control_un.control);  
      
        struct cmsghdr  *cmptr;
        cmptr = CMSG_FIRSTHDR(&msg);  
        cmptr->cmsg_len = CMSG_LEN(sizeof(int));
        cmptr->cmsg_level = SOL_SOCKET;
        cmptr->cmsg_type = SCM_RIGHTS;  
        *((int*)CMSG_DATA(cmptr)) = sendfd;  
    }

    msg.msg_name = nullptr;  
    msg.msg_namelen = 0;  
  
    iov[0].iov_base = ptr;  
    iov[0].iov_len = nbytes;  
    msg.msg_iov = iov;  
    msg.msg_iovlen = 1;  
    return sendmsg(fd, &msg, flag);  
}

各子进程接收socket副本:

代码语言:javascript
复制
int Task::recvMsg(int sockfd, void *data, size_t size, int *fd, int flag) {  
    int num;  
    struct msghdr msg;  
    struct cmsghdr *cmsgptr;  
    struct iovec vec[1];  
    union {  
        struct cmsghdr cmsg;  
        char control[CMSG_SPACE(sizeof(int))];      
    } control_un;  
      
    msg.msg_control = control_un.control;  
    msg.msg_controllen = sizeof(control_un.control);  
      
    vec[0].iov_base = data;  
    vec[0].iov_len = size;  
      
    msg.msg_name = nullptr;  
    msg.msg_namelen = 0;  
    msg.msg_iov = vec;  
    msg.msg_iovlen = 1;     
      
    if ((num = recvmsg(sockfd, &msg, flag)) == -1){  
        return num;  
    }  
      
    if ((cmsgptr = CMSG_FIRSTHDR(&msg)) != nullptr && cmsgptr->cmsg_len == CMSG_LEN(sizeof(int))) {
        *fd = *((int *)CMSG_DATA(cmsgptr));  
    } else {
        *fd = -1;
    }  
      
    return num;  
} 

通过这两个接口可以把文件描述符发送给其它进程(不要求是父子关系)。

进程通信

最后说一下进程之间怎样收发消息和快速寻址,对异步IO模型, 每个进程都要分配一个消息通道(SOCK_DGRAM类型的socketpair),然后把这个消息通道加入到epoll_wait的监听列表中。

每个进程都维持着一个保存监听端口状态和连接状态的数组(context列表),消息通道的context固定放在首位,index为0

然后把context的index放到event.data.fd,加入到epoll_wait,这里要注意,赋值给event.data.fd的是index,不是socket

代码语言:javascript
复制
struct epoll_event event;
    bzero(&event, sizeof(event));
    event.events = EPOLLIN;
    event.data.fd = m_pipeCxt->getIndex();
    if (epoll_ctl(m_epollFd, EPOLL_CTL_ADD, m_rfd, &event) < 0) {
        LOG_E("Failed to call epoll_ctl: " << strerror(errno));
        return;
    }
    struct epoll_event* pEvents = new epoll_event[pollNumMax];
    while(true) {
        // 等待有事件发生, 第四个参数: -1是永久等待, 0是立即返回, 大于0值表示多少毫秒返回
        int iNfds = epoll_wait(m_epollFd, pEvents, pollNumMax, -1);
        if (iNfds == -1) {
            LOG_E("Failed to call epoll_wait " << strerror(errno));
            continue;
        }

        /* 处理所有事件 */
        for (int i = 0; i < iNfds; ++i) {
            if (pEvents[i].data.fd == 0) {
                msgProc();
            } else {
                epolleventhandle(pEvents[i].data.fd, pEvents[i].events);
            }
        }
    }

    delete []pEvents;

当进程收到事件知道时,从event.data.fd得到数组索引,快速从数组得到对应的连接上下文(context)。

以上介绍了并发模型的关键点, 本文就写到这,文笔有限,欢迎指正。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档