muduo网络库学习之EventLoop(四):EventLoopThread 类、EventLoopThreadPool 类

1、EventLoopThread(IO线程类)

任何一个线程,只要创建并运行了EventLoop,都称之为IO线程

IO线程不一定是主线程

muduo并发模型one loop per thread + threadpool(计算线程池)

为了方便今后使用,定义了EventLoopThread类,该类封装了IO线程

EventLoopThread创建了一个线程 在线程函数中创建了一个EvenLoop对象并调用EventLoop::loop

多个IO线程可以用IO线程池来管理,对应的类是EventLoopThreadPool 

 C++ Code 
 class EventLoopThread : boost::noncopyable
{
public:
    typedef boost::function<void(EventLoop *)> ThreadInitCallback;

    EventLoopThread(const ThreadInitCallback &cb = ThreadInitCallback());
    ~EventLoopThread();

    EventLoop *startLoop()// 启动线程,该线程就成为了IO线程
    {
        thread_.start(); // 执行threadFunc(); 构造函数初始化列表中thread_(boost::bind(&EventLoopThread::threadFunc, this))
        ....
    };

private:
    void threadFunc();      // 线程函数

    EventLoop *loop_;           // loop_指针指向一个EventLoop对象
    bool exiting_;
    Thread thread_;
    MutexLock mutex_;
    Condition cond_;
    ThreadInitCallback callback_;       // 回调函数在EventLoop::loop事件循环之前被调用
};

EventLoopThread::~EventLoopThread()
{
    exiting_ = true;
    loop_->quit();      // 退出IO线程,让IO线程的loop循环退出,从而退出了IO线程
    thread_.join();
}

 C++ Code 
 	
EventLoop *EventLoopThread::startLoop()
{
    assert(!thread_.started());
    thread_.start(); //启动线程,此时有两个线程在运行,
    //一个是调用EventLoopThread::startLoop()的线程,一个是执行EventLoopThread::threadFunc()的线程(IO线程)
    {
        MutexLockGuard lock(mutex_);
        while (loop_ == NULL)
        {
            cond_.wait(); //函数返回loop_,所以要等待IO线程启动
        }
    }

    return loop_;
}

void EventLoopThread::threadFunc()
{
    EventLoop loop;

    if (callback_)
    {
        callback_(&loop);
    }

    {
        MutexLockGuard lock(mutex_);
        // 一般情况是EventLoopThread对象先析构,析构函数调用loop_->quit() 使得loop.loop() 退出循环
        // 这样threadFunc 退出,loop栈上对象析构,loop_ 指针失效,但此时已经不会再通过loop_ 访问loop,
        // 故不会有问题。
        loop_ = &loop;
        cond_.notify();
    }

    loop.loop();
    //assert(exiting_);
}

测试代码:

#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

void runInThread()
{
    printf("runInThread(): pid = %d, tid = %d\n",
           getpid(), CurrentThread::tid());
}

int main()
{
    printf("main(): pid = %d, tid = %d\n",
           getpid(), CurrentThread::tid());

    EventLoopThread loopThread;
    EventLoop *loop = loopThread.startLoop();
    // 异步调用runInThread,即将runInThread添加到loop对象所在IO线程,让该IO线程执行
    loop->runInLoop(runInThread);
    sleep(1);
    // runAfter内部也调用了runInLoop,所以这里也是异步调用,让该IO线程添加一个2s定时器
    loop->runAfter(2, runInThread);
    sleep(3);
    //~EventLoopThread()会调用loop_->quit();

    printf("exit main().\n");
}

执行输出如下:

simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test06 20131108 03:29:12.749530Z  2628 TRACE IgnoreSigPipe Ignore SIGPIPE - EventLoop.cc:51 main(): pid = 2628, tid = 2628 20131108 03:29:12.753135Z  2629 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104 20131108 03:29:12.753794Z  2629 TRACE EventLoop EventLoop created 0xB7415F44 in thread 2629 - EventLoop.cc:76 20131108 03:29:12.754266Z  2629 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104 20131108 03:29:12.754707Z  2629 TRACE loop EventLoop 0xB7415F44 start looping - EventLoop.cc:108 20131108 03:29:12.755088Z  2629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 03:29:12.756033Z  2629 TRACE printActiveChannels {5: IN }  - EventLoop.cc:271 runInThread(): pid = 2628, tid = 2629 20131108 03:29:13.755730Z  2629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 03:29:13.756388Z  2629 TRACE printActiveChannels {5: IN }  - EventLoop.cc:271 20131108 03:29:15.755858Z  2629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 03:29:15.757316Z  2629 TRACE printActiveChannels {4: IN }  - EventLoop.cc:271 20131108 03:29:15.757469Z  2629 TRACE readTimerfd TimerQueue::handleRead() 1 at 1383881355.757345 - TimerQueue.cc:62 runInThread(): pid = 2628, tid = 2629 exit main(). 20131108 03:29:16.755942Z  2629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 03:29:16.755988Z  2629 TRACE printActiveChannels {5: IN }  - EventLoop.cc:271 20131108 03:29:16.756003Z  2629 TRACE loop EventLoop 0xB7415F44 stop looping - EventLoop.cc:133 simba@ubuntu:~/Documents/build/debug/bin$ 

