(三)一个服务器程序的架构介绍

本文将介绍我曾经做过的一个项目的服务器架构和服务器编程的一些重要细节。

一、程序运行环境

操作系统:centos 7.0 编译器:gcc/g++ 4.8.3 cmake 2.8.11 mysql数据库:5.5.47 项目代码管理工具:VS2013

二、程序结构

该程序总共有17个线程,其中分为9个数据库工作线程D和一个日志线程L,6个普通工作线程W,一个主线程M。(以下会用这些字母来代指这些线程)

(一)、数据库工作线程的用途

9个数据库工作线程在线程启动之初,与mysql建立连接,也就是说每个线程都与mysql保持一路连接,共9个数据库连接。 每个数据库工作线程同时存在两个任务队列,第一个队列A存放需要执行数据库增删查改操作的任务sqlTask,第二个队列B存放sqlTask执行完成后的结果。sqlTask执行完成后立即放入结果队列中,因而结果队列中任务也是一个个的需要执行的任务。大致伪代码如下:

 1void db_thread_func()  {  
 2    while (!m_bExit)  
 3    {  
 4        if (NULL != (pTask = m_sqlTask.Pop()))  
 5        {  
 6            //从m_sqlTask中取出的任务先执行完成后,pTask将携带结果数据  
 7            pTask->Execute();              
 8            //得到结果后,立刻将该任务放入结果任务队列  
 9            m_resultTask.Push(pTask);  
10            continue;  
11        }  
12        sleep(1000);  
13    }  } 

现在的问题来了:

任务队列A中的任务从何而来,目前只有消费者,没有生产者,那么生产者是谁?

任务队列B中的任务将去何方,目前只有生产者没有消费者。 这两个问题先放一会儿,等到后面我再来回答。

(二)工作线程和主线程

在介绍主线程和工作线程具体做什么时,我们介绍下服务器编程中常常抽象出来的几个概念(这里以tcp连接为例):

TcpServer 即Tcp服务,服务器需要绑定ip地址和端口号,并在该端口号上侦听客户端的连接(往往由一个成员变量TcpListener来管理侦听细节)。所以一个TcpServer要做的就是这些工作。除此之外,每当有新连接到来时,TcpServer需要接收新连接,当多个新连接存在时,TcpServer需要有条不紊地管理这些连接:连接的建立、断开等,即产生和管理下文中说的TcpConnection对象。

一个连接对应一个TcpConnection对象,TcpConnection对象管理着这个连接的一些信息:如连接状态、本端和对端的ip地址和端口号等。

数据通道对象Channel,Channel记录了socket的句柄,因而是一个连接上执行数据收发的真正执行者,Channel对象一般作为TcpConnection的成员变量。

TcpSession对象,是将Channel收取的数据进行解包,或者对准备好的数据进行装包,并传给Channel发送。

归纳起来:

一个TcpServer依靠TcpListener对新连接的侦听和处理,依靠TcpConnection对象对连接上的数据进行管理,TcpConnection实际依靠Channel对数据进行收发,依靠TcpSession对数据进行装包和解包。也就是说一个TcpServer存在一个TcpListener,对应多个TcpConnection,有几个TcpConnection就有几个TcpSession,同时也就有几个Channel。

以上说的TcpServer、TcpListener、TcpConnection、Channel和TcpSession是服务器框架的网络层。一个好的网络框架,应该做到与业务代码脱耦。即上层代码只需要拿到数据,执行业务逻辑,而不用关注数据的收发和网络数据包的封包和解包以及网络状态的变化(比如网络断开与重连)。

拿数据的发送来说: 当业务逻辑将数据交给TcpSession,TcpSession将数据装好包后(装包过程后可以有一些加密或压缩操作),交给TcpConnection::SendData(),而TcpConnection::SendData()实际是调用Channel::SendData(),因为Channel含有socket句柄,所以Channel::SendData()真正调用send()/sendto()/write()方法将数据发出去。

