前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Node.js C++ 层的任务管理

Node.js C++ 层的任务管理

作者头像
theanarkh
发布2022-12-06 09:38:01
1.3K0
发布2022-12-06 09:38:01
举报
文章被收录于专栏:原创分享原创分享

好久没更新了,今天写个笔记。

我们都知道 Node.js 是基于事件循环来运行的,本质上是一个生产者 / 消费者模型,所以就少不了任务的管理机制,不过本文不是介绍事件循环中的任务管理,而是 C++ 层的任务管理。本文主要介绍 SetImmediate、SetImmediateThreadsafe、RequestInterrupt、AddCleanupHook 这四个 API 产生的任务。时间关系,随便写写,权当笔记。

任务管理机制的初始化

首先来看一下 Node.js 启动的过程中,和任务管理相关的逻辑。

代码语言:javascript
复制
uv_check_start(immediate_check_handle(), CheckImmediate)
uv_async_init(
    event_loop(),
    &task_queues_async_,
    [](uv_async_t* async) {
      Environment* env = ContainerOf(&Environment::task_queues_async_, async);
      env->RunAndClearNativeImmediates();
    })

CheckImmediate 是在 check 阶段执行的函数,task_queues_async_ 则用于线程间通信,即当子线程往主线程提交任务时,通过 task_queues_async_ 通知主线程,然后主线程执行 uv_async_init 注册的回调。上面的代码就是消费者的逻辑。后面再详细分析里面的处理流程。

提交任务

接下来逐个看一下生产者的逻辑。

代码语言:javascript
复制
template <typename Fn>
void Environment::SetImmediate(Fn&& cb, CallbackFlags::Flags flags) {
  auto callback = native_immediates_.CreateCallback(std::move(cb), flags);
  native_immediates_.Push(std::move(callback));
  // ...
}

SetImmediate 用于同线程的代码提交任务。

代码语言:javascript
复制
template <typename Fn>
void Environment::SetImmediateThreadsafe(Fn&& cb, CallbackFlags::Flags flags) {
  auto callback = native_immediates_threadsafe_.CreateCallback(
      std::move(cb), flags);
  {
    Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
    native_immediates_threadsafe_.Push(std::move(callback));
    if (task_queues_async_initialized_)
      uv_async_send(&task_queues_async_);
  }
}

SetImmediateThreadsafe 用于子线程给主线程提交任务,所以需要加锁。

代码语言:javascript
复制
template <typename Fn>
void Environment::RequestInterrupt(Fn&& cb) {
  auto callback = native_immediates_interrupts_.CreateCallback(
      std::move(cb), CallbackFlags::kRefed);
  {
    Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
    native_immediates_interrupts_.Push(std::move(callback));
    if (task_queues_async_initialized_)
      uv_async_send(&task_queues_async_);
  }
  RequestInterruptFromV8();
}

RequestInterrupt 用于子线程给主线程提交代码,他和 SetImmediateThreadsafe 有一个很重要的区别是调用了 RequestInterruptFromV8。

代码语言:javascript
复制
void Environment::RequestInterruptFromV8() {
  isolate()->RequestInterrupt([](Isolate* isolate, void* data) {
    std::unique_ptr<Environment*> env_ptr { static_cast<Environment**>(data) };
    Environment* env = *env_ptr;
    env->RunAndClearInterrupts();
  }, interrupt_data);
}

RequestInterrupt 可以使得提交的代码在 JS 代码死循环时依然会被执行。接着看 AddCleanupHook。

代码语言:javascript
复制
void Environment::AddCleanupHook(CleanupQueue::Callback fn, void* arg) {
  cleanup_queue_.Add(fn, arg);
}

AddCleanupHook 用于注册线程退出前的回调。生产者的逻辑都比较简单,就是往任务队列里插入一个任务,如果是涉及到线程间的任务,则通知主线程。

消费者

接下来看一下消费者的逻辑,根据前面的分析可以知道,消费者有几个:CheckImmediate,task_queues_async_ 的处理函数、RequestInterrupt 注册的函数、退出前回调处理函数。先看 CheckImmediate。

代码语言:javascript
复制
void Environment::CheckImmediate(uv_check_t* handle) {
  Environment* env = Environment::from_immediate_check_handle(handle);
  env->RunAndClearNativeImmediates();
}

void Environment::RunAndClearNativeImmediates(bool only_refed) {
  RunAndClearInterrupts();

  auto drain_list = [&](NativeImmediateQueue* queue) {
    while (auto head = queue->Shift()) {
        head->Call(this);
    }
    return false;
  };
  while (drain_list(&native_immediates_)) {}
  NativeImmediateQueue threadsafe_immediates;
  if (native_immediates_threadsafe_.size() > 0) {
    Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
    threadsafe_immediates.ConcatMove(std::move(native_immediates_threadsafe_));
  }
  while (drain_list(&threadsafe_immediates)) {}
}

void Environment::RunAndClearInterrupts() {
  while (native_immediates_interrupts_.size() > 0) {
    NativeImmediateQueue queue;
    {
      Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
      queue.ConcatMove(std::move(native_immediates_interrupts_));
    }
    while (auto head = queue.Shift())
      head->Call(this);
  }
}

CheckImmediate 函数中处理了SetImmediate、SetImmediateThreadsafe 和 RequestInterrupt 产生的任务。但是如果主线程阻塞在 Poll IO 阶段时,只有子线程提交任务时会唤醒主线程,具体是通过 task_queues_async_ 结构体,看一下处理函数。

代码语言:javascript
复制
env->RunAndClearNativeImmediates();

可以看到这时候也是处理了SetImmediate、SetImmediateThreadsafe 和 RequestInterrupt 产生的任务。最后来看一下处理退出前回调的函数,具体时机是 FreeEnvironment 函数中的 env->RunCleanup()。

代码语言:javascript
复制
void Environment::RunCleanup() {
  RunAndClearNativeImmediates(true);
  while (!cleanup_queue_.empty() || principal_realm_->HasCleanupHooks() ||
         native_immediates_.size() > 0 ||
         native_immediates_threadsafe_.size() > 0 ||
         native_immediates_interrupts_.size() > 0) {
    // 见 CleanupQueue::Drain
    cleanup_queue_.Drain();
    RunAndClearNativeImmediates(true);
  }
}

// cleanup_queue_.Drain();
void CleanupQueue::Drain() {
  std::vector<CleanupHookCallback> callbacks(cleanup_hooks_.begin(),
                                             cleanup_hooks_.end());
  std::sort(callbacks.begin(),
            callbacks.end(),
            [](const CleanupHookCallback& a, const CleanupHookCallback& b) {
              return a.insertion_order_counter_ > b.insertion_order_counter_;
            });

  for (const CleanupHookCallback& cb : callbacks) {
    cb.fn_(cb.arg_);
    cleanup_hooks_.erase(cb);
  }
}

RunCleanup 中同时处理了 SetImmediate、SetImmediateThreadsafe、 RequestInterrupt 产生的任务和注册的退出前回调。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-12-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 任务管理机制的初始化
  • 提交任务
  • 消费者
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档