muduo网络库学习之EventLoop(一):事件循环类图简介和muduo 定时器TimeQueue

1、EventLoop、Channel、Poller 等类图如下:

黑色菱形:组合;白色菱形:聚合;白色三角形:继承;实线:关联;

Channel是selectable IO channel,负责注册与响应IO 事件,它不拥有file descriptor。 Channel是Acceptor、Connector、EventLoop、TimerQueue、TcpConnection的成员。

一个EventLoop对象对应一个Poller成员对象,boost::scoped_ptr<Poller> poller_;

 //Poller是个抽象类,具体可以是EPollPoller(默认) 或者PollPoller

Poller类里面有三个纯虚函数,需要子类实现:

/// Polls the I/O events.
/// Must be called in the loop thread.
virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;

/// Changes the interested I/O events.
/// Must be called in the loop thread.
virtual void updateChannel(Channel *channel) = 0;

/// Remove the channel, when it destructs.
/// Must be called in the loop thread.
virtual void removeChannel(Channel *channel) = 0;

对于PollPoller来说,一个fd对应一个struct pollfd(pollfd.fd),一个fd 对应一个channel*;这个fd 可以是socket, eventfd, timerfd, signalfd; 如下:

typedef std::vector<struct pollfd> PollFdList;
typedef std::map<int, Channel *> ChannelMap;    // key是文件描述符,value是Channel*
PollFdList pollfds_;
ChannelMap channels_;

对于EPollPoller 来说,一个channel* 对应一个fd, 一个channel* 对应一个struct epoll_event(epoll_event.data.ptr)

typedef std::vector<struct epoll_event> EventList;
typedef std::map<int, Channel *> ChannelMap;
EventList events_;
ChannelMap channels_;

一个线程最多只能有一个EventLoop对象,这种线程被称为IO线程。一个EventLoop对象对应多个Channel对象,但只有wakeupChannel_生存期由EventLoop控制,  timerfdChannel_生存期由TimeQueue管理。

