前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解 Node.js 的 trace_event

深入理解 Node.js 的 trace_event

作者头像
theanarkh
发布2022-03-30 20:01:42
7460
发布2022-03-30 20:01:42
举报
文章被收录于专栏:原创分享原创分享

前言:trace event 简单来说就是在代码里静态插入埋点,当开启 trace event 的时候,每次经过这些埋点,就会记录执行的相关数据,通过这些数据,我们可以对系统进行分析。Node.js 内核支持 trace event 的功能,并实现了对某些模块的 trace 能力。本文介绍 trace event 在 Node.js 中的实现。

在 Node.js 收集 trace_event 数据的方式有两种,第一种是在启动 Node.js 时通过命令行参数。

代码语言:javascript
复制
node --trace-event-categories v8 --trace-event-file-pattern '${pid}-${rotation}.log' server.js

命令行参数可以指定我们需要收集哪些模块的 trace 数据以及这些 trace 数据写到哪个文件中。第二种方式是通过动态的方式去控制想要收集哪些模块的 trace 数据。

代码语言:javascript
复制
const trace_events = require('trace_events');const categories = ['node.perf', 'node.async_hooks'];const tracing = trace_events.createTracing({ categories });tracing.enable();// do something
tracing.disable();

下面以第二种方式为例介绍具体的实现(第一种只不过是第二种的简单情况)。通过 createTracing 可以创建一个 trace 对象。

代码语言:javascript
复制
function createTracing(options) {
  return new Tracing(options.categories);}

class Tracing {
  constructor(categories) {
    this[kHandle] = new CategorySet(categories);
    this[kCategories] = categories;
    this[kEnabled] = false;
  }

  enable() {
    if (!this[kEnabled]) {
      this[kEnabled] = true;
      this[kHandle].enable();
    }
  }}

新建了一个 CategorySet 对象然后调用其 enable 函数。可以看到,js 层只是对底层 CategorySet 的简单封装。接着看 C++ 层。

代码语言:javascript
复制
class NodeCategorySet : public BaseObject {
 public:

  static void New(const FunctionCallbackInfo<Value>& args);
  static void Enable(const FunctionCallbackInfo<Value>& args);
  static void Disable(const FunctionCallbackInfo<Value>& args);
 private:

  bool enabled_ = false;
  const std::set<std::string> categories_; // 对象关联的 trace 模块};

接着看 enable 函数的逻辑。

代码语言:javascript
复制
void NodeCategorySet::Enable(const FunctionCallbackInfo<Value>& args) {
  NodeCategorySet* category_set;
  ASSIGN_OR_RETURN_UNWRAP(&category_set, args.Holder());
  const auto& categories = category_set->GetCategories();
  // 非空并且没有启动则启动
  if (!category_set->enabled_ && !categories.empty()) {
    // 启动 trace agent,如果已经启动则直接返回
    StartTracingAgent();
    // 通过 writer 注册需要 trace 的模块
    GetTracingAgentWriter()->Enable(categories);
    category_set->enabled_ = true;
  }}

最终的核心逻辑在 StartTracingAgent 和 GetTracingAgentWriter()->Enable() 中,在分析这两个函数之前,首先来看一下 Node.js 初始化的过程中关于 trace agent 的逻辑。

代码语言:javascript
复制
struct V8Platform {
    bool initialized_ = false;
    inline void Initialize(int thread_pool_size) {
        // 创建一个 trace agent 对象
        tracing_agent_ = std::make_unique<tracing::Agent>();
        // 保存到某个地方,生产 trace 数据时使用
        node::tracing::TraceEventHelper::SetAgent(tracing_agent_.get());
        // 获取 agent 中的 controller,controller 负责管理 trace 数据的生产
        node::tracing::TracingController* controller = tracing_agent_->GetTracingController();
        // 创建一个 trace 观察者,在启动 trace 的时候被 V8 执行
        trace_state_observer_ = std::make_unique<NodeTraceStateObserver>(controller);
        // 保持到 controller 中
        controller->AddTraceStateObserver(trace_state_observer_.get());
        // tracing_file_writer_ 设置为默认值
        tracing_file_writer_ = tracing_agent_->DefaultHandle();
        // 通过命令行启动
        if (!per_process::cli_options->trace_event_categories.empty()) {
          StartTracingAgent();
        }
    }

