muduo网络库学习之EventLoop(二):进程(线程)wait/notify 和 EventLoop::runInLoop

1、进程(线程)wait/notify

pipe socketpair eventfd

eventfd 是一个比 pipe 更高效的线程间事件通知机制,一方面它比 pipe 少用一个 file descripor,节省了资源;另一方面,eventfd 的缓冲区管理也简单得多,全部“buffer” 只有定长8 bytes,不像 pipe 那样可能有不定长的真正 buffer。

// 该函数可以跨线程调用
void EventLoop::quit()
{
    quit_ = true;
    if (!isInLoopThread())
    {
        wakeup();
    }
}

如果不是当前IO线程调用quit,则需要唤醒(wakeup())当前IO线程,因为它可能还阻塞在poll的位置(EventLoop::loop()),这样再次循环判断 while (!quit_) 才能退出循环。

一般情况下如果没有调用quit(),poll没有事件发生,也会超时返回(默认10s),但会继续循环。

时序分析:

构造一个EventLoop对象,构造函数初始化列表,构造poller_, timeQueue_, wakeupFd_, wakeupChannel_ 等成员,在函数体中:

wakeupChannel_->setReadCallback(
    boost::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
wakeupChannel_->enableReading();

调用Channel::setReadCallback 注册wakeupChannel_ 的回调函数为EventLoop::handleRead, 调用Channel::enableReading(); 接着调用Channel::update(); 进而调用EventLoop::UpdataChannel(); 最后调用

Poller::updataChannel();(虚函数,具体由子类实现),此函数内添加一个新channel或者

更新此channel关注的事件,现在是将wakeupChannel_ 添加进PollPoller::channels_(假设Poller类用PollerPoller类实现) 中,并使用wakeupChannel_.fd_ 和 wakeupChannel_.events_ 构造一个struct pollfd, 并压入pollfds_; 以后将关注wakeupChannel_ (wakeupFd_) 的可读事件。

可以联想到的是当有多个socket 连接上来时,会存在多个channel,每个channel可以注册自己感兴趣的可读/可写事件的回调函数,并enableReading/Wirting,当然也可以disable Read/Write.

事件循环开始EventLoop::loop(),内部调用poll()(这里假设调用的是PollPoller::poll(), 内部调用::poll())。::poll() 阻塞返回即事件发生,如timerfd_超时可读; socket 有数据可读/可写; 非IO线程调用EventLoop:quit(), 进而调用wakeup(),非IO线程往wakeupFd_ 中write 入8个字节数据,此时wakeupFd_可读。现在假设是wakeupFd_ 可读,PollPoller::poll()调用PollPoller::fillActiveChannels()(虚函数), 函数内使用(struct pollfd).revents 设置此channel的revents_,然后将此channel 压入EventLoop::activeChannels_ 中后返回。PollPoller::poll() 返回,EventLoop::loop()中遍历activeChannels_,对每个活动channel调用Channel::handleEvent(),进而调用每个channel注册的读/写回调函数。

由上面分析可知,wakeupChannel_ 的回调函数为EventLoop::handleRead,函数内调用read 掉 wakeupFd_ 的数据,避免一直触发。

2、EventLoop::loop、runInLoop、queueInLoop、doPendingFunctors

EventLoop 有个pendingFunctors_ 成员:

typedef boost::function<void()> Functor;
std::vector<Functor> pendingFunctors_;

四个函数的流程图和实现如下:

 C++ Code 

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72

// 事件循环,该函数不能跨线程调用 // 只能在创建该对象的线程中调用 void EventLoop::loop() {// 断言当前处于创建该对象的线程中   assertInLoopThread();     while (!quit_)     {         pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);         eventHandling_ = true;         for (ChannelList::iterator it = activeChannels_.begin();                 it != activeChannels_.end(); ++it)         {             currentActiveChannel_ = *it;             currentActiveChannel_->handleEvent(pollReturnTime_);         }         currentActiveChannel_ = NULL;         eventHandling_ = false;         doPendingFunctors();     } } // 为了使IO线程在空闲时也能处理一些计算任务 // 在I/O线程中执行某个回调函数,该函数可以跨线程调用 void EventLoop::runInLoop(const Functor& cb) {   if (isInLoopThread())   {     // 如果是当前IO线程调用runInLoop,则同步调用cb     cb();   }   else   {     // 如果是其它线程调用runInLoop,则异步地将cb添加到队列,让IO线程处理     queueInLoop(cb);   } } void EventLoop::queueInLoop(const Functor& cb) {   {   MutexLockGuard lock(mutex_);   pendingFunctors_.push_back(cb);   }   // 调用queueInLoop的线程不是当前IO线程则需要唤醒当前IO线程,才能及时执行doPendingFunctors();   // 或者调用queueInLoop的线程是当前IO线程(比如在doPendingFunctors()中执行functors[i]() 时又调用了queueInLoop())   // 并且此时正在调用pending functor,需要唤醒当前IO线程   // 因为在此时doPendingFunctors() 过程中又添加了任务,故循环回去poll的时候需要被唤醒返回,进而继续执行doPendingFunctors()   // 只有当前IO线程的事件回调中调用queueInLoop才不需要唤醒  //  即在handleEvent()中调用queueInLoop 不需要唤醒,因为接下来马上就会执行doPendingFunctors();   if (!isInLoopThread() || callingPendingFunctors_)   {     wakeup();   } } // 该函数只会被当前IO线程调用 void EventLoop::doPendingFunctors() {   std::vector<Functor> functors;   callingPendingFunctors_ = true;   {   MutexLockGuard lock(mutex_);   functors.swap(pendingFunctors_);   }   for (size_t i = 0; i < functors.size(); ++i)   {     functors[i]();   }   callingPendingFunctors_ = false; }

这样,TimeQueue的两个公有成员函数都可以跨线程调用,因为即使是被非IO线程调用,也会放进Queue,然后让当前IO线程来执行:

// 该函数可以跨线程调用
TimerId TimerQueue::addTimer(const TimerCallback &cb,
                             Timestamp when,
                             double interval)
{
    Timer *timer = new Timer(cb, when, interval);

    loop_->runInLoop(   // addTimeInLoop 只能在当前IO线程调用
        boost::bind(&TimerQueue::addTimerInLoop, this, timer));

    return TimerId(timer, timer->sequence());
}
// 该函数可以跨线程调用
void TimerQueue::cancel(TimerId timerId)
{
    loop_->runInLoop(   // cancelInLoop 只能在当前IO线程调用
        boost::bind(&TimerQueue::cancelInLoop, this, timerId));
}

进而EventLoop类中的定时器操作函数 runAt, runAfter, runEvery, cancel 都可以跨线程调用,因为实现中调用了TimerQueue::addTimer 和 TimeQueue::cancel .

关于doPendingFunctors 的补充说明:

1、不是简单地在临界区内依次调用Functor,而是把回调列表swap到functors中,这样一方面减小了临界区的长度(意味着不会阻塞其它线程的queueInLoop()),另一方面,也避免了死锁(因为Functor可能再次调用queueInLoop())

2、由于doPendingFunctors()调用的Functor可能再次调用queueInLoop(cb),这时,queueInLoop()就必须wakeup(),否则新增的cb可能就不能及时调用了

3、muduo没有反复执行doPendingFunctors()直到pendingFunctors_为空而是每次poll 返回就执行一次,这是有意的,否则IO线程可能陷入死循环,无法处理IO事件。

测试代码:

#include <muduo/net/EventLoop.h>
//#include <muduo/net/EventLoopThread.h>
//#include <muduo/base/Thread.h>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

EventLoop *g_loop;
int g_flag = 0;

void run4()
{
    printf("run4(): pid = %d, flag = %d\n", getpid(), g_flag);
    g_loop->quit();
}

void run3()
{
    printf("run3(): pid = %d, flag = %d\n", getpid(), g_flag);
    g_loop->runAfter(3, run4);
    g_flag = 3;
}

void run2()
{
    printf("run2(): pid = %d, flag = %d\n", getpid(), g_flag);
    g_loop->queueInLoop(run3);
}

void run1()
{
    g_flag = 1;
    printf("run1(): pid = %d, flag = %d\n", getpid(), g_flag);
    g_loop->runInLoop(run2);
    g_flag = 2;
}

int main()
{
    printf("main(): pid = %d, flag = %d\n", getpid(), g_flag);

    EventLoop loop;
    g_loop = &loop;

    loop.runAfter(2, run1);
    loop.loop();
    printf("main(): pid = %d, flag = %d\n", getpid(), g_flag);
}

执行结果如下:

simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test05 20131108 02:17:05.204800Z  2319 TRACE IgnoreSigPipe Ignore SIGPIPE - EventLoop.cc:51 main(): pid = 2319, flag = 0 20131108 02:17:05.207647Z  2319 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104 20131108 02:17:05.208332Z  2319 TRACE EventLoop EventLoop created 0xBF9382D4 in thread 2319 - EventLoop.cc:76 20131108 02:17:05.208746Z  2319 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104 20131108 02:17:05.209198Z  2319 TRACE loop EventLoop 0xBF9382D4 start looping - EventLoop.cc:108 20131108 02:17:07.209614Z  2319 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 02:17:07.218039Z  2319 TRACE printActiveChannels {4: IN }  - EventLoop.cc:271 20131108 02:17:07.218162Z  2319 TRACE readTimerfd TimerQueue::handleRead() 1 at 1383877027.218074 - TimerQueue.cc:62 run1(): pid = 2319, flag = 1 run2(): pid = 2319, flag = 1 run3(): pid = 2319, flag = 2 20131108 02:17:10.218763Z  2319 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 02:17:10.218841Z  2319 TRACE printActiveChannels {4: IN }  - EventLoop.cc:271 20131108 02:17:10.218860Z  2319 TRACE readTimerfd TimerQueue::handleRead() 1 at 1383877030.218854 - TimerQueue.cc:62 run4(): pid = 2319, flag = 3 20131108 02:17:10.218885Z  2319 TRACE loop EventLoop 0xBF9382D4 stop looping - EventLoop.cc:133 main(): pid = 2319, flag = 3 simba@ubuntu:~/Documents/build/debug/bin$ 

此程序是单线程,以后再举多线程的例子。由前面文章可知,timerfd_ = 4,2s后执行定时器回调函数run1(),在run1()内

g_loop->runInLoop(run2); 由于是在当前IO线程,故马上执行run2(),此时flag 还是为1;在run2()内g_loop->queueInLoop(run3);

即把run3()添加到队列,run2()返回,继续g_flag=2,此时loop内已经处理完事件,执行doPendingFunctors(),就执行了run3(), 

run3()内设置另一个3s定时器,run3()执行完回到loop继续poll, 3s后超时执行run4(),此时flag=3。

参考:

《UNP》

muduo manual.pdf

《linux 多线程服务器编程:使用muduo c++网络库》

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏牛客网

今日头条/字节跳动 一、二面凉经(后端开发工程师)

视频面,约的下午三点,时间到了准时开始面试。一面结束后想着已经挂了,没想到状态变成了等待二面,二面完没多久通知面试结束,不通过。

142
来自专栏扎心了老铁

python concurrent.futures

python因为其全局解释器锁GIL而无法通过线程实现真正的平行计算。这个论断我们不展开,但是有个概念我们要说明,IO密集型 vs. 计算密集型。 IO密集型:...

3637
来自专栏月牙寂

[以太坊源代码分析] II. 数据的呈现和组织,缓存和更新

本文转载来源自:http://blog.csdn.net/teaspring/article/details/75390210 感谢原作者teaspring...

3877
来自专栏牛肉圆粉不加葱

Spark 内存管理的前世今生(下)

在《Spark 内存管理的前世今生(上)》中,我们介绍了 UnifiedMemoryManager 是如何管理内存的。然而,UnifiedMemoryManag...

732
来自专栏Java Edge

深入理解并发容器-ConcurrentHashMap(JDK8版本)1 概述3应用场景4 源码解析

2698
来自专栏Java3y

数据库面试题(开发者必看)

数据库常见面试题(开发者篇) ? ? 这里写图片描述 什么是存储过程?有哪些优缺点? 什么是存储过程?有哪些优缺点? 存储过程就像我们编程语言中的函数一样,封装...

3995
来自专栏牛肉圆粉不加葱

揭开Spark Streaming神秘面纱⑤ - Block 的生成与存储

ReceiverSupervisorImpl共提供了4个将从 receiver 传递过来的数据转换成 block 并存储的方法,分别是:

602
来自专栏Elson的web征途

Promise 原理探究

你真的了解Promise吗?对我而言,除了知道如何使用then解决回调地狱以外,其他的还真的一知半解。虽然ES6的generator和ES7的async awa...

3445
来自专栏Golang语言社区

package sync

sync包提供了基本的同步基元,如互斥锁。除了Once和WaitGroup类型,大部分都是适用于低水平程序线程,高水平的同步使用channel通信更好一些。

922
来自专栏栗霖积跬步之旅

java多线程编程核心技术——第五章总结

定时器Timer的使用   在JDK中Timer类主要负责计划任务的功能,也就是在指定的时间开始执行某一个任务。 ?   Timer类的主要作用是设置计划任务,...

1849

扫码关注云+社区