libev源码解析——定时器监视器和组织形式

        我们先看下定时器监视器的数据结构。(转载请指明出于breaksoftware的csdn博客)

/* invoked after a specific time, repeatable (based on monotonic clock) */
/* revent EV_TIMEOUT */
typedef struct ev_timer
{
  EV_WATCHER_TIME (ev_timer)

  ev_tstamp repeat; /* rw */
} ev_timer;

/* invoked at some specific time, possibly repeating at regular intervals (based on UTC) */
/* revent EV_PERIODIC */
typedef struct ev_periodic
{
  EV_WATCHER_TIME (ev_periodic)

  ev_tstamp offset; /* rw */
  ev_tstamp interval; /* rw */
  ev_tstamp (*reschedule_cb)(struct ev_periodic *w, ev_tstamp now) EV_THROW; /* rw */
} ev_periodic;

        ev_timer是相对时间定时器监视器,ev_periodic是绝对时间定时器监视器。可以注意到,这两个监视器结构开头都拥有近乎相同的成员变量,即EV_WATCHER_TIME宏中定义的那部分

#define EV_WATCHER_TIME(type)			\
  EV_WATCHER (type)				\
  ev_tstamp at;     /* private */

        为什么说“近乎相同”,是因为它们传递给EV_WATCHER宏的参数不同,而从会导致各自拥有一个名称不同的回调函数指针,而其他变量连名称都一样。

        对这两种监视器,libev并没有像《libev源码解析——监视器(watcher)结构和组织形式》文中所述,将这些监视器关联到文件描述符作为下标的anfds结构中。

        为什么不放在上述结构中?因为保存到anfds中的监视器,都是要求文件描述符对应的事件发生而被触发。而定时器是要求文件描述符对应的事件没有发生,通过等待超时而被触发(或者被其他无关事件触发,顺带执行)。所以将定时器监视器保存在这个结构中是没有用的。同时这也意味着,向pendings结构(《libev源码解析——调度策略》)中记录本次触发的监视器的数据来源并非只有anfds,还有定时器监视器相关的结构。

        那这个结构是什么呢?我们先看下基础结构定义

  /* base class, nothing to see here unless you subclass */
typedef struct ev_watcher_time
{
  EV_WATCHER_TIME (ev_watcher_time)
} ev_watcher_time;

typedef ev_watcher_time *WT;

typedef struct {
  ev_tstamp at;
  WT w;
} ANHE;

        ev_watcher_timer和ev_timer、ev_periodic有着近乎一致的数据格式——它们都是使用EV_WATCHER_TIME宏构成开始的几个成员变量。

        这意味着,在忽略回调函数名的情况下,我们可以使用ev_watcher_timer指针指向ev_timer结构体对象或者ev_periodic结构体对象。

        因为ANHE结构中变量w可以指向ev_timer,那么目前我们有两种保存定时器监视器的方式(以ev_timer为例):

  1. 连续的ev_timer结构体对象构成的数组
  2. 连续的ANHE结构对象构成的数组

        这两种结构在内存中的布局如下:

        这两种结构都有自己的优缺点:

  1. 结构比较简单,各个监视器的位置关系由数组下标维护。其缺点调整监视器位置比较低效,因为这意味着比较多的内存需要被转移。
  2. 结构则相对复杂,因为其通过一个额外ANHE数组维护各个监视器的关系。但是它可以方便的调整监视器的位置——只要调整各个指针即可。

        对于结构的选择,应该依据应用场景决定。我们在《libev源码解析——定时器原理》中提到,libev需要寻找到“下次执行时间”离现在最近的监视器。如果采用连续的ev_timer结构存储,则可以分为如下两种处理方式:

  • ev_timer数组中各个元素位置不变,只是修改“下次执行时间”。对于这样乱序的数组,只有遍历的办法才能找到最近的。然而这种方式效率很低。
  • ev_timer数组中各个元素位置可变。这样在修改“下次执行时间”后,就需要对数组元素重新布局。

        而如果采用连续的ANHE结构存储,则对于元素位置发生变化的场景,只要修改两个字段即可,而不用像上面方案那样要修改七个字段。相比较而言,这个方案更优。libev选用的就是这个方案。