主线程不是IO线程,根据前面的文章,timerfd_ = 4, wakeupFd_ = 5。主线程调用 loop->runInLoop(runInThread); 由于不是IO线

程调用runInLoop, 故调用queueInLoop() 将runInThead 添加到队列,然后wakeup() IO线程,IO线程在doPendingFunctors() 中取

出队列的runInThread()执行,可以看到IO线程的tid 跟主线程不一样。同理,loop->runAfter(2, runInThread); 也是一样的流程,需

要唤醒一下,此时只是执行runAfter() 添加了一个2s的定时器, 2s超时,timerfd_ 可读,先handleRead()一下然后执行回调函数

runInThread()。那为什么exit main() 之后wakeupFd_ 还会有可读事件呢?那是因为EventLoopThead 栈上对象析构,在析构函数内

loop_ ->quit(), 由于不是在IO线程调用quit(),故也需要唤醒一下,IO线程才能从poll 返回,这样再次循环判断 while (!quit_) 就能

退出IO线程。

2、EventLoopThreadPool(IO线程池类)

IO线程池的功能是开启若干个IO线程,并让这些IO线程处于事件循环的状态

class EventLoopThreadPool : boost::noncopyable
{
public:
    typedef boost::function<void(EventLoop *)> ThreadInitCallback;

    EventLoopThreadPool(EventLoop *baseLoop);
    ~EventLoopThreadPool();
    void setThreadNum(int numThreads)
    {
        numThreads_ = numThreads;
    }
    void start(const ThreadInitCallback &cb = ThreadInitCallback());

    // 如果loops_为空,则loop指向baseLoop_
    // 如果不为空,按照round-robin(RR,轮叫)的调度方式选择一个EventLoop
    EventLoop *getNextLoop();


private:

    EventLoop *baseLoop_;   // 与Acceptor所属EventLoop相同
    bool started_;
    int numThreads_;        // 线程数,除去mainReactor
    int next_;          // 新连接到来,所选择的EventLoop对象下标
    boost::ptr_vector<EventLoopThread> threads_;        // IO线程列表
    std::vector<EventLoop *> loops_;                    // EventLoop列表
};

void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
    assert(!started_);
    baseLoop_->assertInLoopThread();

    started_ = true;

    for (int i = 0; i < numThreads_; ++i)
    {
        EventLoopThread *t = new EventLoopThread(cb);
        threads_.push_back(t);
        loops_.push_back(t->startLoop());   // 启动EventLoopThread线程,在进入事件循环之前,会调用cb
    }
    if (numThreads_ == 0 && cb)
    {
        // 只有一个EventLoop,在这个EventLoop进入事件循环之前,调用cb
        cb(baseLoop_);
    }
}

现在使用 mainReactor + ThreadPool(subReactors) 模式,则baseLoop_ 与TcpServer 和 Acceptor 中的 loop_ 成员是相同的,即mainReactor 处理监听事件,已连接套接字事件轮询给线程池中的subReactors 处理。此时需要注意,创建一个TcpConnection对象时,需要绑定其中一个subReactor, 如下:

void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
    loop_->assertInLoopThread();
    // 按照轮叫的方式选择一个EventLoop
    EventLoop *ioLoop = threadPool_->getNextLoop();

    InetAddress localAddr(sockets::getLocalAddr(sockfd));

    TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                            connName,
                                            sockfd,
                                            localAddr,
                                            peerAddr));

    connections_[connName] = conn;

    ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));

}

测试程序:

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

