专栏首页原创分享nodejs源码解析之事件循环

nodejs源码解析之事件循环

nodejs的的事件循环由libuv的uv_run函数实现。在该函数中执行while循环,然后处理各种阶段(phase)的事件回调。事件循环的处理相当于一个消费者,消费由各业务代码生产的任务。下面看一下代码。

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  while (r != 0 && loop->stop_flag == 0) {
    // 更新loop的time字段
    uv__update_time(loop);
    // 执行超时回调
    uv__run_timers(loop);
    // 执行pending回调,ran_pending代表pending队列是否为空,即没有节点可以执行
    ran_pending = uv__run_pending(loop);
    // 继续执行各种队列
    uv__run_idle(loop);
    uv__run_prepare(loop);

    timeout = 0;
    // UV_RUN_ONCE并且有pending节点的时候,会阻塞式poll io,默认模式也是
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      timeout = uv_backend_timeout(loop);
    // poll io timeout是epoll_wait的超时时间
    uv__io_poll(loop, timeout);
    uv__run_check(loop);
    uv__run_closing_handles(loop);
    // 还有一次执行超时回调的机会
    if (mode == UV_RUN_ONCE) {
      uv__update_time(loop);
      uv__run_timers(loop);
    }

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
      break;
  }

  if (loop->stop_flag != 0)
    loop->stop_flag = 0;

  return r;
}

在这里插入图片描述 libuv分为几个阶段,这个可以从官网去了解到。下面分别分析各个阶段的相关代码。

1 定时器阶段

  // 更新loop的time字段
    uv__update_time(loop);
    // 执行超时回调
    uv__run_timers(loop);

首先更新当前的时间,然后判断哪个节点超时了。

static void uv__update_time(uv_loop_t* loop){
  loop->time = uv__hrtime(UV_CLOCK_FAST) / 1000000;
}

// 找出已经超时的节点,并且执行里面的回调
void uv__run_timers(uv_loop_t* loop) {
  struct heap_node* heap_node;
  uv_timer_t* handle;

  for (;;) {
    heap_node = heap_min(timer_heap(loop));
    if (heap_node == NULL)
      break;

    handle = container_of(heap_node, uv_timer_t, heap_node);
    // 如果当前节点的时间大于当前时间则返回,说明后面的节点也没有超时
    if (handle->timeout > loop->time)
      break;
    // 移除该计时器节点,重新插入最小堆,如果设置了repeat的话
    uv_timer_stop(handle);
    uv_timer_again(handle);
    // 执行超时回调
    handle->timer_cb(handle);
  }
}

libuv以二叉堆的形式维护了超时任务节点,每次判断最快超时的节点有没有超时,没有的话说明后面的节点也不会超时,有的话继续往下判断。定时器在nodejs里的生产者是setTimeout和setInterval。

2 pending阶段

官网解释是在上一轮的poll io阶段没有执行的io回调,会在下一轮循环的pending阶段被执行。我们先看pending阶段的处理。

static int uv__run_pending(uv_loop_t* loop) {
  QUEUE* q;
  QUEUE pq;
  uv__io_t* w;

  if (QUEUE_EMPTY(&loop->pending_queue))
    return 0;
  // 把pending_queue队列的节点移到pq,即清空了pending_queue
  QUEUE_MOVE(&loop->pending_queue, &pq);

  // 遍历pq队列
  while (!QUEUE_EMPTY(&pq)) {
    // 取出当前第一个需要处理的节点,即pq.next
    q = QUEUE_HEAD(&pq);
    // 把当前需要处理的节点移出队列
    QUEUE_REMOVE(q);
    // 重置一下prev和next指针,因为这时候这两个指针是指向队列中的两个节点
    QUEUE_INIT(q);
    w = QUEUE_DATA(q, uv__io_t, pending_queue);
    w->cb(loop, w, POLLOUT);
  }

  return 1;
}

就是把pending队列了的节点逐个执行。然后我们看一下pending队列的节点是如何生产出来的。

void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
  if (QUEUE_EMPTY(&w->pending_queue))
    QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue);
}

libuv通过uv__io_feed函数生产pending任务,从libuv的代码中我们看到io错误的时候会调这个函数(还有其他情况)。

if (handle->delayed_error)
    uv__io_feed(handle->loop, &handle->io_watcher);

最后io关闭的时候会从pending队列移除对应的节点。

void uv__io_close(uv_loop_t* loop, uv__io_t* w) {
  uv__io_stop(loop, w, POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI);
  QUEUE_REMOVE(&w->pending_queue);
  uv__platform_invalidate_fd(loop, w->fd);
}

3 idle阶段

idle节点是自定义的一些任务,也是维护一个任务队列,每次循环的时候,如果队列不为空则逐个执行任务节点。在nodejs中setImmediate的实现使用了idle这个阶段。

// ToggleImmediateRef在timer.js中使用
void Environment::ToggleImmediateRef(bool ref) {
  if (ref) {
    /*
     Idle handle is needed only to stop the event loop from blocking in poll.
     防止在poll io中阻塞,有回调则poll io的timeout是0
     */
    uv_idle_start(immediate_idle_handle(), [](uv_idle_t*){ });
  } else {
    uv_idle_stop(immediate_idle_handle());
  }
}

idle的处理逻辑可以参考这篇文章libuv之idle、check、prepare---loop-watcher.c

4 prepare阶段

类似idle阶段,自定义的任务队列,是poll io前最后一个阶段。

5 poll io阶段

poll io是处理网络io、文件io的地方。可能会引起nodejs的短暂阻塞。