对于数据的接收,稍微有一点不同: 通过select()/poll()/epoll()等IO multiplex技术,确定好了哪些TcpConnection上有数据到来后,激活该TcpConnection的Channel对象去调用recv()/recvfrom()/read()来收取数据。数据收到以后,将数据交由TcpSession来处理,最终交给业务层。注意数据收取、解包乃至交给业务层是一定要分开的。我的意思是:最好不要解包并交给业务层和数据收取的逻辑放在一起。因为数据收取是IO操作,而解包和交给业务层是逻辑计算操作。IO操作一般比逻辑计算要慢。到底如何安排要根据服务器业务来取舍,也就是说你要想好你的服务器程序的性能瓶颈在网络IO还是逻辑计算,即使是网络IO,也可以分为上行操作和下行操作,上行操作即客户端发数据给服务器,下行即服务器发数据给客户端。有时候数据上行少,下行大。(如游戏服务器,一个npc移动了位置,上行是该客户端通知服务器自己最新位置,而下行确是服务器要告诉在场的每个客户端)。

在我的文章《服务器端编程心得(一)—— 主线程与工作线程的分工》中介绍了,工作线程的流程:

1while (!m_bQuit)  
2{  
3    epoll_or_select_func();  
4    handle_io_events();  
5    handle_other_things();  
6} 

其中epoll_or_select_func()即是上文所说的通过select()/poll()/epoll()等IO multiplex技术,确定好了哪些TcpConnection上有数据到来。我的服务器代码中一般只会监测socket可读事件,而不会监测socket可写事件。至于如何发数据,文章后面会介绍。

所以对于可读事件,以epoll为例,这里需要设置的标识位是:

EPOLLIN 普通可读事件(当连接正常时,产生这个事件,recv()/read()函数返回收到的字节数;当连接关闭,这两个函数返回0,也就是说我们设置这个标识已经可以监测到新来数据和对端关闭事件)

EPOLLRDHUP 对端关闭事件(linux man手册上说这个事件可以监测对端关闭,但我实际调试时发送即使对端关闭也没触发这个事件,仍然是EPOLLIN,只不过此时调用recv()/read()函数,返回值会为0,所以实际项目中是否可以通过设置这个标识来监测对端关闭,仍然待考证)EPOLLPRI 带外数据

muduo里面将epoll_wait的超时事件设置为1毫秒,我的另一个项目将epoll_wait超时时间设置为10毫秒。这两个数值供大家参考。

这个项目中,工作线程和主线程都是上文代码中的逻辑,主线程监听侦听socket上的可读事件,也就是监测是否有新连接来了。主线程和每个工作线程上都存在一个epollfd。如果新连接来了,则在主线程的handle_io_events()中接收新连接。产生的新连接的socket句柄挂接到哪个线程的epollfd上呢?这里采取的做法是round-robin算法,即存在一个对象CWorkerThreadManager记录了各个工作线程上工作状态。伪码大致如下:

1void attach_new_fd(int newsocketfd)  
2{  
3    workerthread = get_next_worker_thread(next);  
4    workerthread.attach_to_epollfd(newsocketfd);  
5    ++next;  
6    if (next > max_worker_thread_num)  
7        next = 0;  
8}

即先从第一个工作线程的epollfd开始挂接新来socket,接着累加索引,这样下次就是第二个工作线程了。如果所以超出工作线程数目,则从第一个工作重新开始。这里解决了新连接socket“负载均衡”的问题。在实际代码中还有个需要注意的细节就是:epoll_wait的函数中的struct epoll_event 数量开始到底要设置多少个才合理?存在的顾虑是,多了浪费,少了不够用,我在曾经一个项目中直接用的是4096:

1const int EPOLL_MAX_EVENTS = 4096;  
2const int dwSelectTimeout = 10000;  
3struct epoll_event events[EPOLL_MAX_EVENTS];  
4int nfds = epoll_wait(m_fdEpoll, events,
5                      EPOLL_MAX_EVENTS, 
6                      dwSelectTimeout / 1000); 

我在陈硕的muduo网络库中发现作者才用了一个比较好的思路,即动态扩张数量:开始是n个,当发现有事件的fd数量已经到达n个后,将struct epoll_event数量调整成2n个,下次如果还不够,则变成4n个,以此类推,作者巧妙地利用stl::vector在内存中的连续性来实现了这种思路:

 1//初始化代码  
 2std::vector<struct epoll_event> events_(16);  
 3//线程循环里面的代码  
 4while (m_bExit)  
 5{  
 6    int numEvents = ::epoll_wait(epollfd_, 
 7                                 &*events_.begin(),
 8                                 static_cast<int>(events_.size()), 1);  
 9    if (numEvents > 0)  
10    {  
11        if (static_cast<size_t>(numEvents) == events_.size())  
12        {  
13            events_.resize(events_.size() * 2);  
14        }  
15    }  
16} 