    inline tracing::AgentWriterHandle* GetTracingAgentWriter() {
      return &tracing_file_writer_;
    }
    std::unique_ptr<NodeTraceStateObserver> trace_state_observer_;
    std::unique_ptr<tracing::Agent> tracing_agent_;
    tracing::AgentWriterHandle tracing_file_writer_;};

下面来看一下 对象间的整体关系和大致实现。

1. Agent

Agent 是 Node.js 中 trace event 的总负责人。

代码语言:javascript
复制
class Agent {
 public:
  Agent();
  ~Agent();
  // 获取 agent 中的 controller
  TracingController* GetTracingController() {
    TracingController* controller = tracing_controller_.get();
    return controller;
  }

  //  注册消费者 writer
  AgentWriterHandle AddClient(...);
  // 默认 writer
  AgentWriterHandle DefaultHandle();

  // 生产者调用这些函数生产 trace 数据
  void AppendTraceEvent(TraceObject* trace_event);
  void AddMetadataEvent(std::unique_ptr<TraceObject> event);

  // 通过 writer flush 数据到文件
  void Flush(bool blocking);
 private:

  void Start();
  void Enable(int id, const std::set<std::string>& categories);

  uv_thread_t thread_;
  uv_loop_t tracing_loop_;

  bool started_ = false;
  class ScopedSuspendTracing;

  // 记录需要 trace 的模块
  std::unordered_map<int, std::multiset<std::string>> categories_;
  // 记录消费者
  std::unordered_map<int, std::unique_ptr<AsyncTraceWriter>> writers_;
  std::unique_ptr<TracingController> tracing_controller_;};

从 Agent 的类结构我们可以大致了解到 Agent 的功能。我们可以通过 Agent 启动 trace 系统、注册消费者,提供接口给生产者生产数据并写到消费者中。接着看 agent 中的 tracing_controller_ 和 writers_ 对象。

2. TracingController

代码语言:javascript
复制
class TracingController : public v8::platform::tracing::TracingController {
 public:
  TracingController() : v8::platform::tracing::TracingController() {}

  int64_t CurrentTimestampMicroseconds() override {
    return uv_hrtime() / 1000;
  }
  void AddMetadataEvent(...);};

TracingController 继承 V8 的 TracingController。

代码语言:javascript
复制
class V8_PLATFORM_EXPORT TracingController : public V8_PLATFORM_NON_EXPORTED_BASE(v8::TracingController) {
 public:
  // 设置 TraceBuffer,用于保持生产者数据
  void Initialize(TraceBuffer* trace_buffer);
  // 生产者调用
  uint64_t AddTraceEvent(...) override;
  uint64_t AddTraceEventWithTimestamp(...) override;
  // 注册观察者,启动 trace 时通知观察者
  void AddTraceStateObserver(...) override;

  void StartTracing(TraceConfig* trace_config);
  void StopTracing();
 private:

  std::unordered_set<v8::TracingController::TraceStateObserver*> observers_;
  std::unique_ptr<TraceBuffer> trace_buffer_;};

TracingController 内部持有一个 TraceBuffer 对象,当产生 trace 数据时,生产者就会调用 TracingController 的 AddTraceEvent 函数把数据写入 TraceBuffer 中。另外 TracingController 还持有了一些观察者。这些观察者在启动 trace 时会被执行。

代码语言:javascript
复制
void TracingController::AddTraceStateObserver(v8::TracingController::TraceStateObserver* observer) {
  {
    base::MutexGuard lock(mutex_.get());
    observers_.insert(observer);
    // 如果还没启动 trace 则返回,等待启动时通知观察者
    if (!recording_.load(std::memory_order_acquire)) return;
  }
  // trace 已经开启,直接通知观察者
  observer->OnTraceEnabled();}

3. NodeTraceStateObserver

刚才提到 TracingController 持有一些观察者。Node.js 中的观察者是 NodeTraceStateObserver。

代码语言:javascript
复制
class NodeTraceStateObserver: public v8::TracingController::TraceStateObserver {
 public:
  //。trace 启动时被回调
  inline void OnTraceEnabled() override {
    // 省略部分代码
    trace_process->SetString("arch", per_process::metadata.arch.c_str());
    trace_process->SetString("platform", per_process::metadata.platform.c_str());
    trace_process->BeginDictionary("release");
    trace_process->SetString("name", per_process::metadata.release.name.c_str());
    // 产生 trace 数据
    TRACE_EVENT_METADATA1("__metadata", "node", "process", std::move(trace_process));
  }
 private:
  v8::TracingController* controller_;};

NodeTraceStateObserver 的逻辑很简单,就是在 trace 系统启动时执行 OnTraceEnabled,在 OnTraceEnabled 中会产生一个 trace 的 meta 数据,最终通过 TRACE_EVENT_METADATA1 写入 trace 文件,TRACE_EVENT_METADATA1 是一个宏,最终展开后调用 node::tracing::AddMetadataEvent -> AddMetadataEventImpl -> agent->GetTracingController()->AddMetadataEvent。

了解了 Node.js 初始化 agent 的逻辑后,就可以继续分析之前留下的 StartTracingAgent() 和 GetTracingAgentWriter()->Enable(categories) 的逻辑。首先看 StartTracingAgent。

代码语言:javascript
复制
inline void StartTracingAgent() {
    if (tracing_file_writer_.IsDefaultHandle()) {
      // 解析出命令后设置的需要 trace 的模块
      std::vector<std::string> categories = SplitString(per_process::cli_options->trace_event_categories, ',');
      // 注册消费者 writer 
      tracing_file_writer_ = tracing_agent_->AddClient(
          std::set<std::string>(std::make_move_iterator(categories.begin()),
                                std::make_move_iterator(categories.end())),
          std::unique_ptr<tracing::AsyncTraceWriter>(
              new tracing::NodeTraceWriter(
                  per_process::cli_options->trace_event_file_pattern)),
          tracing::Agent::kUseDefaultCategories);
    }}

在 Node.js 初始化时,tracing_file_writer_ 为默认值,所以如果还没有调用过 StartTracingAgent,则 IsDefaultHandle 为 true,反之 tracing_file_writer_ 会被 AddClient 重新赋值,第二次调用 StartTracingAgent 就直接返回了。因为我们现在是第一次执行 StartTracingAgent。所以 IsDefaultHandle 为 true。接着首先解析出需要 trace 的模块,然后调用 agent 的 AddClient 函数注册消费者。看一下 AddClient。

代码语言:javascript
复制
AgentWriterHandle Agent::AddClient(
    const std::set<std::string>& categories,
    std::unique_ptr<AsyncTraceWriter> writer,
    enum UseDefaultCategoryMode mode) {
  // 启动 trace 子线程,如果还没有启动的话
  Start();
  const std::set<std::string>* use_categories = &categories;
  int id = next_writer_id_++;
  AsyncTraceWriter* raw = writer.get();
  // 记录 writer 和 trace 的模块
  writers_[id] = std::move(writer);
  categories_[id] = { use_categories->begin(), use_categories->end() };
  {
    Mutex::ScopedLock lock(initialize_writer_mutex_);
    // 记录待初始化的 writer
    to_be_initialized_.insert(raw);
    // 通知 trace 子线程
    uv_async_send(&initialize_writer_async_);
    while (to_be_initialized_.count(raw) > 0)
      initialize_writer_condvar_.Wait(lock);
  }

  return AgentWriterHandle(this, id);}

trace 系统部分逻辑是跑在子线程的。注册 writer 时如果还没有启动 trace 子线程则启动它。

代码语言:javascript
复制
Agent::Agent() : tracing_controller_(new TracingController()) {
  tracing_controller_->Initialize(nullptr);
  uv_loop_init(&tracing_loop_), 0;
  // 注册 writer 时执行的回调
  uv_async_init(&tracing_loop_, &initialize_writer_async_, [](uv_async_t* async) {
    Agent* agent = ContainerOf(&Agent::initialize_writer_async_, async);
    agent->InitializeWritersOnThread();
  }), 0);
  uv_unref(reinterpret_cast<uv_handle_t*>(&initialize_writer_async_));}
void Agent::Start() {  if (started_)
    return;

  NodeTraceBuffer* trace_buffer_ = new NodeTraceBuffer(NodeTraceBuffer::kBufferChunks, this, &tracing_loop_);
  tracing_controller_->Initialize(trace_buffer_);
  uv_thread_create(&thread_, [](void* arg) {
    Agent* agent = static_cast<Agent*>(arg);
    uv_run(&agent->tracing_loop_, UV_RUN_DEFAULT);
  }, this);
  started_ = true;}