// 最长阻塞时间
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);

看看最长阻塞时间是怎么算的。

int uv_backend_timeout(const uv_loop_t* loop) {
  if (loop->stop_flag != 0)
    return 0;

  if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
    return 0;

  if (!QUEUE_EMPTY(&loop->idle_handles))
    return 0;

  if (!QUEUE_EMPTY(&loop->pending_queue))
    return 0;

  if (loop->closing_handles)
    return 0;

  return uv__next_timeout(loop);
}

没有需要处理的任务或者有需要处理的回调则不阻塞,否则取定时器二叉堆中最快到期的节点的时间作为阻塞时间。然后进入uv__io_poll。uv__io_poll是经典的epoll处理模式。使用先把业务感兴趣的事件注册到epoll中,然后在epoll_wait中等待事件的到来。最后执行对应事件的回调。下面看一下核心的代码。

while (!QUEUE_EMPTY(&loop->watcher_queue)) {
    q = QUEUE_HEAD(&loop->watcher_queue);
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);

    w = QUEUE_DATA(q, uv__io_t, watcher_queue);

    e.events = w->pevents;
    e.data = w->fd;

    if (w->events == 0)
      op = UV__EPOLL_CTL_ADD;
    else
      op = UV__EPOLL_CTL_MOD;
    uv__epoll_ctl(loop->backend_fd, op, w->fd, &e);
}
    nfds = uv__epoll_wait(loop->backend_fd,
                            events,
                            ARRAY_SIZE(events),
                            timeout);

    loop->watchers[loop->nwatchers] = (void*) events;
    loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;

    for (i = 0; i < nfds; i++) {
      pe = events + i;
      fd = pe->data;
       w->cb(loop, w, pe->events);
    }

从上面的代码中我们看到我们把感兴趣的事件和回调打包成一个watcher追加到loop->watcher_queue队列,在poll io阶段就会被libuv处理。

6 check阶段

check阶段类似idle和prepare,是用户自定义任务的。nodejs中setImmediate是利用这个阶段实现的,具体可以看这篇文章nodejs之setImmediate源码分析。

7 closing_handles阶段

当一个handle调用uv_close关闭的时候,可以注册一个回调,在closing_handles阶段就会被执行。

void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
  handle->flags |= UV_HANDLE_CLOSING;
  handle->close_cb = close_cb;
  ...
  uv__make_close_pending(handle);
}

void uv__make_close_pending(uv_handle_t* handle) {
  handle->next_closing = handle->loop->closing_handles;
  handle->loop->closing_handles = handle;
}

上面代码把给handle绑定了一个close_cb然后插入到closing_handles队列。然后在closing_handles阶段被执行。

static void uv__run_closing_handles(uv_loop_t* loop) {
  uv_handle_t* p;
  uv_handle_t* q;

  p = loop->closing_handles;
  loop->closing_handles = NULL;

  while (p) {
    q = p->next_closing;
    uv__finish_close(p);
    p = q;
  }
}
static void uv__finish_close(uv_handle_t* handle) {
  ...
  if (handle->close_cb) {
    handle->close_cb(handle);
  }
}

至此,完成了一轮事件循环。这就是nodejs用libuv实现的事件循环。我们首先要了解各个阶段都是处理什么的,然后把我们的任务直接或者间接地加到对应阶段的任务队列里就可以了。

本文分享自微信公众号 - 编程杂技(theanarkh),作者:theanarkh

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-01-04

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 理解libuv的基本原理

    libuv的实现是一个很经典生产者-消费者模型。libuv在整个生命周期中,每一次循环都执行每个阶段(phase)维护的任务队列。逐个执行节点里的回调,在回调中...

    theanarkh
  • libuv源码解析之数据结构全景图

    theanarkh
  • libuv源码分析之stream第二篇

    关闭流的写端就是相当于给流发送一个关闭请求,把请求挂载到流中,然后注册等待可写事件,在可写事件触发的时候就会执行关闭操作。这个我们后面分析。

    theanarkh
  • HDOJ 2042 不容易系列之二

    Problem Description 你活的不容易,我活的不容易,他活的也不容易。不过,如果你看了下面的故事,就会知道,有位老汉比你还不容易。

    谙忆
  • Pandas-14.统计函数

    悠扬前奏
  • libuv源码阅读(17)--queue-cancel

    总结:cancel把会work从待处理的队列中移动到loop的wq就绪队列中,然后让loop的异步任务handler来处理它

    wanyicheng
  • java敏感词校验

    https://juejin.im/post/5b6ef984e51d45667f3878ef http://cmsblogs.com/?p=1031 ht...

    居士
  • 中国首个区块链标准《区块链参考架构》发布

    作者:数据观 ? 2017年5月16日上午,在杭州国际博览中心举行的区块链技术应用峰会暨首届中国区块链开发大赛成果发布会上,首个区块链标准《区块链 参考架构》...

    钱塘数据
  • ​教育部要求导师接受学生和同行评价,作为招生、职称、评奖的重要依据

    9 月 28 日,教育部网站公布了由国务院学位委员会、教育部印发的《关于进一步严格规范学位与研究生教育质量管理的若干意见》全文。该意见对研究生培养教育的全流程做...

    气象学家
  • 3000字!5大SQL数据清洗方法!

    日常工作中,分析师会接到一些专项分析的需求,首先会搜索脑中的分析体悉,根据业务需求构建相应的分析模型(不只是机器学习模型),根据模型填充相应维度表,这些维度特征...

    Rocky0429

扫码关注云+社区

领取腾讯云代金券