muduo网络库学习之EventLoop(一):事件循环类图简介和muduo 定时器TimeQueue

1、EventLoop、Channel、Poller 等类图如下:

黑色菱形:组合;白色菱形:聚合;白色三角形:继承;实线:关联;

Channel是selectable IO channel,负责注册与响应IO 事件,它不拥有file descriptor。 Channel是Acceptor、Connector、EventLoop、TimerQueue、TcpConnection的成员。

一个EventLoop对象对应一个Poller成员对象,boost::scoped_ptr<Poller> poller_;

 //Poller是个抽象类,具体可以是EPollPoller(默认) 或者PollPoller

Poller类里面有三个纯虚函数,需要子类实现:

/// Polls the I/O events.
/// Must be called in the loop thread.
virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;

/// Changes the interested I/O events.
/// Must be called in the loop thread.
virtual void updateChannel(Channel *channel) = 0;

/// Remove the channel, when it destructs.
/// Must be called in the loop thread.
virtual void removeChannel(Channel *channel) = 0;

对于PollPoller来说,一个fd对应一个struct pollfd(pollfd.fd),一个fd 对应一个channel*;这个fd 可以是socket, eventfd, timerfd, signalfd; 如下:

typedef std::vector<struct pollfd> PollFdList;
typedef std::map<int, Channel *> ChannelMap;    // key是文件描述符,value是Channel*
PollFdList pollfds_;
ChannelMap channels_;

对于EPollPoller 来说,一个channel* 对应一个fd, 一个channel* 对应一个struct epoll_event(epoll_event.data.ptr)

typedef std::vector<struct epoll_event> EventList;
typedef std::map<int, Channel *> ChannelMap;
EventList events_;
ChannelMap channels_;

一个线程最多只能有一个EventLoop对象,这种线程被称为IO线程。一个EventLoop对象对应多个Channel对象,但只有wakeupChannel_生存期由EventLoop控制,  timerfdChannel_生存期由TimeQueue管理。

