前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Vpp --node节点调度总结

Vpp --node节点调度总结

作者头像
dpdk-vpp源码解读
发布2023-03-07 16:57:38
1.4K0
发布2023-03-07 16:57:38
举报
文章被收录于专栏:DPDK VPP源码分析DPDK VPP源码分析

Vpp的功能逻辑被划分为多个featuce arc(比如ipv4、IPv6、l2、MPLS等),每个feature arc是由一定顺序的node组成,每个node完成一个功能。本文主要是来介绍一下node的类型及调度模式。

  • 1、调度类型的描述 调度类型分为4种类型internal、input、pre-input、process。具体的类型描述和运行线程如下:
  • 2、调度方式

个人理解:

从代码上来看只有Input类型的node节点注册的时候state设置成中断方式,才会出现中断和轮训的切换,默认全是轮询方式。 PRE_INPUT类型node只能按照轮询当时来调度。设置成state为中断也是一样的。

中断方式和轮询方式之间切换(类似内核NAPI机制)。

模式切换依据累计报文数量,在vlib_main_or_worker_loop启动中设置。

  • 3、PRE_INPUT和INPUT类型调度

Pre_input类型只能在main线程上被调度,一般为Process类型提供输入。

inPut类型的node可以运行在main线程和worker线程,一般为internal类型的node提供报文输入。

流程是在vlib_main_or_worker_loop函数的while(1)循环中,如下面代码:

代码语言:javascript
复制
 if (is_main)/*在main线程上处理pre—input类型node*/
  vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
    cpu_time_now = dispatch_node (vm, n,
          VLIB_NODE_TYPE_PRE_INPUT,
          VLIB_NODE_STATE_POLLING,
          /* frame */ 0,
          cpu_time_now);
      /* 处理input类型nodes. */
      vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
  cpu_time_now = dispatch_node (vm, n,
              VLIB_NODE_TYPE_INPUT,
              VLIB_NODE_STATE_POLLING,
              /* frame */ 0,
              cpu_time_now);

中断类型node是将node的索引保存在pending_interrupt_node_runtime_indices数组中,依次遍历:

代码语言:javascript
复制
 /*pending_interrupt_node_runtime_indices数组大小默认32个*/
uword l = _vec_len (nm->pending_interrupt_node_runtime_indices);
  uword i;
  if (l > 0)
    {     ...........
    for (i = 0; i < l; i++)
    {      /*获取vlib_node_runtime_t的地址*/
      n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
              last_node_runtime_indices[i]);
      cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
        VLIB_NODE_STATE_INTERRUPT,
        /* frame */ 0,
        cpu_time_now);
    }
    }
      }
  • 4 Internal类型的调度

INPUT 类型的 node 是如何把报文发送给 INTERNAL 类型的 node 的。整个过程可以分成以下几步:

1、确定把报文发送给第几个孩子节点

确定把报文送给哪个孩子节点处理,是由该 node 的业务逻辑决定的,通常有两种方式:

一是通过解析报文的内容来决定,比如 ethernet-input 节点通过解析报文是 ipv4 还是 ipv6 报文把报文发送给ip4-input 或 ip6-input 去处理;

二是通过查找转发表来决定,比如 ip4-lookup 通过查找路由表来决定把报文发送给 ip4-arp 还是 ip4-rewrite 节点来处理。

2.找到该孩子节点对应的用于存放报文的结构 vlib_frame_t

根据 node 图的初始化流程可知,确定把报文发送给第几个孩子节点之后,就可以获取该孩子节点对应的 vlib_next_frame_t 结构了。vlib_next_frame_t 结构中的 frame_index 字段指向vlib_frame_t 结构,vlib_frame_t 结构是存放报文索引的地方(也就vlib_buffer_t结构索引)。

3.把报文放入 vlib_frame_t 结构

vlib_frame_t 结构最后一个字段为可变数组,初始化的时候会分配一块内存,用于存放报文的索引。

4.创建 vlib_pending_frame_t 结构,并把它加入数组 vm->node_main. pending_frames 等待调度vlib_pending_frame_t 结构记录报文所在的结构 vlib_next_frame_t 的 index,以及处理这些报文的node 的 vlib_node_runtime_t 结构的索引,这样通过 vlib_pending_frame_t 结构里面的信息就可以把报文分发给指定的 node 处理了。

下面以ipv4 feature arc来介绍internal类型调度:

1、ipv4_input_node节点注册(hdp\src\vnet\ip\ip4_input.c)