class TestServer
{
public:
    TestServer(EventLoop *loop,
               const InetAddress &listenAddr, int numThreads)
        : loop_(loop),
          server_(loop, listenAddr, "TestServer"),
          numThreads_(numThreads)
    {
        server_.setConnectionCallback(
            boost::bind(&TestServer::onConnection, this, _1));
        server_.setMessageCallback(
            boost::bind(&TestServer::onMessage, this, _1, _2, _3));
        server_.setThreadNum(numThreads);
    }

    void start()
    {
        server_.start();
    }

private:
    void onConnection(const TcpConnectionPtr &conn)
    {
        if (conn->connected())
        {
            printf("onConnection(): new connection [%s] from %s\n",
                   conn->name().c_str(),
                   conn->peerAddress().toIpPort().c_str());
        }
        else
        {
            printf("onConnection(): connection [%s] is down\n",
                   conn->name().c_str());
        }
    }

    void onMessage(const TcpConnectionPtr &conn,
                   const char *data,
                   ssize_t len)
    {
        printf("onMessage(): received %zd bytes from connection [%s]\n",
               len, conn->name().c_str());
    }

    EventLoop *loop_;
    TcpServer server_;
    int numThreads_;
};


int main()
{
    printf("main(): pid = %d\n", getpid());

    InetAddress listenAddr(8888);
    EventLoop loop;

    TestServer server(&loop, listenAddr, 4);
    server.start();

    loop.loop();
}

此时共有5个IO线程,1个主线程(mainReactor)和 4个线程池中的线程(subReactor),server.start() 会启动线程池中的4个线程,并且启动mainReactor 监听:

// 该函数多次调用是无害的
// 该函数可以跨线程调用
void TcpServer::start()
{
    if (!started_)
    {
        started_ = true;
        threadPool_->start(threadInitCallback_);
    }

    if (!acceptor_->listenning())
    {
        // get_pointer返回原生指针
        loop_->runInLoop(
            boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
    }
}

开启两个telnet 客户端连接服务器,其中一个输入aaaa, 另一个输入ddddd, 服务器端输出如下:

simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test10 main(): pid = 8628 20131108 11:33:15.190620Z  8628 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104 20131108 11:33:15.191246Z  8628 TRACE EventLoop EventLoop created 0xBFB77D50 in thread 8628 - EventLoop.cc:62 20131108 11:33:15.191568Z  8628 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104

20131108 11:33:15.192270Z  8629 TRACE updateChannel fd = 9 events = 3 - EPollPoller.cc:104 20131108 11:33:15.192625Z  8629 TRACE EventLoop EventLoop created 0xB7484F44 in thread 8629 - EventLoop.cc:62 20131108 11:33:15.192927Z  8629 TRACE updateChannel fd = 10 events = 3 - EPollPoller.cc:104

20131108 11:33:15.193356Z  8629 TRACE loop EventLoop 0xB7484F44 start looping - EventLoop.cc:94 20131108 11:33:15.193759Z  8630 TRACE updateChannel fd = 12 events = 3 - EPollPoller.cc:104 20131108 11:33:15.194100Z  8630 TRACE EventLoop EventLoop created 0xB6AFEF44 in thread 8630 - EventLoop.cc:62 20131108 11:33:15.194398Z  8630 TRACE updateChannel fd = 13 events = 3 - EPollPoller.cc:104

20131108 11:33:15.194786Z  8630 TRACE loop EventLoop 0xB6AFEF44 start looping - EventLoop.cc:94 20131108 11:33:15.195135Z  8631 TRACE updateChannel fd = 15 events = 3 - EPollPoller.cc:104 20131108 11:33:15.195534Z  8631 TRACE EventLoop EventLoop created 0xB60FEF44 in thread 8631 - EventLoop.cc:62 20131108 11:33:15.207467Z  8631 TRACE updateChannel fd = 16 events = 3 - EPollPoller.cc:104

20131108 11:33:15.208169Z  8631 TRACE loop EventLoop 0xB60FEF44 start looping - EventLoop.cc:94 20131108 11:33:15.208940Z  8632 TRACE updateChannel fd = 18 events = 3 - EPollPoller.cc:104 20131108 11:33:15.209576Z  8632 TRACE EventLoop EventLoop created 0xB58FDF44 in thread 8632 - EventLoop.cc:62 20131108 11:33:15.210087Z  8632 TRACE updateChannel fd = 19 events = 3 - EPollPoller.cc:104

20131108 11:33:15.210445Z  8628 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104 20131108 11:33:15.210750Z  8628 TRACE loop EventLoop 0xBFB77D50 start looping - EventLoop.cc:94 20131108 11:33:15.211122Z  8632 TRACE loop EventLoop 0xB58FDF44 start looping - EventLoop.cc:94 20131108 11:33:18.958878Z  8628 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:18.959167Z  8628 TRACE printActiveChannels {6: IN}  - EventLoop.cc:257 20131108 11:33:18.959226Z  8628 INFO  TcpServer::newConnection [TestServer] - new connection[TestServer:0.0.0.0:8888#1] from 127.0.0.1:56411 - TcpServer.cc:93 20131108 11:33:18.959262Z  8628 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0x8C84F98 fd=20 - TcpConnection.cc:62 20131108 11:33:18.959277Z  8628 TRACE newConnection [1] usecount=1 - TcpServer.cc:111 20131108 11:33:18.959300Z  8628 TRACE newConnection [2] usecount=2 - TcpServer.cc:113 20131108 11:33:18.959322Z  8628 TRACE newConnection [5] usecount=3 - TcpServer.cc:122 20131108 11:33:18.959343Z  8629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:18.959365Z  8629 TRACE printActiveChannels {10: IN }  - EventLoop.cc:257 20131108 11:33:18.959378Z  8629 TRACE connectEstablished [3] usecount=3 - TcpConnection.cc:78 20131108 11:33:18.959409Z  8629 TRACE updateChannel fd = 20 events = 3 - EPollPoller.cc:104 onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:56411 20131108 11:33:18.959433Z  8629 TRACE connectEstablished [4] usecount=3 - TcpConnection.cc:83 20131108 11:33:23.111546Z  8628 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:23.111628Z  8628 TRACE printActiveChannels {6: IN }  - EventLoop.cc:257 20131108 11:33:23.111662Z  8628 INFO  TcpServer::newConnection [TestServer] - new connection[TestServer:0.0.0.0:8888#2] from 127.0.0.1:56412 - TcpServer.cc:93 20131108 11:33:23.111680Z  8628 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#2] at 0x8C85128 fd=21 - TcpConnection.cc:62 20131108 11:33:23.111693Z  8628 TRACE newConnection [1] usecount=1 - TcpServer.cc:111 20131108 11:33:23.111722Z  8628 TRACE newConnection [2] usecount=2 - TcpServer.cc:113 20131108 11:33:23.111746Z  8628 TRACE newConnection [5] usecount=3 - TcpServer.cc:122 20131108 11:33:23.111769Z  8630 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:23.111792Z  8630 TRACE printActiveChannels {13: IN}  - EventLoop.cc:257 20131108 11:33:23.111805Z  8630 TRACE connectEstablished [3] usecount=3 - TcpConnection.cc:78 20131108 11:33:23.111813Z  8630 TRACE updateChannel fd = 21 events = 3 - EPollPoller.cc:104 onConnection(): new connection [TestServer:0.0.0.0:8888#2] from 127.0.0.1:56412 20131108 11:33:23.111836Z  8630 TRACE connectEstablished [4] usecount=3 - TcpConnection.cc:83 20131108 11:33:25.219778Z  8631 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 11:33:25.219829Z  8632 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 11:33:28.969971Z  8629 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 11:33:33.119151Z  8630 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 11:33:33.119202Z  8628 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 11:33:33.754975Z  8629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:33.755031Z  8629 TRACE printActiveChannels {20: IN}  - EventLoop.cc:257 20131108 11:33:33.755042Z  8629 TRACE handleEvent [6] usecount=2 - Channel.cc:67 onMessage(): received 6 bytes from connection [TestServer:0.0.0.0:8888#1] 20131108 11:33:33.755128Z  8629 TRACE handleEvent [12] usecount=2 - Channel.cc:69 20131108 11:33:35.230224Z  8631 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 11:33:35.230274Z  8632 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 11:33:36.540663Z  8630 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:36.540715Z  8630 TRACE printActiveChannels {21: IN}  - EventLoop.cc:257 20131108 11:33:36.540727Z  8630 TRACE handleEvent [6] usecount=2 - Channel.cc:67 onMessage(): received 7 bytes from connection [TestServer:0.0.0.0:8888#2] 20131108 11:33:36.540780Z  8630 TRACE handleEvent [12] usecount=2 - Channel.cc:69 20131108 11:33:43.129769Z  8628 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 11:33:43.765633Z  8629 TRACE poll  nothing happended - EPollPoller.cc:74

一个进程本来被打开的文件描述符就有0,1,2;

每个Reactor 的 EventLoop 对象构造时,默认使用的是EPollPoller,即EPollPoller::epollfd_ ;

此外还有两个channel(EventLoop::timeQueue_ ::timerfd_ 和 EventLoop::wakeupFd_ )

处于被poll()关注可读事件的状态,而且是一直关注直到事件循环结束。

即每个Reactor 都分别有这3个fd;

对于mainReactor来说,还有Acceptor::acceptSocket_.sockfd_ (listenfd);  Acceptor::idleFd_ ; (/dev/null)

按上述程序来说,mainReactor中:epollfd_ = 3; timerfd_ = 4; wakeupFd_ = 5; sockfd_ = 6; idleFd_ = 7;

(8,9,10),(11,12,13),(14,15,16),(17,18,19) 分别归4个IO线程所有

这样已连接套接字只能从20开始,而且均匀分配到4个subReactor 处理事件(可读事件(包括接收数据,连接关闭),可写事件(内核发送缓冲区不为满),错误事件)

当第一个客户端连接上来时,sockfd_ 可读,mainReactor 调用 TcpServer::newConnection(), 创建一个TcpConnection对象,绑定到线程池中的第一个IO线程上,函数内调用ioLoop->runInLoop(); 会唤醒第一个IO线程,即第一个IO线程的wakeupFd_ (10)可读,handleEvent() 处理后继续处理doPendingFunctors(),执行TcpConnection::connectEstablished(),接下去的流程包括接收数据(fd = 20 可读)可以参考EventLoop(三)的描述。

当然如果我们传递的numThreads_ = 0 或者不传递; 即只有一个mainReactor, 则监听套接字和已连接套接字事件都要由这个mainReactor处理。

参考:

《UNP》

muduo manual.pdf

《linux 多线程服务器编程:使用muduo c++网络库》

原文链接:https://cloud.tencent.com/developer/article/1119416

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏开发与安全

muduo网络库学习之EventLoop(四):EventLoopThread 类、EventLoopThreadPool 类

1、EventLoopThread(IO线程类) 任何一个线程,只要创建并运行了EventLoop,都称之为IO线程 IO线程不一定是主线程 muduo并发模型...

42760
来自专栏V站

PHP加密解密的解析

php做为一门当下非常流行的web语言,常常看到有人求解密php文件,想当年的asp也是一样。一些人不理解为什么要混淆(加密),甚至鄙视混淆(加密),在我看来混...

1.5K20
来自专栏一个会写诗的程序员的博客

【Kotlin 反应式编程】第1讲 你好,Reactive Programming

【Kotlin 反应式编程】第1讲 你好,Reactive Programming

9220
来自专栏酷玩时刻

微信支付之企业付款

付款之前需要充值: 在调用API接口付款或通过微信支付商户平台网页功能操作付款之前需要登录微信支付商户平台,通过网页充值功能充值(商户平台-交易中心)

19740
来自专栏Java与Android技术栈

Scrypt 不止是加密算法,也是莱特币的挖矿算法

Scrypt不仅计算所需时间长,而且占用的内存也多,使得并行计算多个摘要异常困难,因此利用rainbow table进行暴力攻击更加困难。Scrypt 没有在生...

14740
来自专栏世玉的专栏

利用 Jquery + css 自制无限极下拉分类

网上相关例子基本都是 ztree,然后个人去看了看官网,看了半天没找到合适又简单的,ztree,由于界面不适合项目里面的,要是修改他的样式我还可能出现更多的错误...

21110
来自专栏张善友的专栏

在.NET Core 里使用 BouncyCastle 的DES加密算法

.NET Core上面的DES等加密算法要等到1.2 才支持,我们可是急需这个算法的支持,文章《使用 JavaScriptService 在.NET Core ...

33970
来自专栏Java与Android技术栈

Transformer 在RxJava中的使用

Transformer,顾名思义是转换器的意思。早在 RxJava1.x 版本就有了Observable.Transformer、Single.Transfor...

96320
来自专栏安恒网络空间安全讲武堂

SharifCTF 2018 Crypto writeup

本文作者:HeartSky 之前的SharifCTF,其中密码学部分有许多有意思的题目,因此来分享下相关解题过程。 0x01 DES See known_p...

56070

.NET中的密钥加密

本教程将演示如何通过System.Security.Cryptography在.NET Framework 1.1中实现对称加密/密钥加密。

63280

扫码关注云+社区

领取腾讯云代金券