void noinline
ev_timer_start (EV_P_ ev_timer *w) EV_THROW
{
……
  ++timercnt;
  ev_start (EV_A_ (W)w, timercnt + HEAP0 - 1);
  array_needsize (ANHE, timers, timermax, ev_active (w) + 1, EMPTY2);
  ANHE_w (timers [ev_active (w)]) = (WT)w;
  ANHE_at_cache (timers [ev_active (w)]);
……
}

        timers是保存“相对时间定时器”监视器的数组结构。在ev_periodic_start代码中,我们可以看到“绝对时间定时器”监视器保存在名字叫periodics的数组中。

        下一步我们就需要寻找一种相对高效的排序算法。首先我们想到的是顺序排序,而且离现在时间最近的元素排在数组第一位。为了方便描述,我们之后将时间改成整数表示。假设我们有如下的监视器:1、3、5、7、9、11、13。如果时间为1的监视器执行了,则将其时间差改成12,则数组的移动方式如下

        w2~w6可以见得,“牵一发而动全身”。那是否有更好的方式?libev采用的是最小堆。关于最小堆操作的示例,可以参见《最小堆 构建、插入、删除的过程图解》。以上例,则操作如下图

       可见,我们将操作元素的次数降低到3次,比顺序排列的方案要少很多。

       这种操作是自上而下的,对应的代码是

inline_speed void
downheap (ANHE *heap, int N, int k)
{
  ANHE he = heap [k];

  for (;;)
    {
      int c = k << 1;

      if (c >= N + HEAP0)
        break;

      c += c + 1 < N + HEAP0 && ANHE_at (heap [c]) > ANHE_at (heap [c + 1])
           ? 1 : 0;

      if (ANHE_at (he) <= ANHE_at (heap [c]))
        break;

      heap [k] = heap [c];
      ev_active (ANHE_w (heap [k])) = k;
      
      k = c;
    }

  heap [k] = he;
  ev_active (ANHE_w (he)) = k;
}

        这个行为发生在定时器监视器向pendings结构中记录时,因为离当前时间最近的监视器马上要执行,所以要修改它下一次执行的时间,然后重新布局结构。

inline_size void
timers_reify (EV_P)
{
  EV_FREQUENT_CHECK;

  if (timercnt && ANHE_at (timers [HEAP0]) < mn_now)
    {
      do
        {
          ev_timer *w = (ev_timer *)ANHE_w (timers [HEAP0]);

          /*assert (("libev: inactive timer on timer heap detected", ev_is_active (w)));*/

          /* first reschedule or stop timer */
          if (w->repeat)
            {
              ev_at (w) += w->repeat;
              if (ev_at (w) < mn_now)
                ev_at (w) = mn_now;

              assert (("libev: negative ev_timer repeat value found while processing timers", w->repeat > 0.));

              ANHE_at_cache (timers [HEAP0]);
              downheap (timers, timercnt, HEAP0);

        还有一种是自下而上的重新布局,这种发生在新增一个定时器时。

        相应代码是

inline_speed void
upheap (ANHE *heap, int k)
{
  ANHE he = heap [k];

  for (;;)
    {
      int p = HPARENT (k);

      if (UPHEAP_DONE (p, k) || ANHE_at (heap [p]) <= ANHE_at (he))
        break;

      heap [k] = heap [p];
      ev_active (ANHE_w (heap [k])) = k;
      k = p;
    }

  heap [k] = he;
  ev_active (ANHE_w (he)) = k;
}
void noinline
ev_timer_start (EV_P_ ev_timer *w) EV_THROW
{
  if (expect_false (ev_is_active (w)))
    return;

  ev_at (w) += mn_now;

  assert (("libev: ev_timer_start called with negative timer repeat value", w->repeat >= 0.));

  EV_FREQUENT_CHECK;

  ++timercnt;
  ev_start (EV_A_ (W)w, timercnt + HEAP0 - 1);
  array_needsize (ANHE, timers, timermax, ev_active (w) + 1, EMPTY2);
  ANHE_w (timers [ev_active (w)]) = (WT)w;
  ANHE_at_cache (timers [ev_active (w)]);
  upheap (timers, ev_active (w));

  EV_FREQUENT_CHECK;

  /*assert (("libev: internal timer heap corruption", timers [ev_active (w)] == (WT)w));*/
}

        至此,《libev源码解析》系列已经讲完。经过这个系列分析,我们应该可以对libev整体实现原理和关键结构有了充分的了解。

        至于具体实现,可以参见《libev中文手册》

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券