代码语言:javascript
复制
/* ip_input_node节点注册 */
VLIB_REGISTER_NODE (ip4_input_node) =
{
    .function = ip4_input,
    .name = "ip4-input",
    .vector_size = sizeof (u32),
    .n_errors = IP4_N_ERROR,
    .error_strings = ip4_error_strings,
    .n_next_nodes = IP4_INPUT_N_NEXT,
    .next_nodes = {
        [IP4_INPUT_NEXT_DROP] = "error-drop",
        [IP4_INPUT_NEXT_PUNT] = "error-punt",
        [IP4_INPUT_NEXT_LOOKUP] = "ip4-lookup",
        [IP4_INPUT_NEXT_LOOKUP_MULTICAST] = "mcast4-lookup",
        [IP4_INPUT_NEXT_ICMP_ERROR] = "ip4-icmp-error",
    },
    .format_buffer = format_ip4_header,
    .format_trace = format_ip4_input_trace,
};

2、节点处理函数处理: 函数vlib_get_next_frame功能是:找到该孩子节点对应的用于存放报文的结构 vlib_frame_t, 返回 :to_next :返回当前要存放报文索引位置,n_left_to_next:存放报文剩余空间大小。

代码语言:javascript
复制
  vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);

函数vlib_put_next_frame ,将报文挂接到先将报文挂到加入数组 vm->node_main.pending_frames 等待调度处理

代码语言:javascript
复制
vlib_put_next_frame (vm, node, next_index, n_left_to_next);

3、vm->node_main.pending_frames调度

代码语言:javascript
复制
/* internal 类型的node调度*/
      for (i = 0; i < _vec_len (nm->pending_frames); i++)
  cpu_time_now = dispatch_pending_node (vm, i, cpu_time_now);
      /* Reset pending vector for next iteration. */
      _vec_len (nm->pending_frames) = 0;
  • 5 process类型node调度流程

1、调度相关执行流程介绍

1)第一次调度所有的 process 执行的代码:dispatch_process 函数

process类型节点只能在main线程运行,首先执行在vlib_main_or_worker_loop函数中调用

主要功能:设置挂起点,调用处理函数。

dispatch_process->vlib_process_startup(设置挂起点)->()p->node_runtime->function

代码语言:javascript
复制
if (is_main)
    {
      uword i;
      nm->current_process_index = ~0;
    /*遍历所有process类型node*/
      for (i = 0; i < vec_len (nm->processes); i++)
  cpu_time_now = dispatch_process (vm, nm->processes[i], /* frame */ 0,
           cpu_time_now);
    } 

2)恢复挂起的 process 的代码:dispatch_suspended_process 函数

需要恢复执行的 process 有两种原因:一是等待的时钟已经到时(即定时器到期),二是等待的事件已经发生。下面我们介绍过 process 等待时钟的时候会把自己加入定时器,让定时器到时后调度它重新执行,对应的代码在 main线程主循环里:

代码语言:javascript
复制
if (is_main)
   {
    /* Check if process nodes have expired from timing wheel. */
    ASSERT (nm->data_from_advancing_timing_wheel != 0);
      /*获取所有超时的 process,即数组 data_from_advancing_timing_wheel 中存放
        的是所有超时的 process 的 index。Process 等待的事件发生时则是通过
        vlib_process_signal_event_helper 函数向该数组里 data_from_advancing_timing_wheel 
        加入自己的index 来实现恢复执行的。也就是说,不管是因为等待的时钟到来还
       是因为等待的事件发生了,都需要把自己的 index 加入数组 data_from_advancing_timing_wheel
       才能实现恢复执行。*/
    nm->data_from_advancing_timing_wheel =
      TW (tw_timer_expire_timers_vec)
      ((TWT (tw_timer_wheel) *) nm->timing_wheel, vlib_time_now (vm),
       nm->data_from_advancing_timing_wheel);

    ASSERT (nm->data_from_advancing_timing_wheel != 0);

    if (PREDICT_FALSE
        (_vec_len (nm->data_from_advancing_timing_wheel) > 0))
      {
        uword i;

      processes_timing_wheel_data:
        for (i = 0; i < _vec_len (nm->data_from_advancing_timing_wheel);
       i++)
    {
      u32 d = nm->data_from_advancing_timing_wheel[i];
      u32 di = vlib_timing_wheel_data_get_index (d);
         /*其实加入数组 data_from_advancing_timing_wheel 中的不是 process 的 index,
          而是 process 的 index 左移一位再或上 0 或 1,1 表示该 process 等待的是时间事件
          (timed even),1 当前没用,所以最后一位总是 0(等待时间发生)。
          */
      if (vlib_timing_wheel_data_is_timed_event (d))
        {
          
          vlib_signal_timed_event_data_t *te =
      pool_elt_at_index (nm->signal_timed_event_data_pool,
             di);
          vlib_node_t *n =
      vlib_get_node (vm, te->process_node_index);
          vlib_process_t *p =
      vec_elt (nm->processes, n->runtime_index);
          void *data;
        /*时间事件调用vlib_process_signal_event_helper,转成等待时间已经发生,下一次来处理*/
          data =
      vlib_process_signal_event_helper (nm, n, p,
                te->event_type_index,
                te->n_data_elts,
                te->n_data_elt_bytes);
          if (te->n_data_bytes < sizeof (te->inline_event_data))
      clib_memcpy (data, te->inline_event_data,
             te->n_data_bytes);
          else
      {
        clib_memcpy (data, te->event_data_as_vector,
               te->n_data_bytes);
        vec_free (te->event_data_as_vector);
      }
          pool_put (nm->signal_timed_event_data_pool, te);
        }
      else
        {
          cpu_time_now = clib_cpu_time_now ();
        /*等待时间恢复,调用dispatch_suspended_process*/
          cpu_time_now =
      dispatch_suspended_process (vm, di, cpu_time_now);
        }
    }
        _vec_len (nm->data_from_advancing_timing_wheel) = 0;
      }
  }
      .................
      /*mian线程判断时钟轮触发是否有数据,如果有直接调转处理*/
      if (is_main && _vec_len (nm->data_from_advancing_timing_wheel) > 0)
  goto processes_timing_wheel_data;

dispatch_suspended_process->vlib_process_resume 函数来恢复到挂起点。

代码语言:javascript
复制
static_always_inline uword
vlib_process_resume (vlib_process_t * p)
{
  uword r;
  p->flags &= ~(VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_CLOCK
    | VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT
    | VLIB_PROCESS_RESUME_PENDING);
  r = clib_setjmp (&p->return_longjmp, VLIB_PROCESS_RETURN_LONGJMP_RETURN);
  if (r == VLIB_PROCESS_RETURN_LONGJMP_RETURN)
     /*恢复到挂起点*/
    clib_longjmp (&p->resume_longjmp, VLIB_PROCESS_RESUME_LONGJMP_RESUME);
  return r;
}

而挂起的原因有等待事件和等待时钟,分别对应的函数是vlib_process_wait_for_event 和 vlib_process_wait_for_event_or_clock。

代码语言:javascript
复制
always_inline uword *
vlib_process_wait_for_event (vlib_main_t * vm)
{
  vlib_node_main_t *nm = &vm->node_main;
  vlib_process_t *p;
  uword r;
  /*
  判断事件位图 non_empty_event_type_bitmap 是否为空,如果不为空则表示有事件需要处理,直
  接返回该事件位图 non_empty_event_type_bitmap。否则表示没有事件可以处理,此时需要把自
  己挂起等待事件发生,先调用 clib_setjmp 设置事件发生后恢复执行时的“返回点”,再调用
  clib_longjmp 跳转到调度该 process 之前设置的“返回点”,即跳出该 process 的执行回到 main
  的主循环中。设置标志位 VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT 表示是因
  为等待事件而挂起的。
  */
  p = vec_elt (nm->processes, nm->current_process_index);
  if (clib_bitmap_is_zero (p->non_empty_event_type_bitmap))
    {
      p->flags |= VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT;
      r =
  clib_setjmp (&p->resume_longjmp, VLIB_PROCESS_RESUME_LONGJMP_SUSPEND);
      if (r == VLIB_PROCESS_RESUME_LONGJMP_SUSPEND)
  clib_longjmp (&p->return_longjmp,
          VLIB_PROCESS_RETURN_LONGJMP_SUSPEND);
    }

  return p->non_empty_event_type_bitmap;
}