读到这里,你可能觉得工作线程所做的工作也不过就是调用handle_io_events()来接收网络数据,其实不然,工作线程也可以做程序业务逻辑上的一些工作。也就是在handle_other_things()里面。那如何将这些工作加到handle_other_things()中去做呢?写一个队列,任务先放入队列,再让handle_other_things()从队列中取出来做?我在该项目中也借鉴了muduo库的做法。

即handle_other_things()中调用一系列函数指针,伪码如下:

 1void do_other_things()  
 2{  
 3    somefunc();  
 4}  
 5//m_functors是一个stl::vector,其中每一个元素为一个函数指针  
 6void somefunc()  
 7{  
 8    for (size_t i = 0; i < m_functors.size(); ++i)  
 9    {  
10        m_functors[i]();  
11    }  
12    m_functors.clear();  
13}  

当任务产生时,只要我们将执行任务的函数push_back到m_functors这个stl::vector对象中即可。但是问题来了,如果是其他线程产生的任务,两个线程同时操作m_functors,必然要加锁,这也会影响效率。

muduo是这样做的:

 1void add_task(const Functor& cb)  
 2{  
 3    std::unique_lock<std::mutex> lock(mutex_);  
 4    m_functors.push_back(cb);     
 5}  
 6void do_task()  
 7{  
 8    std::vector<Functor> functors;  
 9    {  
10        std::unique_lock<std::mutex> lock(mutex_);  
11        functors.swap(m_functors);  
12    }  
13    for (size_t i = 0; i < functors.size(); ++i)  
14    {  
15        functors[i]();  
16    }  
17} 

看到没有,利用一个栈变量functors将m_functors中的任务函数指针倒换(swap)过来了,这样大大减小了对m_functors操作时的加锁粒度。前后变化:变化前,相当于原来A给B多少东西,B消耗多少,A给的时候,B不能消耗;B消耗的时候A不能给。现在变成A将东西放到篮子里面去,B从篮子里面拿,B如果拿去一部分后,只有消耗完了才会来拿,或者A通知B去篮子里面拿,而B忙碌时,A是不会通知B来拿,这个时候A只管将东西放在篮子里面就可以了。

 1bool bBusy = false;  
 2void add_task(const Functor& cb)  
 3{  
 4    std::unique_lock<std::mutex> lock(mutex_);  
 5    m_functors_.push_back(cb);  
 6    //B不忙碌时只管往篮子里面加,不要通知B  
 7    if (!bBusy)  
 8    {  
 9        wakeup_to_do_task();  
10    }  
11}  
12void do_task()  
13{  
14    bBusy = true;  
15    std::vector<Functor> functors;  
16    {  
17        std::unique_lock<std::mutex> lock(mutex_);  
18        functors.swap(pendingFunctors_);  
19    }  
20    for (size_t i = 0; i < functors.size(); ++i)  
21    {  
22        functors[i]();  
23    }  
24    bBusy = false;  
25}

看,多巧妙的做法!

因为每个工作线程都存在一个m_functors,现在问题来了,如何将产生的任务均衡地分配给每个工作线程。这个做法类似上文中如何将新连接的socket句柄挂载到工作线程的epollfd上,也是round-robin算法。上文已经描述,此处不再赘述。

还有种情况,就是希望任务产生时,工作线程能够立马执行这些任务,而不是等epoll_wait超时返回之后。这个时候的做法,就是使用一些技巧唤醒epoll_wait,linux系统可以使用socketpair或timerevent、eventfd等技巧(这个细节在我的博文《服务器端编程心得(一)—— 主线程与工作线程的分工》已经详细介绍过了)。

上文中留下三个问题:

1.数据库线程任务队列A中的任务从何而来,目前只有消费者,没有生产者,那么生产者是谁?

2.数据库线程任务队列B中的任务将去何方,目前只有生产者没有消费者。

3.业务层的数据如何发送出去?

问题1的答案是:业务层产生任务可能会交给数据库任务队列A,这里的业务层代码可能就是工作线程中do_other_things()函数执行体中的调用。至于交给这个9个数据库线程的哪一个的任务队列,同样采用了round-robin算法。所以就存在一个对象CDbThreadManager来管理这九个数据库线程。

下面的伪码是向数据库工作线程中加入任务:

1bool CDbThreadManager::AddTask(IMysqlTask* poTask )  
2{  
3    if (m_index >= m_dwThreadsCount)  
4    {  
5        m_index = 0;  
6    }  
7    return m_aoMysqlThreads[m_index++].AddTask(poTask);  
8} 

同理问题2中的消费者也可能就是do_other_things()函数执行体中的调用。 现在来说问题3,业务层的数据产生后,经过TcpSession装包后,需要发送的话,产生任务丢给工作线程的do_other_things(),然后在相关的Channel里面发送,因为没有监测该socket上的可写事件,所以该数据可能调用send()或者write()时会阻塞,没关系,sleep()一会儿,继续发送,一直尝试,到数据发出去。伪码如下:

 1bool Channel::Send()  
 2{  
 3    int offset = 0;  
 4    while (true)  
 5    {  
 6        int n = ::send(socketfd,
 7                       buf + offset, length - offset);  
 8        if (n == -1)  
 9        {  
10            if (errno == EWOULDBLOCK)  
11            {  
12                ::sleep(100);  
13                continue;  
14            }  
15        }  
16        //对方关闭了socket,这端建议也关闭  
17        else if (n == 0)  
18        {  
19            close(socketfd);  
20            return false;  
21        }  
22        offset += n;  
23        if (offset >= length)  
24            break;  
25    }  
26    return true;      
27}

最后,还有一个日志线程没有介绍,高性能的日志实现方案目前并不常见。限于文章篇幅,下次再介绍。

原文发布于微信公众号 - 高性能服务器开发(easyserverdev)

原文发表时间:2018-04-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏高性能服务器开发

(三)一个服务器程序的架构介绍

本文将介绍我曾经做过的一个项目的服务器架构和服务器编程的一些重要细节。 一、程序运行环境 操作系统:centos 7.0 编译器:gcc/g++ 4.8.3 c...

3707
来自专栏Coding01

学习 Lumen 用户认证 (二) —— 使用 jwt-auth 插件

通过上一篇《学习 Lumen 用户认证 (一)》https://mp.weixin.qq.com/s/KVUQE2DUetNB2kqxHs0VDg的学习,大致懂...

2064
来自专栏张善友的专栏

[腾讯社区开放平台]介绍开放授权协议-OAuth

OAuth (开放授权) 是一个开放标准,允许用户授权第三方网站访问他们存储在另外的服务提供者上的信息,而不需要将用户名和密码提供给第三方网站或分享他们数据的所...

2547
来自专栏程序员宝库

从零开始写 PHP 扩展

PHP 是用 C 语言写的。对于每个 PHPer 来说,都有着内心的一种希望写扩展的冲动了吧。然而,缺乏一个很好的切入点。Google 上搜 PHP 扩展开发,...

3417
来自专栏信安之路

ourphp 前台注册登入前台某用戶

在function\api\ourphpuser\ourphp_system.php该文件存在用户login函数

1180
来自专栏琯琯博客

awesome-php-cn软件资源

PHP 资源列表,内容包括:库、框架、模板、安全、代码分析、日志、第三方库、配置工具、Web 工具、书籍、电子书、经典博文等。 依赖管理 依赖和包管理库 Com...

4305
来自专栏北京马哥教育

腾讯反病毒实验室为你揭秘Xshell软件后门技术!

背景: 最近,XShell远程终端工具发现被加入了恶意代码,目前官方就此事也做出了回应,要求使用者尽快下载最新版本。腾讯安全反病毒实验室就此跟进分析,对此次带...

3096
来自专栏ChaMd5安全团队

Flare-On 2018 writeup(下)

后门下载器加载库函数从ldr链表中遍历模块和模块函数,使用Hash来获取API定位,此后的API都用这种方法遍历,由于是病毒经典使用方法,可以搜到hash表,没...

1174
来自专栏玄魂工作室

Python黑帽编程1.5 使用Wireshark练习网络协议分析

1.5.0.1 本系列教程说明 本系列教程,采用的大纲母本为《Understanding Network Hacks Attack and Defense w...

36410
来自专栏Kirito的技术分享

浅析项目中的并发(二)

分布式遭遇并发 在前面的章节,并发操作要么发生在单个应用内,一般使用基于JVM的lock解决并发问题,要么发生在数据库,可以考虑使用数据库层面的锁,而在分布式...

39913

扫码关注云+社区

领取腾讯云代金券