(boost::scoped_ptr<Channel> wakeupChannel_; // 纳入poller_来管理     int wakeupFd_;   // eventfd函数创建 )

其余以Channel* 方式管理,如下:

typedef std::vector<Channel *> ChannelList;
ChannelList activeChannels_;        // Poller返回的活动通道

下面是Channel 类简化:

 C++ Code 

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50

/// /// A selectable I/O channel. /// /// This class doesn't own the file descriptor. /// The file descriptor could be a socket, /// an eventfd, a timerfd, or a signalfd class Channel : boost::noncopyable { public:     typedef boost::function<void()> EventCallback;     typedef boost::function<void(Timestamp)> ReadEventCallback;     Channel(EventLoop *loop, int fd);     ~Channel();     void handleEvent(Timestamp receiveTime);     void setReadCallback(const ReadEventCallback &cb)     {         readCallback_ = cb;     }     void setWriteCallback(const EventCallback &cb)     {         writeCallback_ = cb;     }     void setCloseCallback(const EventCallback &cb)     {         closeCallback_ = cb;     }     void setErrorCallback(const EventCallback &cb)     {         errorCallback_ = cb;     }     void enableReading()     {         events_ |= kReadEvent;         update();     }     ............ private:     boost::weak_ptr<void> tie_;     const int  fd_;         // 文件描述符,但不负责关闭该文件描述符     int        events_;     // 关注的事件     int        revents_;        // poll/epoll返回的事件     int        index_;          // used by PollPoller.表示在poll的事件数组中的序号                                // used by EPollPoller. 表示某channel的状态(新创建,已关注,取消关注)     ReadEventCallback readCallback_;     EventCallback writeCallback_;     EventCallback closeCallback_;     EventCallback errorCallback_; };

#define POLLIN      0x0001
#define POLLPRI     0x0002
#define POLLOUT     0x0004
#define POLLERR     0x0008
#define POLLHUP     0x0010
#define POLLNVAL    0x0020

const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

2、定时函数选择 和 muduo 定时器

(1)、Linux 的计时函数,用于获得当前时间:

time(2) / time_t (秒) ftime(3) / struct timeb (毫秒)  gettimeofday(2) / struct timeval (微秒)  clock_gettime(2) / struct timespec (纳秒) gmtime / localtime / timegm / mktime / strftime / struct tm (这些与当前时间无关)

(2)、定时函数,用于让程序等待一段时间或安排计划任务:

sleep alarm usleep nanosleep clock_nanosleep getitimer / setitimer timer_create / timer_settime / timer_gettime / timer_delete timerfd_create / timerfd_gettime / timerfd_settime

取舍如下:

• (计时)只使用gettimeofday 来获取当前时间。

• (定时)只使用timerfd_* 系列函数来处理定时。

gettimeofday 入选原因:(这也是muduo::Timestamp class 的主要设计考虑)

1. time 的精度太低,ftime 已被废弃,clock_gettime 精度最高,但是它系统调用的开销比gettimeofday 大。 2. 在x86-64 平台上,gettimeofday 不是系统调用,而是在用户态实现的(搜vsyscall),没有上下文切换和陷入内核的开销。 3. gettimeofday 的分辨率(resolution) 是1 微秒,足以满足日常计时的需要。muduo::Timestamp 用一个int64_t 来表示从Epoch 到现在的微秒数,其范围可达上下30 万年。

timerfd_* 入选的原因:

These system calls create and operate on a timer that delivers timer expiration notifications via a file descriptor.

#include <sys/timerfd.h> int timerfd_create(int clockid, int flags);

// timerfd_create() creates a new timer object, and returns a file descriptor that refers to that timer. int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value); int timerfd_gettime(int fd, struct itimerspec *curr_value)

sleep / alarm / usleep 在实现时有可能用了信号 SIGALRM,在多线程程序中处理信号是个相当麻烦的事情,应当尽量避免 nanosleep 和 clock_nanosleep 是线程安全的,但是在非阻塞网络编程中,绝对不能用让线程挂起的方式来等待一段时间,程序会失去响应。正确的做法是注册一个时间回调函数。 getitimer 和 timer_create 也是用信号来 deliver 超时,在多线程程序中也会有麻烦。 timer_create 可以指定信号的接收方是进程还是线程,算是一个进步,不过在信号处理函数(signal handler)能做的事情实在很受限。 timerfd_create 把时间变成了一个文件描述符,该“文件”在定时器超时的那一刻变得可读,这样就能很方便地融入到 select/poll 框架中,用统一的方式来处理 IO 事件和超时事件,这也正是 Reactor 模式的长处。 传统的Reactor 利用select/poll/epoll 的timeout 来实现定时功能,但poll 和epoll 的定时精度只有毫秒,远低于timerfd_settime 的定时精度。

(3)、muduo的定时器由三个类实现,TimerId、Timer、TimerQueue,用户只能看到第一个类,其它两个都是内部实现细节

TimerId 只有两个成员,TimerId主要用于取消Timer:

/// An opaque identifier, for canceling Timer.
///
class TimerId : public muduo::copyable
{
    // default copy-ctor, dtor and assignment are okay

    friend class TimerQueue;

private:
    Timer *timer_;
    int64_t sequence_; //时钟序号
};

Timer 有多个数据成员,可以设置每个Timer超时的回调函数

///
/// Internal class for timer event.
///
class Timer : boost::noncopyable
{
 public:

  void run() const
  {
    callback_();
  }

 private:
  const TimerCallback callback_;        // 定时器回调函数
  Timestamp expiration_;                // 下一次的超时时刻
  const double interval_;               // 超时时间间隔,如果是一次性定时器,该值为0
  const bool repeat_;                   // 是否重复
  const int64_t sequence_;              // 定时器序号

  static AtomicInt64 s_numCreated_;     // 定时器计数,当前已经创建的定时器数量
};

TimerQueue的公有接口很简单,只有两个函数addTimer和cancel, TimerQueue 数据结构的选择,能快速根据当前时间找到已到期的定时器,也要高效的添加和删除Timer,因而可以用二叉搜索树,用map或者set.

lower_bound(x); 返回第一个>=x 的元素的iterator位置;upper_bound(); 返回第一个>x的元素的iterator位置。

RVO优化:在linux g++ 会优化,VC++ 在release 模式下会优化,即函数返回对象时不会调用拷贝函数。

///
/// A best efforts timer queue.
/// No guarantee that the callback will be on time.
///
class TimerQueue : boost::noncopyable
{
public:
    ///
    /// Schedules the callback to be run at given time,
    /// repeats if @c interval > 0.0.
    ///
    /// Must be thread safe. Usually be called from other threads.
    // 一定是线程安全的,可以跨线程调用。通常情况下被其它线程调用。
    TimerId addTimer(const TimerCallback &cb,
                     Timestamp when,
                     double interval);

    void cancel(TimerId timerId);

private:

    typedef std::pair<Timestamp, Timer *> Entry;
    typedef std::set<Entry> TimerList;

    EventLoop *loop_;       // 所属EventLoop
    const int timerfd_;  // timerfd_create 函数创建
    Channel timerfdChannel_;
    // Timer list sorted by expiration
    TimerList timers_;  // timers_是按到期时间排序
};

EventLoop类中的定时器操作函数:

TimerId EventLoop::runAt(const Timestamp &time, const TimerCallback &cb)
{
    return timerQueue_->addTimer(cb, time, 0.0);
}

TimerId EventLoop::runAfter(double delay, const TimerCallback &cb)
{
    Timestamp time(addTime(Timestamp::now(), delay));
    return runAt(time, cb);
}

TimerId EventLoop::runEvery(double interval, const TimerCallback &cb)
{
    Timestamp time(addTime(Timestamp::now(), interval));
    return timerQueue_->addTimer(cb, time, interval);
}

void EventLoop::cancel(TimerId timerId)
{
    return timerQueue_->cancel(timerId);
}

3、时序分析:

构造一个EventLoop对象,构造函数初始化列表,构造 timeQueue_ 成员 timerQueue_(new TimerQueue(this)),

调用TimeQueue 构造函数,函数内:

timerfdChannel_.setReadCallback(
    boost::bind(&TimerQueue::handleRead, this));
// we are always reading the timerfd, we disarm it with timerfd_settime.
timerfdChannel_.enableReading();

即注册timerfdChannel_的回调函数为TimerQueue::handleRead(), 并关注此channel 的可读事件。

TimerQueue 中有多个定时器,一次性的和重复的,事件循环开始EventLoop::loop(),当最早到期定时器超时时,poll() 返回timerfd_ 的可读事件(timerfdChannel_),调用Channel::handleEvent(),调用readCallback_(receiveTime); 进而调用Channel::setReadCallback 注册的TimerQueue::handleRead(), 在函数内先read  掉timerfd_数据,避免一直触发可读事件,接着遍历TimerQueue中此时所有超时的定时器,调用每个定时器构造时传递的回调函数。

测试程序:

#include <muduo/net/EventLoop.h>
//#include <muduo/net/EventLoopThread.h>
#include <muduo/base/Thread.h>

#include <boost/bind.hpp>

#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

int cnt = 0;
EventLoop *g_loop;

void printTid()
{
    printf("pid = %d, tid = %d\n", getpid(), CurrentThread::tid());
    printf("now %s\n", Timestamp::now().toString().c_str());
}

void print(const char *msg)
{
    printf("msg %s %s\n", Timestamp::now().toString().c_str(), msg);
    if (++cnt == 20)
    {
        g_loop->quit();
    }
}

void cancel(TimerId timer)
{
    g_loop->cancel(timer);
    printf("cancelled at %s\n", Timestamp::now().toString().c_str());
}

int main()
{
    printTid();
    sleep(1);
    {
        EventLoop loop;
        g_loop = &loop;

        print("main");
        loop.runAfter(1, boost::bind(print, "once1"));
        loop.runAfter(1.5, boost::bind(print, "once1.5"));
        loop.runAfter(2.5, boost::bind(print, "once2.5"));
        loop.runAfter(3.5, boost::bind(print, "once3.5"));
        TimerId t45 = loop.runAfter(4.5, boost::bind(print, "once4.5"));
        loop.runAfter(4.2, boost::bind(cancel, t45));
        loop.runAfter(4.8, boost::bind(cancel, t45));
        loop.runEvery(2, boost::bind(print, "every2"));
        TimerId t3 = loop.runEvery(3, boost::bind(print, "every3"));
        loop.runAfter(9.001, boost::bind(cancel, t3));

        loop.loop();
        print("main loop exits");
    }

}

输出比较多,删除了一些重复的:

simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test04 20131107 13:46:35.850671Z  4042 TRACE IgnoreSigPipe Ignore SIGPIPE - EventLoop.cc:51 pid = 4042, tid = 4042 now 1383831995.852329 20131107 13:46:36.853813Z  4042 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104 20131107 13:46:36.854568Z  4042 TRACE EventLoop EventLoop created 0xBFB125F4 in thread 4042 - EventLoop.cc:76 20131107 13:46:36.855189Z  4042 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104 msg 1383831996.855730 main 20131107 13:46:36.856275Z  4042 TRACE loop EventLoop 0xBFB125F4 start looping - EventLoop.cc:108 20131107 13:46:37.856698Z  4042 TRACE poll 1 events happended - EPollPoller.cc:65 20131107 13:46:37.857372Z  4042 TRACE printActiveChannels {4: IN }  - EventLoop.cc:271 20131107 13:46:37.858261Z  4042 TRACE readTimerfd TimerQueue::handleRead() 1 at 1383831997.858215 - TimerQueue.cc:62 msg 1383831997.858568 once1 20131107 13:46:38.356775Z  4042 TRACE poll 1 events happended - EPollPoller.cc:65 20131107 13:46:38.356855Z  4042 TRACE printActiveChannels {4: IN}  - EventLoop.cc:271 20131107 13:46:38.356883Z  4042 TRACE readTimerfd TimerQueue::handleRead() 1 at 1383831998.356876 - TimerQueue.cc:62 msg 1383831998.356910 once1.5 msg 1383831998.856871 every2 msg 1383831999.356891 once2.5 msg 1383831999.856996 every3 msg 1383832000.356955 once3.5 msg 1383832000.857969 every2 cancelled at 1383832001.057005 cancelled at 1383832001.657036 msg 1383832002.858077 every3 msg 1383832002.858094 every2 msg 1383832004.859132 every2 cancelled at 1383832005.858189 msg 1383832005.858198 every3 msg 1383832006.860228 every2 msg 1383832008.861321 every2

....省略every2 msg 1383832020.867925 main loop exits

程序中设置了多次定时器,0,1,2文件描述符被标准输入输出占据,epollfd_ = 3(epoll_create1 创建), timerfd_ = 4, wakeupFd_ = 5(见这里), 可以看到每次定时时间到,timerfd_ 就会可读,执行定时器回调函数。4.5s的定时不会超时,因为还没到时间的时候已经被取消了; 间隔3s的定时只超时3次,因为9s后被取消了;间隔2s的超时执行20次后g_loop->quit(),loop.loop()循环中判断条件后退出事件循环。

参考:

《UNP》

muduo manual.pdf

《linux 多线程服务器编程:使用muduo c++网络库》

http://www.ibm.com/developerworks/cn/linux/l-cn-timers/

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏zcqshine's blog

SpringMVC下获取验证码

4308
来自专栏進无尽的文章

编码篇-学会小用宏和条件编译

宏定义在C系开发中可以说占有举足轻重的作用。底层框架自不必说,为了编译优化和方便,以及跨平台能力,宏被大量使用,可以说底层开发离开define将寸步难行。而在更...

1251
来自专栏jeremy的技术点滴

《Network Programming with Go》阅读重点备忘(一)

3397
来自专栏小鄧子的技术博客专栏

All RxJava - 为Retrofit添加重试

在我们的日常开发中离不开I/O操作,尤其是网络请求,但并不是所有的请求都是可信赖的,因此我们必须为APP添加请求重试功能。

1811
来自专栏屈定‘s Blog

造轮子--Excel报表工具

由于公司内部之前对于excel封装操作并不是很方便,而且对于特殊的需求不是很容易满足,这个月的任务是迁移部分业务小报表顺便重构下,因此这里造个轮子,便于导入和导...

1373
来自专栏java一日一条

50个常见的 Java 错误及避免方法(第三部分)

当我们尝试调用带有错误参数的Java代码时,通常会产生此Java错误消息(@ghacksnews):

1203
来自专栏何俊林

LeakCanary的原理,你知道么?

2022
来自专栏开发与安全

muduo网络库学习之muduo_inspect 库涉及到的类

muduo inspect 库通过HTTP方式为服务器提供监控接口, 现在只实现进程相关信息的监控,通过成员ProcessInspector 实现。 Pro...

1935
来自专栏刘望舒

LeakCanary看这一篇文章就够了

LeakCanary是Square公司基于MAT开源的一个内存泄漏检测工具,在发生内存泄漏的时候LeakCanary会自动显示泄漏信息。

5825
来自专栏恰同学骚年

ASP.Net请求处理机制初步探索之旅 - Part 5 ASP.Net MVC请求处理流程

开篇:上一篇我们了解了在WebForm模式下一个Page页面的生命周期,它经历了初始化Init、加载Load以及呈现Render三个重要阶段,其中构造了页面控件...

1163

扫码关注云+社区