always_inline f64
vlib_process_wait_for_event_or_clock (vlib_main_t * vm, f64 dt)
{
  vlib_node_main_t *nm = &vm->node_main;
  vlib_process_t *p;
  f64 wakeup_time;
  uword r;

  p = vec_elt (nm->processes, nm->current_process_index);

  if (vlib_process_suspend_time_is_zero (dt)
      || !clib_bitmap_is_zero (p->non_empty_event_type_bitmap))
    return dt;

  wakeup_time = vlib_time_now (vm) + dt;

  /* Suspend waiting for both clock and event to occur. */
  p->flags |= (VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT
         | VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_CLOCK);
  r = clib_setjmp (&p->resume_longjmp, VLIB_PROCESS_RESUME_LONGJMP_SUSPEND);
  if (r == VLIB_PROCESS_RESUME_LONGJMP_SUSPEND)
    {
        /*设置恢复时钟间隔*/
      p->resume_clock_interval = dt * 1e5;
      /*跳转到main主线程中,返回挂起标识位*/
      clib_longjmp (&p->return_longjmp, VLIB_PROCESS_RETURN_LONGJMP_SUSPEND);
    }

  /* Return amount of time still left to sleep.
     If <= 0 then we've been waken up by the clock (and not an event). */
  return wakeup_time - vlib_time_now (vm);
}

介绍事件机制之前,先来了解一下几个相关的数据结构:

每个 process 都可以定义自己的事件类型,如命令行 process 的事件类型为:每种事件类型对应一个任意数值,可以是从 0 开始也可以从任意数开始。向 process 发送事件的时候要指定事件的类型,以及需要该 process 处理的该事件对应的数据。事件数据的存储结构为数组 pending_event_data_by_type_index,下标是事件类型的值。由于事件类型可以指定定义且其值可以为任意值,因此需要一张映射表来把自定义的所有事件的值映射成从 0 开始的连续的值,这里用的映射表为哈希表 event_type_index_by_type_opaque,key是自定义的事件的值,value 为从 event_type_pool 池中分配的 elt 的索引,这个索引作为数组pending_event_data_by_type_index 的下标(后面称为事件类型索引),elt 存放的自定义的事件的值。同时,为了快速查找当前 process 有是否有时间或者有哪些事件需要处理,使用一个位图 non_empty_event_type_bitmap 来记录当前需要处理事件,事件类型索引对应的 bit 置 1 时表

示该事件等待被处理,否则不是。

向某个 process 发送事件的过称为:

1、 先在哈希表 event_type_index_by_type_opaque 中查找该事件对应的事件类型索引,如果找

不到则从 event_type_pool 池分配一个索引作为该事件的事件类型索引

2、 以事件类型索引为下标,把事件对应的数据加入数组 pending_event_data_by_type_index 中

3、 在位图 non_empty_event_type_bitmap 中设置事件类型索引对应的 bit 位。

代码语言:javascript
复制
/*
参数 node_index 为 process 对应的 node 的 index,
type_opaque 为自定义的事件的类型,
data 为该事件对应的数据,
直接调用 vlib_process_signal_event_data 函数发送事件,返回值 d 为存放事
件数据的位置,直接把 data 写进去即可。
*/
always_inline void
vlib_process_signal_event (vlib_main_t * vm,
         uword node_index, uword type_opaque, uword data)
{
  uword *d = vlib_process_signal_event_data (vm, node_index, type_opaque,
               1 /* elts */ , sizeof (uword));
  d[0] = data;
}

2、以BFD process为例学习process调度模型用法

1、bfd process类型node节点注册

代码语言:javascript
复制
/*BFD process类型node节点注册*/
 VLIB_REGISTER_NODE (bfd_process_node, static) = {
  .function = bfd_process,
  .type = VLIB_NODE_TYPE_PROCESS,
  .name = "bfd-process",
  .n_next_nodes = 0,
  .next_nodes = {},
};

2、bfd_process函数处理流程

代码语言:javascript
复制
/*
 * bfd process node function
 */