Agent::Start 首先初始化了和生产者相关的逻辑,否则注册消费者 writer 就毫无意义。这里新建了一个 NodeTraceBuffer 并设置到 controller 中。后面分析生产者时再分析这部分代码。接着创建了一个线程,trace 子线程中单独跑了一个事件循环,并且通过异步方式和主线程通信,所以每次注册 writer 的时候,主线程都通过 uv_async_send(&initialize_writer_async_) 通知 子线程。从而子线程执行回调 agent->InitializeWritersOnThread()。

代码语言:javascript
复制
void Agent::InitializeWritersOnThread() {
  Mutex::ScopedLock lock(initialize_writer_mutex_);
  while (!to_be_initialized_.empty()) {
    AsyncTraceWriter* head = *to_be_initialized_.begin();
    head->InitializeOnThread(&tracing_loop_);
    to_be_initialized_.erase(head);
  }
  initialize_writer_condvar_.Broadcast(lock);}

InitializeWritersOnThread 遍历待注册的 writer 并执行它的 InitializeWritersOnThread 函数。这里以 Node.js 的 writer NodeTraceWriter 为例。

代码语言:javascript
复制
void NodeTraceWriter::InitializeOnThread(uv_loop_t* loop) {
  tracing_loop_ = loop;
  flush_signal_.data = this;
  int err = uv_async_init(tracing_loop_, &flush_signal_, [](uv_async_t* signal) {
    NodeTraceWriter* trace_writer = ContainerOf(&NodeTraceWriter::flush_signal_, signal);
    trace_writer->FlushPrivate();
  });}

writer 往子线程事件循环中注册了一个异步回调,这个回调会在需要写入数据到文件里被执行,后面会分析。至此,StartTracingAgent 就分析完了。接着看 GetTracingAgentWriter()->Enable(categories)。GetTracingAgentWriter 返回到是一个 AgentWriterHandle 对象。

代码语言:javascript
复制
void AgentWriterHandle::Enable(const std::set<std::string>& categories) {
  if (agent_ != nullptr) agent_->Enable(id_, categories);}
void Agent::Enable(int id, const std::set<std::string>& categories) {  categories_[id].insert(categories.begin(), categories.end());}

这样就完成了 trace 系统的初始化和订阅了需要 trace 的模块。接下来分析生产者。以同步打开文件 API 为例。下面是 open 函数的 trace 埋点。

代码语言:javascript
复制
FS_SYNC_TRACE_BEGIN(open);int result = SyncCall(env, args[4], &req_wrap_sync, "open",                      uv_fs_open, *path, flags, mode);FS_SYNC_TRACE_END(open);

宏展开后

代码语言:javascript
复制
#define FS_SYNC_TRACE_BEGIN(syscall, ...)                                  \
  if (GET_TRACE_ENABLED)                                                   \
  TRACE_EVENT_BEGIN(TRACING_CATEGORY_NODE2(fs, sync), TRACE_NAME(syscall), \
  ##__VA_ARGS__);

继续

代码语言:javascript
复制
// 判断是否订阅了当前模块的 traceif (*node::tracing::TraceEventHelper::GetCategoryGroupEnabled("node,node.fs,node.fs.sync") != 0) {    // 通过 agent 的 controller 写入 trace 数据
    controller->AddTraceEvent(...);}

AddTraceEvent 由 V8 实现。

代码语言:javascript
复制
uint64_t TracingController::AddTraceEvent(...) {
  int64_t now_us = CurrentTimestampMicroseconds();
  return AddTraceEventWithTimestamp(...);}

uint64_t TracingController::AddTraceEventWithTimestamp(...) {
 TraceObject* trace_object = trace_buffer_->AddTraceEvent(&handle);}

通过层层调用,最终调用 TraceBuffer 的 AddTraceEvent,对应 Node.js 的 NodeTraceBuffer。

代码语言:javascript
复制
TraceObject* NodeTraceBuffer::AddTraceEvent(uint64_t* handle) {
  // buffer 是否已经满了,是则 flush
  if (!TryLoadAvailableBuffer()) {
    *handle = 0;
    return nullptr;
  }
  // 否则缓存
  return current_buf_.load()->AddTraceEvent(handle);}

我们只需要看 TryLoadAvailableBuffer。