(boost::scoped_ptr<Channel> wakeupChannel_; // 纳入poller_来管理     int wakeupFd_;   // eventfd函数创建 )

其余以Channel* 方式管理,如下:

typedef std::vector<Channel *> ChannelList;
ChannelList activeChannels_;        // Poller返回的活动通道

下面是Channel 类简化:

 C++ Code 

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50

/// /// A selectable I/O channel. /// /// This class doesn't own the file descriptor. /// The file descriptor could be a socket, /// an eventfd, a timerfd, or a signalfd class Channel : boost::noncopyable { public:     typedef boost::function<void()> EventCallback;     typedef boost::function<void(Timestamp)> ReadEventCallback;     Channel(EventLoop *loop, int fd);     ~Channel();     void handleEvent(Timestamp receiveTime);     void setReadCallback(const ReadEventCallback &cb)     {         readCallback_ = cb;     }     void setWriteCallback(const EventCallback &cb)     {         writeCallback_ = cb;     }     void setCloseCallback(const EventCallback &cb)     {         closeCallback_ = cb;     }     void setErrorCallback(const EventCallback &cb)     {         errorCallback_ = cb;     }     void enableReading()     {         events_ |= kReadEvent;         update();     }     ............ private:     boost::weak_ptr<void> tie_;     const int  fd_;         // 文件描述符,但不负责关闭该文件描述符     int        events_;     // 关注的事件     int        revents_;        // poll/epoll返回的事件     int        index_;          // used by PollPoller.表示在poll的事件数组中的序号                                // used by EPollPoller. 表示某channel的状态(新创建,已关注,取消关注)     ReadEventCallback readCallback_;     EventCallback writeCallback_;     EventCallback closeCallback_;     EventCallback errorCallback_; };

#define POLLIN      0x0001
#define POLLPRI     0x0002
#define POLLOUT     0x0004
#define POLLERR     0x0008
#define POLLHUP     0x0010
#define POLLNVAL    0x0020

const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

2、定时函数选择 和 muduo 定时器

(1)、Linux 的计时函数,用于获得当前时间:

time(2) / time_t (秒) ftime(3) / struct timeb (毫秒)  gettimeofday(2) / struct timeval (微秒)  clock_gettime(2) / struct timespec (纳秒) gmtime / localtime / timegm / mktime / strftime / struct tm (这些与当前时间无关)

(2)、定时函数,用于让程序等待一段时间或安排计划任务:

sleep alarm usleep nanosleep clock_nanosleep getitimer / setitimer timer_create / timer_settime / timer_gettime / timer_delete timerfd_create / timerfd_gettime / timerfd_settime

取舍如下:

• (计时)只使用gettimeofday 来获取当前时间。

• (定时)只使用timerfd_* 系列函数来处理定时。

gettimeofday 入选原因:(这也是muduo::Timestamp class 的主要设计考虑)

1. time 的精度太低,ftime 已被废弃,clock_gettime 精度最高,但是它系统调用的开销比gettimeofday 大。 2. 在x86-64 平台上,gettimeofday 不是系统调用,而是在用户态实现的(搜vsyscall),没有上下文切换和陷入内核的开销。 3. gettimeofday 的分辨率(resolution) 是1 微秒,足以满足日常计时的需要。muduo::Timestamp 用一个int64_t 来表示从Epoch 到现在的微秒数,其范围可达上下30 万年。

timerfd_* 入选的原因:

These system calls create and operate on a timer that delivers timer expiration notifications via a file descriptor.

#include <sys/timerfd.h> int timerfd_create(int clockid, int flags);

// timerfd_create() creates a new timer object, and returns a file descriptor that refers to that timer. int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value); int timerfd_gettime(int fd, struct itimerspec *curr_value)

sleep / alarm / usleep 在实现时有可能用了信号 SIGALRM,在多线程程序中处理信号是个相当麻烦的事情,应当尽量避免 nanosleep 和 clock_nanosleep 是线程安全的,但是在非阻塞网络编程中,绝对不能用让线程挂起的方式来等待一段时间,程序会失去响应。正确的做法是注册一个时间回调函数。 getitimer 和 timer_create 也是用信号来 deliver 超时,在多线程程序中也会有麻烦。 timer_create 可以指定信号的接收方是进程还是线程,算是一个进步,不过在信号处理函数(signal handler)能做的事情实在很受限。 timerfd_create 把时间变成了一个文件描述符,该“文件”在定时器超时的那一刻变得可读,这样就能很方便地融入到 select/poll 框架中,用统一的方式来处理 IO 事件和超时事件,这也正是 Reactor 模式的长处。 传统的Reactor 利用select/poll/epoll 的timeout 来实现定时功能,但poll 和epoll 的定时精度只有毫秒,远低于timerfd_settime 的定时精度。

(3)、muduo的定时器由三个类实现,TimerId、Timer、TimerQueue,用户只能看到第一个类,其它两个都是内部实现细节

TimerId 只有两个成员,TimerId主要用于取消Timer:

/// An opaque identifier, for canceling Timer.
///
class TimerId : public muduo::copyable
{
    // default copy-ctor, dtor and assignment are okay

    friend class TimerQueue;

private:
    Timer *timer_;
    int64_t sequence_; //时钟序号
};

Timer 有多个数据成员,可以设置每个Timer超时的回调函数

///
/// Internal class for timer event.
///
class Timer : boost::noncopyable
{
 public:

  void run() const
  {
    callback_();
  }

 private:
  const TimerCallback callback_;        // 定时器回调函数
  Timestamp expiration_;                // 下一次的超时时刻
  const double interval_;               // 超时时间间隔,如果是一次性定时器,该值为0
  const bool repeat_;                   // 是否重复
  const int64_t sequence_;              // 定时器序号

  static AtomicInt64 s_numCreated_;     // 定时器计数,当前已经创建的定时器数量
};

TimerQueue的公有接口很简单,只有两个函数addTimer和cancel, TimerQueue 数据结构的选择,能快速根据当前时间找到已到期的定时器,也要高效的添加和删除Timer,因而可以用二叉搜索树,用map或者set.

lower_bound(x); 返回第一个>=x 的元素的iterator位置;upper_bound(); 返回第一个>x的元素的iterator位置。

RVO优化:在linux g++ 会优化,VC++ 在release 模式下会优化,即函数返回对象时不会调用拷贝函数。

///
/// A best efforts timer queue.
/// No guarantee that the callback will be on time.
///
class TimerQueue : boost::noncopyable
{
public:
    ///
    /// Schedules the callback to be run at given time,
    /// repeats if @c interval > 0.0.
    ///
    /// Must be thread safe. Usually be called from other threads.
    // 一定是线程安全的,可以跨线程调用。通常情况下被其它线程调用。
    TimerId addTimer(const TimerCallback &cb,
                     Timestamp when,
                     double interval);

    void cancel(TimerId timerId);

private:

    typedef std::pair<Timestamp, Timer *> Entry;
    typedef std::set<Entry> TimerList;

    EventLoop *loop_;       // 所属EventLoop
    const int timerfd_;  // timerfd_create 函数创建
    Channel timerfdChannel_;
    // Timer list sorted by expiration
    TimerList timers_;  // timers_是按到期时间排序
};

EventLoop类中的定时器操作函数:

TimerId EventLoop::runAt(const Timestamp &time, const TimerCallback &cb)
{
    return timerQueue_->addTimer(cb, time, 0.0);
}

TimerId EventLoop::runAfter(double delay, const TimerCallback &cb)
{
    Timestamp time(addTime(Timestamp::now(), delay));
    return runAt(time, cb);
}

TimerId EventLoop::runEvery(double interval, const TimerCallback &cb)
{
    Timestamp time(addTime(Timestamp::now(), interval));
    return timerQueue_->addTimer(cb, time, interval);
}

void EventLoop::cancel(TimerId timerId)
{
    return timerQueue_->cancel(timerId);
}

3、时序分析:

构造一个EventLoop对象,构造函数初始化列表,构造 timeQueue_ 成员 timerQueue_(new TimerQueue(this)),

调用TimeQueue 构造函数,函数内:

timerfdChannel_.setReadCallback(
    boost::bind(&TimerQueue::handleRead, this));
// we are always reading the timerfd, we disarm it with timerfd_settime.
timerfdChannel_.enableReading();

即注册timerfdChannel_的回调函数为TimerQueue::handleRead(), 并关注此channel 的可读事件。

TimerQueue 中有多个定时器,一次性的和重复的,事件循环开始EventLoop::loop(),当最早到期定时器超时时,poll() 返回timerfd_ 的可读事件(timerfdChannel_),调用Channel::handleEvent(),调用readCallback_(receiveTime); 进而调用Channel::setReadCallback 注册的TimerQueue::handleRead(), 在函数内先read  掉timerfd_数据,避免一直触发可读事件,接着遍历TimerQueue中此时所有超时的定时器,调用每个定时器构造时传递的回调函数。

测试程序:

#include <muduo/net/EventLoop.h>
//#include <muduo/net/EventLoopThread.h>
#include <muduo/base/Thread.h>

#include <boost/bind.hpp>

#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

int cnt = 0;
EventLoop *g_loop;

void printTid()
{
    printf("pid = %d, tid = %d\n", getpid(), CurrentThread::tid());
    printf("now %s\n", Timestamp::now().toString().c_str());
}

void print(const char *msg)
{
    printf("msg %s %s\n", Timestamp::now().toString().c_str(), msg);
    if (++cnt == 20)
    {
        g_loop->quit();
    }
}

void cancel(TimerId timer)
{
    g_loop->cancel(timer);
    printf("cancelled at %s\n", Timestamp::now().toString().c_str());
}

int main()
{
    printTid();
    sleep(1);
    {
        EventLoop loop;
        g_loop = &loop;

        print("main");
        loop.runAfter(1, boost::bind(print, "once1"));
        loop.runAfter(1.5, boost::bind(print, "once1.5"));
        loop.runAfter(2.5, boost::bind(print, "once2.5"));
        loop.runAfter(3.5, boost::bind(print, "once3.5"));
        TimerId t45 = loop.runAfter(4.5, boost::bind(print, "once4.5"));
        loop.runAfter(4.2, boost::bind(cancel, t45));
        loop.runAfter(4.8, boost::bind(cancel, t45));
        loop.runEvery(2, boost::bind(print, "every2"));
        TimerId t3 = loop.runEvery(3, boost::bind(print, "every3"));
        loop.runAfter(9.001, boost::bind(cancel, t3));

        loop.loop();
        print("main loop exits");
    }

}

输出比较多,删除了一些重复的:

simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test04 20131107 13:46:35.850671Z  4042 TRACE IgnoreSigPipe Ignore SIGPIPE - EventLoop.cc:51 pid = 4042, tid = 4042 now 1383831995.852329 20131107 13:46:36.853813Z  4042 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104 20131107 13:46:36.854568Z  4042 TRACE EventLoop EventLoop created 0xBFB125F4 in thread 4042 - EventLoop.cc:76 20131107 13:46:36.855189Z  4042 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104 msg 1383831996.855730 main 20131107 13:46:36.856275Z  4042 TRACE loop EventLoop 0xBFB125F4 start looping - EventLoop.cc:108 20131107 13:46:37.856698Z  4042 TRACE poll 1 events happended - EPollPoller.cc:65 20131107 13:46:37.857372Z  4042 TRACE printActiveChannels {4: IN }  - EventLoop.cc:271 20131107 13:46:37.858261Z  4042 TRACE readTimerfd TimerQueue::handleRead() 1 at 1383831997.858215 - TimerQueue.cc:62 msg 1383831997.858568 once1 20131107 13:46:38.356775Z  4042 TRACE poll 1 events happended - EPollPoller.cc:65 20131107 13:46:38.356855Z  4042 TRACE printActiveChannels {4: IN}  - EventLoop.cc:271 20131107 13:46:38.356883Z  4042 TRACE readTimerfd TimerQueue::handleRead() 1 at 1383831998.356876 - TimerQueue.cc:62 msg 1383831998.356910 once1.5 msg 1383831998.856871 every2 msg 1383831999.356891 once2.5 msg 1383831999.856996 every3 msg 1383832000.356955 once3.5 msg 1383832000.857969 every2 cancelled at 1383832001.057005 cancelled at 1383832001.657036 msg 1383832002.858077 every3 msg 1383832002.858094 every2 msg 1383832004.859132 every2 cancelled at 1383832005.858189 msg 1383832005.858198 every3 msg 1383832006.860228 every2 msg 1383832008.861321 every2

....省略every2 msg 1383832020.867925 main loop exits

程序中设置了多次定时器,0,1,2文件描述符被标准输入输出占据,epollfd_ = 3(epoll_create1 创建), timerfd_ = 4, wakeupFd_ = 5(见这里), 可以看到每次定时时间到,timerfd_ 就会可读,执行定时器回调函数。4.5s的定时不会超时,因为还没到时间的时候已经被取消了; 间隔3s的定时只超时3次,因为9s后被取消了;间隔2s的超时执行20次后g_loop->quit(),loop.loop()循环中判断条件后退出事件循环。

参考:

《UNP》

muduo manual.pdf

《linux 多线程服务器编程:使用muduo c++网络库》

http://www.ibm.com/developerworks/cn/linux/l-cn-timers/

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏jianhuicode

Binder进程间通信详解

1844
来自专栏非著名程序员

我所理解的Intent 和Intent-filter

1.Intent Intent 是一个消息传递对象,可以使用它从其他应用组件请求操作。尽管 Intent 可以通过多种方式促进组件之间的通信,但其基本用例主要包...

1779
来自专栏光变

Java使用slf4j输出日志

431
来自专栏奔跑的蛙牛技术博客

并发知识5

锁和条件不能解决线程中的所有问题 账户1:200; 账户2:300; 线程1:从账户1转移300到账户2 线程2: 从账户2转移400到账户1 线程一和线...

632
来自专栏小灰灰

JDK容器学习之TreeMap (一) : 底层数据结构

TreeMap 在日常的工作中,相比较与HashMap而言,TreeMap的使用会少很多,即使在某些场景,需要使用到排序的Map时,也更多的是选择 Linke...

1789
来自专栏10km的专栏

fastjson:javabean按字段(field)序列化存储为Map并反序列化改进

需求说明 最近的项目应用到redis数据库,需要将java bean存储在redis数据库。因为需要对数据库中的某个字段进行修改,所以在redis上不能用简单的...

1728
来自专栏编码小白

ofbiz实体引擎(四) ModelReader的作用

public class ModelReader implements Serializable { public static final Stri...

2668
来自专栏武培轩的专栏

剑指Offer-求1+2+3+...+n

package Other; /** * 求1+2+3+...+n * 求1+2+3+...+n,要求不能使用乘除法、for、while、if、else、...

2964
来自专栏函数式编程语言及工具

Cats(1)- 从Free开始,Free cats

  cats是scala的一个新的函数式编程工具库,其设计原理基本继承了scalaz:大家都是haskell typeclass的scala版实现。当然,cat...

19510
来自专栏菩提树下的杨过

c#4.0中的动态编程

c#4.0中的dynamic早已不是新闻了,虽然内部用反射机制,略微会有一些性能上的额外开销,但是有些特殊场景还是很有用的,二害相权,取其轻吧(也正是因为这些动...

1788

扫码关注云+社区