static uword
bfd_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f)
{
  bfd_main_t *bm = &bfd_main;
  u32 *expired = 0;
  uword event_type, *event_data = 0;
  /* So we can send events to the bfd process 
       保存BFD process node的索引
  */
  bm->bfd_process_node_index = bfd_process_node.index;

  while (1)
    {
      u64 now = clib_cpu_time_now ();
      u64 next_expire = timing_wheel_next_expiring_elt_time (&bm->wheel);
      BFD_DBG ("timing_wheel_next_expiring_elt_time(%p) returns %lu",
         &bm->wheel, next_expire);
      if ((i64) next_expire < 0)
  { /*还没有过期,等待事件*/
    BFD_DBG ("wait for event without timeout");
    /*等待事件到来,如果没有事件,设置等待事件方式挂起。
         这样就会跳转到dispatch_process->vlib_process_startup 返回
         VLIB_PROCESS_RETURN_LONGJMP_SUSPEND挂起动作。
    */
    (void) vlib_process_wait_for_event (vm);
    event_type = vlib_process_get_events (vm, &event_data);
  }
      else
  {
    f64 timeout = ((i64) next_expire - (i64) now) / bm->cpu_cps;
    BFD_DBG ("wait for event with timeout %.02f", timeout);
    if (timeout < 0)
      {
        BFD_DBG ("negative timeout, already expired, skipping wait");
        event_type = ~0;
      }
    else
      {
        /* 如果有事件,函数vlib_process_wait_for_event_or_clock,直接返回,处理事件。
             如果没有事件,设置flags成挂起等待事件和等待时钟。跳转到dispatch_process->vlib_process_startup
             函数中 返回VLIB_PROCESS_RETURN_LONGJMP_SUSPEND再次挂起动作。
      */
        (void) vlib_process_wait_for_event_or_clock (vm, timeout);
        event_type = vlib_process_get_events (vm, &event_data);
      }
  }
      now = clib_cpu_time_now ();
      switch (event_type)
  {
  case ~0:    /* no events => timeout */
    /* nothing to do here */
    break;
  case BFD_EVENT_RESCHEDULE:
    /* nothing to do here - reschedule is done automatically after
     * each event or timeout */
    break;
  case BFD_EVENT_NEW_SESSION:
    if (!pool_is_free_index (bm->sessions, *event_data))
      {
        bfd_session_t *bs =
    pool_elt_at_index (bm->sessions, *event_data);
        bfd_send_periodic (vm, rt, bm, bs, now);
          
          bfd_on_config_change (vm, rt, bm, bs, now); /*add new*/
        bfd_set_timer (bm, bs, now, 1);
      }
    else
      {
        BFD_DBG ("Ignoring event for non-existent session index %u",
           (u32) * event_data);
      }
    break;
  case BFD_EVENT_CONFIG_CHANGED:
    if (!pool_is_free_index (bm->sessions, *event_data))
      {
        bfd_session_t *bs =
    pool_elt_at_index (bm->sessions, *event_data);
        bfd_on_config_change (vm, rt, bm, bs, now);
      }
    else
      {
        BFD_DBG ("Ignoring event for non-existent session index %u",
           (u32) * event_data);
      }
    break;
  default:
    clib_warning ("BUG: event type 0x%wx", event_type);
    break;
  }
      BFD_DBG ("advancing wheel, now is %lu", now);
      BFD_DBG ("timing_wheel_advance (%p, %lu, %p, 0);", &bm->wheel, now,
         expired);
      expired = timing_wheel_advance (&bm->wheel, now, expired, 0);
      BFD_DBG ("Expired %d elements", vec_len (expired));
      u32 *p = NULL;
      vec_foreach (p, expired)
      {
  const u32 bs_idx = *p;
  if (!pool_is_free_index (bm->sessions, bs_idx))
    {
      bfd_session_t *bs = pool_elt_at_index (bm->sessions, bs_idx);
      bfd_on_timeout (vm, rt, bm, bs, now);
      bfd_set_timer (bm, bs, now, 1);
    }
      }
      if (expired)
  {
    _vec_len (expired) = 0;
  }
      if (event_data)
  {
    _vec_len (event_data) = 0;
  }
    }

  return 0;
}

3、发送事件

代码语言:javascript
复制
/*创建一个新的bfd,通过vlib_process_signal_event 来发布一个事件,
   会调到bfd_process函数去处理
   */
void  bfd_session_start (bfd_main_t * bm, bfd_session_t * bs)
{
  BFD_DBG ("\nStarting session: %U", format_bfd_session, bs);
  bfd_set_effective_required_min_rx (bm, bs,
             bs->config_required_min_rx_clocks);
  bfd_recalc_tx_interval (bm, bs);
  bfd_calc_report_tx_interval(bm, bs);
  vlib_process_signal_event (bm->vlib_main, bm->bfd_process_node_index,
           BFD_EVENT_NEW_SESSION, bs->bs_idx);
  bfd_notify_listeners (bm, BFD_LISTEN_EVENT_CREATE, bs);
}

总结:

本文只是简单介绍了vpp的基本调度类型。并通过一些事例详细介绍了internal和preocess类型的基本用法和处理逻辑。这些都是最基本的。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-07-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 DPDK VPP源码分析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档