代码语言:javascript
复制
bool NodeTraceBuffer::TryLoadAvailableBuffer() {
  InternalTraceBuffer* prev_buf = current_buf_.load();
  if (prev_buf->IsFull()) {
    uv_async_send(&flush_signal_);
  }
  return true;}

如果 buffer 满了,则通知 flush_signal_,那么 flush_signal_ 是什么呢?这是在 NodeTraceBuffer 初始化时设置的。

代码语言:javascript
复制
NodeTraceBuffer::NodeTraceBuffer(size_t max_chunks,
    Agent* agent, uv_loop_t* tracing_loop)
    : tracing_loop_(tracing_loop),
      buffer1_(max_chunks, 0, agent),
      buffer2_(max_chunks, 1, agent) {
  flush_signal_.data = this;
  // 回调 NonBlockingFlushSignalCb
  int err = uv_async_init(tracing_loop_, &flush_signal_,NonBlockingFlushSignalCb);}

可以看到 NodeTraceBuffer 在 trace 子线程中设置了一个回调,当主线程写入的 trace 数据满了则通知子线程处理。具体逻辑在 NonBlockingFlushSignalCb。

代码语言:javascript
复制
void NodeTraceBuffer::NonBlockingFlushSignalCb(uv_async_t* signal) {
  NodeTraceBuffer* buffer = static_cast<NodeTraceBuffer*>(signal->data);
  if (buffer->buffer1_.IsFull() && !buffer->buffer1_.IsFlushing()) {
    buffer->buffer1_.Flush(false);
  }
  if (buffer->buffer2_.IsFull() && !buffer->buffer2_.IsFlushing()) {
    buffer->buffer2_.Flush(false);
  }}

NodeTraceBuffer 内部维护了几个内部 buffer 用于存储数据(InternalTraceBuffer 对象)。当 内部 buffer 满了则调用 Flush。

代码语言:javascript
复制
void InternalTraceBuffer::Flush(bool blocking) {
  {
    Mutex::ScopedLock scoped_lock(mutex_);
    if (total_chunks_ > 0) {
      flushing_ = true;
      for (size_t i = 0; i < total_chunks_; ++i) {
        auto& chunk = chunks_[i];
        for (size_t j = 0; j < chunk->size(); ++j) {
          TraceObject* trace_event = chunk->GetEventAt(j);
          if (trace_event->name()) {
            // 交给 agent 处理
            agent_->AppendTraceEvent(trace_event);
          }
        }
      }
      total_chunks_ = 0;
      flushing_ = false;
    }
  }
  agent_->Flush(blocking);}

Flush 最终会通知 agent 进行数据的处理并调用 agent 的 Flush。

代码语言:javascript
复制
void Agent::AppendTraceEvent(TraceObject* trace_event) {
  for (const auto& id_writer : writers_)
    id_writer.second->AppendTraceEvent(trace_event);}
void Agent::Flush(bool blocking) {  for (const auto& id_writer : writers_)
    id_writer.second->Flush(blocking);}

agent 也只是简单调用 writer 进行数据的消费。

代码语言:javascript
复制
void NodeTraceWriter::AppendTraceEvent(TraceObject* trace_event) {
  Mutex::ScopedLock scoped_lock(stream_mutex_);
  if (total_traces_ == 0) {
    // 打开 trace 文件
    OpenNewFileForStreaming();
    json_trace_writer_.reset(TraceWriter::CreateJSONTraceWriter(stream_));
  }
  ++total_traces_;
  // 缓存数据
  json_trace_writer_->AppendTraceEvent(trace_event);}

AppendTraceEvent 只是把数据放到内存里。等待 Flush 时写到文件。

代码语言:javascript
复制
void NodeTraceWriter::Flush(bool blocking) {
  int err = uv_async_send(&flush_signal_);}

和 buffer 一样,消费者进行数据处理时也是需要通知 trace 子线程。这个在介绍注册 writer 时已经分析过。具体处理函数是 trace_writer->FlushPrivate()。这个函数就是把数据写到 trace 文件,就不再具体分析。trace 系统整体架构和处理流程如下。

生成 trace 文件后,可以使用 chrome://tracing/ 进行分析。比如分析 openSync 函数。

对应 trace 文件的数据。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-01-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. Agent
  • 2. TracingController
  • 3. NodeTraceStateObserver
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档