基本实现原理
时间轮实现目前最多支持三个轮,外加一个OverFlow溢出任务链表。溢出任务链表是指超出三个轮的时间范围的任务,缓存在溢出链表中里面,当冷冻轮转完一轮后,遍历一次溢出列表,重新计算任务间隔时间。 下面是预处理阶段的对时间轮设置个数检查,必须在1,2,3范围内。
#if (TW_TIMER_WHEELS != 1 && TW_TIMER_WHEELS != 2 && TW_TIMER_WHEELS != 3)
#error TW_TIMER_WHEELS must be 1, 2 or 3
#endif
这里有个问题没有对溢出链表做判断,但是在代码中timer_add函数中可以任务只有时间轮等于3时才会支持溢出任务的添加,
#if TW_TIMER_WHEELS > 2
#if TW_OVERFLOW_VECTOR > 0
/*溢出任务添加算法。*/
triple_wrap_mask = (1 << (3 * TW_RING_SHIFT)) - 1;
interval_plus_time_to_wrap =
interval + (tw->current_tick & triple_wrap_mask);
if ((interval_plus_time_to_wrap >= 1 << (3 * TW_RING_SHIFT)))
{
t->expiration_time = tw->current_tick + interval;
ts = &tw->overflow;
timer_addhead (tw->timers, ts->head_index, t - tw->timers);
return;
}
#endif
...
#endif
按照下图的举例说明,假设设置时间间隔为1s 快速轮就是转一圈等于8秒; 慢速轮转一圈等于8×8秒; 冷冻轮转一圈等于8×8×8秒。 如果添加的任务时间,从当前开始计算,(interval + current_tick&8×8×8)>= 8×8×8,就缓存在溢出任务列表里面.
这里我们肯定会有疑问?比如添加一个任务间隔时间7s,interval=7s,而current_tick是510秒时,满足上述条件也会被添加到溢出任务列表里面?而不是直接添加到快速轮中。
1.每一个刻度上面都是一个任务列表,添加任务的时候通过计算间隔时间来判断将任务添加到那个轮的那个刻度上面。然后每滴答一次,都会检查当前刻度是否有任务需要执行; 2.如果快速轮转完一圈,慢速轮就是往下走一个刻度,然后判断将当前刻度里面的任务,如果没有剩余时间就执行任务,有剩余时间就添加到快速上对应刻度的任务列表; 3.同样的道理,慢速轮转完一圈,冷冻轮就会走一个刻度,执行上面2一样的动作; 4.处理溢出任务,如果冷冻轮转完一圈,就会遍历一次溢出任务列表,重新计算间隔时间,将任务添加到对应的轮上面。
1、假如当前2个轮,默认当前都是所有轮都指向0. 添加一个任务interval = 7,在fast_ring_offset= 7号槽的链表中。 添加一个任务interval = 20,在slow_ring_offset 2号,fast_ring_offset = 4. 计算过程 =2*8+4 =20 2、假如当前是3个轮,默认当前所有轮都指向0 添加一个任务interval = 90,glacier_ring_offset= 1;slow_ring_offset 3号,fast_ring_offset = 2. 1*8*8+3*8+2 = 90
时间轮记录任务的链表节点。
typedef struct
{
/** next, previous pool indices pool 索引,组成双向链表节点*/
u32 next;
u32 prev;
union
{
struct
{
#if (TW_TIMER_WHEELS == 3)
/** fast ring offset, only valid in the slow ring
临时记录计算当前数据在slow ring对应的槽位号*/
u16 fast_ring_offset;
/** slow ring offset, only valid in the glacier ring */
u16 slow_ring_offset;
#endif
#if (TW_TIMER_WHEELS == 2)
/** fast ring offset, only valid in the slow ring */
u16 fast_ring_offset;
/** slow ring offset, only valid in the glacier ring */
u16 pad;
#endif
};
#if (TW_OVERFLOW_VECTOR > 0)
u64 expiration_time;/*设置过期时间*/
#endif
};
/** user timer handle 用来存储用户数据,一般就是pool索引*/
u32 user_handle;
} TWT (tw_timer);
user_handle组成
时间轮每个槽位存储在双向链表表头节点在时间轮任务节点pool池中的索引。
typedef struct
{
/** Listhead of timers which expire in this interval */
u32 head_index;
} tw_timer_wheel_slot_t;
时间轮结构体,记录了时间轮定时器的所有信息,如时间轮数组、精确度、任务链表内存池、任务回调函数等
/*时间轮结构体*/
typedef struct
{
/** Timer pool 时间轮任务节点pool池*/
TWT (tw_timer) * timers;
/** Next time the wheel should run 下一次运行的时间*/
f64 next_run_time;
/** Last time the wheel ran 记录上次处理的时间,用于异常判断*/
f64 last_run_time;
/** Timer ticks per second 对应每秒的tick数 1.0/timer_interval*/
f64 ticks_per_second;
/** Timer interval, also needed to avoid fp divide in speed path
定期器init函数时设置的时间间隔,对应fast ring 一个槽位的时间,目前设置单位s*/
f64 timer_interval;
/** current tick 记录累计tick数,目前看用与溢出任务链表中*/
u64 current_tick;
/** current wheel indices 记录当前时间轮指向的槽位刻度*/
u32 current_index[TW_TIMER_WHEELS];
/** wheel arrays 时间轮数组,记录每个时间轮槽位的head的在Timer pool的索引*/
tw_timer_wheel_slot_t w[TW_TIMER_WHEELS][TW_SLOTS_PER_RING];
#if TW_OVERFLOW_VECTOR > 0 /*记录溢出时,head的在Timer pool的索引*/
tw_timer_wheel_slot_t overflow;
#endif
#if TW_FAST_WHEEL_BITMAP > 0
/** Fast wheel slot occupancy bitmap 快速轮槽占用位图,具体用途待分析*/
uword *fast_slot_bitmap;
#endif
/** expired timer callback, receives a vector of handles
过期回调函数*/
void (*expired_timer_callback) (u32 * expired_timer_handles);
/** vectors of expired timers 记录过期时间的用户数据
过期处理时,会把记录用户数据的pool资源释放。记录到临时取*/
u32 *expired_timer_handles;
/** maximum expirations 每次最大处理过期时间数量*/
u32 max_expirations;
/** current trace index trace trace 功能,目前不可用,存在编译问题*/
#if TW_START_STOP_TRACE_SIZE > 0
/* Start/stop/expire tracing */
u32 trace_index;
u32 trace_wrapped;
TWT (trace) traces[TW_START_STOP_TRACE_SIZE];
#endif
} TWT (tw_timer_wheel);
这里有个疑问? 为什么不直接存储 TWT (tw_timer) w[TW_TIMER_WHEELS][TW_SLOTS_PER_RING] 而是使用 tw_timer_wheel_slot_t w[TW_TIMER_WHEELS][TW_SLOTS_PER_RING] 从内存使用前者可能更省内存。
#define TW_TIMER_WHEELS 1 /*定义时间轮的个数*/
#define TW_SLOTS_PER_RING 2048 /*定义时间轮的槽位数,必须时2的次幂,使用位运算*/
#define TW_RING_SHIFT 11 /*2的11次幂 2048*/
#define TW_RING_MASK (TW_SLOTS_PER_RING -1) /*时间轮槽位数掩码*/
#define TW_TIMERS_PER_OBJECT 2 /*timer id的最大大小,小于此数,有使用者管理*/
#define LOG2_TW_TIMERS_PER_OBJECT 1 /*timer id 占用的bit数*/
#define TW_SUFFIX _2t_1w_2048sl /*结构体及函数后缀,*/
#define TW_FAST_WHEEL_BITMAP 0 /*是否记录快速轮bitmap*/
#define TW_TIMER_ALLOW_DUPLICATE_STOP 0 /*用户存在的hander 索引是否需要做合法性检测*/
#define TW_START_STOP_TRACE_SIZE 0 /*trace 功能,目前存在编译问题*/
#include <vppinfra/tw_timer_template.h>
建议:TW_TIMER_ALLOW_DUPLICATE_STOP 设置为 0 建议使用者使用下面函数在外层做安全性检查。 int TW (tw_timer_handle_is_free) (TWT (tw_timer_wheel) * tw, u32 handle);
/* 参数说明:
* TWT (tw_timer_wheel) * tw, 定时器主结构体;
* void *expired_timer_callback, 过期回调函数;
* f64 timer_interval_in_seconds, 每次滴答的间隔时间;
* u32 max_expirations, 一次回调后,到期执行的最大任务个数。
*/
void
TW (tw_timer_wheel_init) (TWT (tw_timer_wheel) * tw,
void *expired_timer_callback,
f64 timer_interval_in_seconds, u32 max_expirations)
{
int ring, slot;
tw_timer_wheel_slot_t *ts;
TWT (tw_timer) * t;
/*初始化主结构体 tw,赋值过期回调函数及每次最大处理过期任务数*/
clib_memset (tw, 0, sizeof (*tw));
tw->expired_timer_callback = expired_timer_callback;
tw->max_expirations = max_expirations;
if (timer_interval_in_seconds == 0.0)
{
clib_warning ("timer interval is zero");
abort ();
}
/*设置时间精度,每秒ticks数*/
tw->timer_interval = timer_interval_in_seconds;
tw->ticks_per_second = 1.0 / timer_interval_in_seconds;
vec_validate (tw->expired_timer_handles, 0);
_vec_len (tw->expired_timer_handles) = 0;
/*初始化时间轮任务槽*/
for (ring = 0; ring < TW_TIMER_WHEELS; ring++)
{
for (slot = 0; slot < TW_SLOTS_PER_RING; slot++)
{
ts = &tw->w[ring][slot];
pool_get (tw->timers, t);
clib_memset (t, 0xff, sizeof (*t));
t->next = t->prev = t - tw->timers;
ts->head_index = t - tw->timers;
}
}
/*开启溢出任务链时,初始化溢出轮任务列表*/
#if TW_OVERFLOW_VECTOR > 0
ts = &tw->overflow;
pool_get (tw->timers, t);
clib_memset (t, 0xff, sizeof (*t));
t->next = t->prev = t - tw->timers;
ts->head_index = t - tw->timers;
#endif
}
/* 参数说明:
* tw, 定时器主结构体;
* u32 user_id, 用户id;
* u32 timer_id, 定时器id;
* u64 interval; 间隔时间,不能为0。
* 返回值:返回当前任务所在内存池的索引,可以当初当前任务的句柄,可供后面使用。
*/
u32 TW (tw_timer_start) (TWT (tw_timer_wheel) * tw, u32 user_id, u32 timer_id,
u64 interval)
{
TWT (tw_timer) * t;
/*interval 间隔时间不能为0*/
ASSERT (interval);
/*从内存池获取任务节点,并初始化全FF*/
pool_get (tw->timers, t);
clib_memset (t, 0xff, sizeof (*t));
/*将user_id和timer_id 拼接user_handle*/
t->user_handle = TW (make_internal_timer_handle) (user_id, timer_id);
/*添加到任务队列*/
timer_add (tw, t, interval);
/*返回当前任务在内存池的索引*/
return t - tw->timers;
}
以快速轮为例,介绍添加任务队列
static inline void
timer_add (TWT (tw_timer_wheel) * tw, TWT (tw_timer) * t, u64 interval)
{
u16 fast_ring_offset;
tw_timer_wheel_slot_t *ts;
/*计算在快速轮的槽位号*/
fast_ring_offset = interval & TW_RING_MASK;
/*基于当前指向的槽位号重新计算,任务要放在那个任务槽内*/
fast_ring_offset += tw->current_index[TW_TIMER_RING_FAST] & TW_RING_MASK;
/*重新计算放在快速轮那个槽位 fast_ring_offset大于最大支持槽位*/
fast_ring_offset %= TW_SLOTS_PER_RING;
/*获取对应槽位双向链表头*/
ts = &tw->w[TW_TIMER_RING_FAST][fast_ring_offset];
/*当前任务添加到任务链表中*/
timer_addhead (tw->timers, ts->head_index, t - tw->timers);
}
/* 参数说明:
* TWT (tw_timer_wheel) * tw, 定时器主结构体;
* u32 handle,任务句柄,tw_timer_start函数的返回值;
*/
void TW (tw_timer_stop) (TWT (tw_timer_wheel) * tw, u32 handle)
{
TWT (tw_timer) * t;
#if TW_TIMER_ALLOW_DUPLICATE_STOP
/*判断当前任务是否已经停止,停止直接返回*/
if (pool_is_free_index (tw->timers, handle))
return;
#endif
/*获取对应的任务*/
t = pool_elt_at_index (tw->timers, handle);
/* in case of idiotic handle (e.g. passing a listhead index) */
ASSERT (t->user_handle != ~0);
/*从任务链表中删除*/
timer_remove (tw->timers, t);
/*资源释放进内存池*/
pool_put_index (tw->timers, handle);
}
/* 参数说明:
* TWT (tw_timer_wheel) * tw, 定时器主结构体;
* u32 handle,任务句柄,tw_timer_start函数的返回值;
* u64 interval,需要更新的时间间隔。
* 功能说明:
* 从轮子的任务队列里面移除任务节点;
* 在通过interval重新计算出任务所在轮子的任务列表并添加到队列里面。
*/
void TW (tw_timer_update) (TWT (tw_timer_wheel) * tw, u32 handle,
u64 interval)
{
TWT (tw_timer) * t;
t = pool_elt_at_index (tw->timers, handle);
/*从轮子的任务队列里面移除任务节点;*/
timer_remove (tw->timers, t);
/*在通过interval重新计算出任务所在轮子的任务列表并添加到队列里面。*/
timer_add (tw, t, interval);
}
/* 参数说明:
* TWT (tw_timer_wheel) * tw, 定时器主结构体;
* f64 now,当前运行时间,vpp里面通过vlib_time_now接口获取到;
* u32 * vec,保存到期任务的动态数组。
* 返回值:返回到期任务动态数组首地址。
* 功能说明:
* 判断溢出队列的任务添加到对应轮子的任务队列;
* 将冷冻轮子到期的任务添加到慢速轮子的队列里面去;
* 将慢速轮子到期的任务添加到快速轮子的队列里面去;
* 将到期的任务添加到到期任务动态数组返回给用户,或者通过回调通知用户。
*/
u32 *TW (tw_timer_expire_timers) (TWT (tw_timer_wheel) * tw, f64 now)
{
return TW (tw_timer_expire_timers_internal) (tw, now, 0 /* no vector */ );
}
u32 *TW (tw_timer_expire_timers_vec) (TWT (tw_timer_wheel) * tw, f64 now,
u32 * vec)
{
return TW (tw_timer_expire_timers_internal) (tw, now, vec);
};
这两个接口的效果是一样的,都是获取到期的任务给用户;两个接口都是TW (tw_timer_expire_timers_internal) 函数的封装; 区别1是u32 *vec参数的差异,表示保持任务的数组是用户提供,还是时间轮内tw->expired_timer_handles数组; 区别2是tw_timer_expire_timers是会判断回调是否存在,如果存在通过回调函数将过去任务通知给用户。
注意: 有一点就是是tw_timer_expire_timers_vec 函数传入vec和返回vec的地址可能是不一样的。这里需要特别小心,否则就会造成非法内存的访问或者内存泄漏的情况。
定时调度过期任务函数,大概的代码逻辑如下: 1.判断溢出队列的任务添加到对应轮子的任务队列; 2.将冷冻轮子到期的任务添加到慢速轮子的队列里面去; 3.将慢速轮子到期的任务添加到快速轮子的队列里面去; 4.将到期的任务添加到到期任务动态数组返回给用户,或者通过回调通知用户。
这里就解答前面的疑问了,主要是处理过期任务是完全判断三个轮都转一圈,才会处理溢出任务链表。并且根据剩余时间多少,判断需要再次放入到那个时间轮对应任务链表上。
static inline
u32 * TW (tw_timer_expire_timers_internal) (TWT (tw_timer_wheel) * tw,
f64 now,
u32 * callback_vector_arg)
{
u32 nticks, i;
tw_timer_wheel_slot_t *ts;
TWT (tw_timer) * t, *head;
u32 *callback_vector;
u32 fast_wheel_index;
u32 next_index;
u32 slow_wheel_index __attribute__ ((unused));
u32 glacier_wheel_index __attribute__ ((unused));
/*调用太快了,无法处理新的计时器到期任务 */
if (PREDICT_FALSE (now < tw->next_run_time))
return callback_vector_arg;
/* 计算当前的tick数量*/
nticks = tw->ticks_per_second * (now - tw->last_run_time);
if (nticks == 0)
return callback_vector_arg;
/* 记录下次调度的任务时间 */
tw->next_run_time = (now + tw->timer_interval);
/* 首次调用时,需要更新last_run_time */
if (PREDICT_FALSE
((tw->last_run_time == 0.0) || (now <= tw->last_run_time)))
{
tw->last_run_time = now;
return callback_vector_arg;
}
/*到期任务数值为空时,使用tw自带的*/
if (callback_vector_arg == 0)
{
_vec_len (tw->expired_timer_handles) = 0;
callback_vector = tw->expired_timer_handles;
}
else
callback_vector = callback_vector_arg;
for (i = 0; i < nticks; i++)
{
/*获取当前每个轮的槽位号*/
fast_wheel_index = tw->current_index[TW_TIMER_RING_FAST];
if (TW_TIMER_WHEELS > 1)
slow_wheel_index = tw->current_index[TW_TIMER_RING_SLOW];
if (TW_TIMER_WHEELS > 2)
glacier_wheel_index = tw->current_index[TW_TIMER_RING_GLACIER];
#if TW_OVERFLOW_VECTOR > 0
/* 需要等到三个轮都转一圈,才处理溢出任务链,前面遗留的问题原因*/
if (PREDICT_FALSE (fast_wheel_index == TW_SLOTS_PER_RING
&& slow_wheel_index == TW_SLOTS_PER_RING
&& glacier_wheel_index == TW_SLOTS_PER_RING))
{
u64 interval;
u32 new_glacier_ring_offset, new_slow_ring_offset;
u32 new_fast_ring_offset;
ts = &tw->overflow;
head = pool_elt_at_index (tw->timers, ts->head_index);
next_index = head->next;
/* 清空溢出任务链表 */
head->next = head->prev = ts->head_index;
/* traverse slot, place timers wherever they go */
while (next_index != head - tw->timers)
{
t = pool_elt_at_index (tw->timers, next_index);
next_index = t->next;
/* Remove from the overflow vector (hammer) */
t->next = t->prev = ~0;
ASSERT (t->expiration_time >= tw->current_tick);
/*计算剩余任务时间*/
interval = t->expiration_time - tw->current_tick;
/*剩余任务时间仍大于冷冻轮时间,放入溢出任务链表*/
if (interval >= (1 << (3 * TW_RING_SHIFT)))
{
ts = &tw->overflow;
timer_addhead (tw->timers, ts->head_index, t - tw->timers);
continue;
}
/* 计算冷冻轮、慢轮、快速轮的槽位号 */
new_glacier_ring_offset = interval >> (2 * TW_RING_SHIFT);
interval -= (new_glacier_ring_offset << (2 * TW_RING_SHIFT));
/* Note: the wheels are at (0,0,0), no add-with-carry needed */
new_slow_ring_offset = interval >> TW_RING_SHIFT;
interval -= (new_slow_ring_offset << TW_RING_SHIFT);
new_fast_ring_offset = interval & TW_RING_MASK;
t->slow_ring_offset = new_slow_ring_offset;
t->fast_ring_offset = new_fast_ring_offset;
/* 全部都为0,说明任务时间到期了 */
if (PREDICT_FALSE (t->slow_ring_offset == 0 &&
t->fast_ring_offset == 0 &&
new_glacier_ring_offset == 0))
{
vec_add1 (callback_vector, t->user_handle);
#if TW_START_STOP_TRACE_SIZE > 0
TW (tw_timer_trace) (tw, 0xfe, t->user_handle,
t - tw->timers);
#endif
pool_put (tw->timers, t);
}
/* 任务放进冷冻轮 */
else if (new_glacier_ring_offset)
{
ts = &tw->w[TW_TIMER_RING_GLACIER][new_glacier_ring_offset];
timer_addhead (tw->timers, ts->head_index, t - tw->timers);
}
/* 任务放入慢速轮*/
else if (t->slow_ring_offset)
{
/* Add to slow ring */
ts = &tw->w[TW_TIMER_RING_SLOW][t->slow_ring_offset];
timer_addhead (tw->timers, ts->head_index, t - tw->timers);
}
/* 任务放入快速轮 */
else
{
ts = &tw->w[TW_TIMER_RING_FAST][t->fast_ring_offset];
timer_addhead (tw->timers, ts->head_index, t - tw->timers);
#if TW_FAST_WHEEL_BITMAP
tw->fast_slot_bitmap =
clib_bitmap_set (tw->fast_slot_bitmap,
t->fast_ring_offset, 1);
#endif
}
}
}
#endif
#if TW_TIMER_WHEELS > 2
/*
* Double odometer-click? Process one slot in the glacier ring...
*/
if (PREDICT_FALSE (fast_wheel_index == TW_SLOTS_PER_RING
&& slow_wheel_index == TW_SLOTS_PER_RING))
{
glacier_wheel_index %= TW_SLOTS_PER_RING;
ts = &tw->w[TW_TIMER_RING_GLACIER][glacier_wheel_index];
head = pool_elt_at_index (tw->timers, ts->head_index);
next_index = head->next;
/* Make slot empty */
head->next = head->prev = ts->head_index;
/* traverse slot, deal timers into slow ring */
while (next_index != head - tw->timers)
{
t = pool_elt_at_index (tw->timers, next_index);
next_index = t->next;
/* Remove from glacier ring slot (hammer) */
t->next = t->prev = ~0;
/* Timer expires Right Now */
if (PREDICT_FALSE (t->slow_ring_offset == 0 &&
t->fast_ring_offset == 0))
{
vec_add1 (callback_vector, t->user_handle);
#if TW_START_STOP_TRACE_SIZE > 0
TW (tw_timer_trace) (tw, 0xfe, t->user_handle,
t - tw->timers);
#endif
pool_put (tw->timers, t);
}
/* Timer expires during slow-wheel tick 0 */
else if (PREDICT_FALSE (t->slow_ring_offset == 0))
{
ts = &tw->w[TW_TIMER_RING_FAST][t->fast_ring_offset];
timer_addhead (tw->timers, ts->head_index, t - tw->timers);
#if TW_FAST_WHEEL_BITMAP
tw->fast_slot_bitmap =
clib_bitmap_set (tw->fast_slot_bitmap,
t->fast_ring_offset, 1);
#endif
}
else /* typical case */
{
/* Add to slow ring */
ts = &tw->w[TW_TIMER_RING_SLOW][t->slow_ring_offset];
timer_addhead (tw->timers, ts->head_index, t - tw->timers);
}
}
}
#endif
#if TW_TIMER_WHEELS > 1
/*
* Single odometer-click? Process a slot in the slow ring,
*/
if (PREDICT_FALSE (fast_wheel_index == TW_SLOTS_PER_RING))
{
slow_wheel_index %= TW_SLOTS_PER_RING;
ts = &tw->w[TW_TIMER_RING_SLOW][slow_wheel_index];
head = pool_elt_at_index (tw->timers, ts->head_index);
next_index = head->next;
/* Make slot empty */
head->next = head->prev = ts->head_index;
/* traverse slot, deal timers into fast ring */
while (next_index != head - tw->timers)
{
t = pool_elt_at_index (tw->timers, next_index);
next_index = t->next;
/* Remove from sloe ring slot (hammer) */
t->next = t->prev = ~0;
/* Timer expires Right Now */
if (PREDICT_FALSE (t->fast_ring_offset == 0))
{
vec_add1 (callback_vector, t->user_handle);
#if TW_START_STOP_TRACE_SIZE > 0
TW (tw_timer_trace) (tw, 0xfe, t->user_handle,
t - tw->timers);
#endif
pool_put (tw->timers, t);
}
else /* typical case */
{
/* Add to fast ring */
ts = &tw->w[TW_TIMER_RING_FAST][t->fast_ring_offset];
timer_addhead (tw->timers, ts->head_index, t - tw->timers);
#if TW_FAST_WHEEL_BITMAP
tw->fast_slot_bitmap =
clib_bitmap_set (tw->fast_slot_bitmap,
t->fast_ring_offset, 1);
#endif
}
}
}
#endif
/* Handle the fast ring 获取当前快速轮需要处理的槽位号*/
fast_wheel_index %= TW_SLOTS_PER_RING;
ts = &tw->w[TW_TIMER_RING_FAST][fast_wheel_index];
head = pool_elt_at_index (tw->timers, ts->head_index);
next_index = head->next;
/* Make slot empty 清空快速轮*/
head->next = head->prev = ts->head_index;
/* Construct vector of expired timer handles to give the user
处理所有的已到期任务*/
while (next_index != ts->head_index)
{
t = pool_elt_at_index (tw->timers, next_index);
next_index = t->next;
vec_add1 (callback_vector, t->user_handle);
#if TW_START_STOP_TRACE_SIZE > 0
TW (tw_timer_trace) (tw, 0xfe, t->user_handle, t - tw->timers);
#endif
pool_put (tw->timers, t);
}
/* If any timers expired, tell the user */
if (callback_vector_arg == 0 && vec_len (callback_vector))
{
/* The callback is optional. We return the u32 * handle vector */
if (tw->expired_timer_callback)
{
tw->expired_timer_callback (callback_vector);
vec_reset_length (callback_vector);
}
tw->expired_timer_handles = callback_vector;
}
#if TW_FAST_WHEEL_BITMAP
tw->fast_slot_bitmap = clib_bitmap_set (tw->fast_slot_bitmap,
fast_wheel_index, 0);
#endif
/*累计tick计数*/
tw->current_tick++;
fast_wheel_index++;
/*更新快速轮下一次要处理的槽位号*/
tw->current_index[TW_TIMER_RING_FAST] = fast_wheel_index;
#if TW_TIMER_WHEELS > 1
if (PREDICT_FALSE (fast_wheel_index == TW_SLOTS_PER_RING))
slow_wheel_index++;
tw->current_index[TW_TIMER_RING_SLOW] = slow_wheel_index;
#endif
#if TW_TIMER_WHEELS > 2
if (PREDICT_FALSE (slow_wheel_index == TW_SLOTS_PER_RING))
glacier_wheel_index++;
tw->current_index[TW_TIMER_RING_GLACIER] = glacier_wheel_index;
#endif
if (vec_len (callback_vector) >= tw->max_expirations)
break;
}
if (callback_vector_arg == 0)
tw->expired_timer_handles = callback_vector;
tw->last_run_time += i * tw->timer_interval;
return callback_vector;
}
/*该接口很简单,主要是释放资源*/
void TW (tw_timer_wheel_free) (TWT (tw_timer_wheel) * tw)
{
int i, j;
tw_timer_wheel_slot_t *ts;
TWT (tw_timer) * head, *t;
u32 next_index;
/*遍历任务链表,将相应的任务全部释放到pool资源池*/
for (i = 0; i < TW_TIMER_WHEELS; i++)
{
for (j = 0; j < TW_SLOTS_PER_RING; j++)
{
ts = &tw->w[i][j];
head = pool_elt_at_index (tw->timers, ts->head_index);
next_index = head->next;
while (next_index != ts->head_index)
{
t = pool_elt_at_index (tw->timers, next_index);
next_index = t->next;
pool_put (tw->timers, t);
}
pool_put (tw->timers, head);
}
}
#if TW_OVERFLOW_VECTOR > 0
/*溢出任务链表资源释放*/
ts = &tw->overflow;
head = pool_elt_at_index (tw->timers, ts->head_index);
next_index = head->next;
while (next_index != ts->head_index)
{
t = pool_elt_at_index (tw->timers, next_index);
next_index = t->next;
pool_put (tw->timers, t);
}
pool_put (tw->timers, head);
#endif
/*这里居然直接把tw初始化了,why?
*tw->timers tw->expired_timer_handles 资源还没有释放呢?*/
clib_memset (tw, 0, sizeof (*tw));
}
这个释放函数存在内存泄漏,使用者一定要注意了。 不过大部分的应用场景是全局有效的,不会调用释放接口。
vpp在tw timer提供了一套模板,用户可以根据自己的业务设置不同的时间精度.
定义自己使用的时间轮,方便扩展。 下面是定义自己的test_tw_timer.h头文件
#ifndef __TEST_TW_TIME_H__
#define __TEST_TW_TIME_H__
/* ... So that a client app can create multiple wheel geometries */
#undef TW_TIMER_WHEELS
#undef TW_SLOTS_PER_RING
#undef TW_RING_SHIFT
#undef TW_RING_MASK
#undef TW_TIMERS_PER_OBJECT
#undef LOG2_TW_TIMERS_PER_OBJECT
#undef TW_SUFFIX
#undef TW_OVERFLOW_VECTOR
#undef TW_FAST_WHEEL_BITMAP
#undef TW_TIMER_ALLOW_DUPLICATE_STOP
#undef TW_START_STOP_TRACE_SIZE
#define TW_TIMER_WHEELS 1
#define TW_SLOTS_PER_RING 4096
#define TW_RING_SHIFT 12
#define TW_RING_MASK (TW_SLOTS_PER_RING -1)
#define TW_TIMERS_PER_OBJECT 2
#define LOG2_TW_TIMERS_PER_OBJECT 0 /*不使用timerid*/
#define TW_SUFFIX _mytest /*后缀名称也需要变更,防止调用到其他的库的函数*/
#define TW_FAST_WHEEL_BITMAP 0
#define TW_TIMER_ALLOW_DUPLICATE_STOP 0
#define TW_START_STOP_TRACE_SIZE 0
#include <vppinfra/tw_timer_template.h>
typedef TWT (tw_timer_wheel) teest_tw_timer_wheel;
typedef TWT (tw_timer) test_tw_timer;
#endif
下面是定义自己的test_tw_timer.c头文件
#include <vppinfra/error.h>
#include <test_tw_timer.h>
#include <vppinfra/tw_timer_template.c>
test_tw_time_api.h
#include <test_tw_timer.h>
#include <vppinfra/tw_timer_template.h>
always_inline void
test__timer_start (test_tw_timer_wheel * tw, u32 *timers_handle,u32 interval)
{
*timers_handle = TW(tw_timer_start) (tw, session->session_id.index,0, interval);
}
always_inline voidu
test_timer_stop (test_tw_timer_wheel * tw, u32 timers_handle)
{
if (timers_handle == TEST_TIMER_HANDLE_INVALID)
{
return;
}
/*对timers_handle做安全性判断*/
if (PREDICT_FALSE(!(TW (tw_timer_handle_is_free)(tw, timers_handle))))
{
ASSERT(0);
TW(tw_timer_stop) (tw, timers_handle);
}
timers_handle = TEST_TIMER_HANDLE_INVALID;
}
always_inline void
test_timer_update (test_tw_timer_wheel * tw,u32 *timers_handle,u32 interval)
{
/*判断timers_handle如果时非法的就start函数,否则调用更新*/
if (*timers_handle == TEST_TIMER_HANDLE_INVALID)
{
*timers_handle = TW(tw_timer_start) (tw, session->session_id.index, 0, interval);
}
else
{
/*对timers_handle做安全性判断*/
if (PREDICT_FALSE(TW (tw_timer_handle_is_free)(tw, *timers_handle)))
{
ASSERT(0);
return;
}
TW(tw_timer_update) (tw, timers_handle, interval);
}
}
always_inline u32 *
test_timer_expire_timers_vec (test_tw_timer_wheel * tw,clib_time_type_t time_now,u32 *vec)
{
return TW(tw_timer_expire_timers_vec)(tw,time_now,vec);
}
增加tw_timer的内存监控
static clib_error_t *
show_test_tw_timer_command_fn(vlib_main_t * vm, unformat_input_t * input,
vlib_cli_command_t * cmd)
{
u32 i;
flowpp_tw_timer *tw_timer;
flowpp_tw_timer_wheel *tw_timer_wheel = &timer_wheel;
vlib_cli_output (vm, "%-8s%-18s%-18s%-18s%-18s%-18s",
"Id","Next_time","Last_time","Cur_tick","Max_elts","free_elts");
{
tw_timer = tw_timer_wheel->timers;
vlib_cli_output(vm,"%-8d%-18.4e%-18.4e%-18d%-18d%-18d", i,tw_timer_wheel->next_run_time,
tw_timer_wheel->last_run_time,tw_timer_wheel->current_tick,
vec_len(tw_timer),vec_len((pool_header(tw_timer))->free_indices));
}
return 0;
}
VLIB_CLI_COMMAND (show_test_tw_timer_command, static) =
{
.path = "show test tw_timer info",
.short_help = "show test tw_timer info",
.function = show_test_tw_timer_command_fn,
};
1、手动扩充pool资源池,放在在转发过程中动态扩充。
always_inline void
test_timer_start (test_tw_timer_wheel * tw, u32 *timers_handle,u32 interval,u32 max_num)
{
*timers_handle = TW(tw_timer_start) (tw, session->session_id.index,0, interval);
/*对pool资源池进行扩充,转发node进行扩充代码新能损耗*/
pool_alloc(tw->timers,max_num);
}
2、在时间范围允许情况下,还是尽量选择快速时间轮。避免在过期调度中频繁的任务搬移。 3、参考tcp协议栈对老化的处理,实现一个无锁化会话管理。
在使用中遇到一个问题,下面是我的一个使用场景:
u32 *vec = NULL; /*这是一个全局的*/
vec = test_timer_expire_timers_vec(tw,now,vec);
for (i = 0; i < vec_len(vec);i++)
{
/*处理过期时间*/
.....
/*判断不满足过期的条件,需要重新加入到过期链表中*/
test_timer_start(tw,timers_handle,intenal);
}
有没有发现问题?自己思考吧,经常犯的一个低级错误。
参考文章:
1、vpp tw_timer:(https://www.yuque.com/taohuaban/fc6dp0/krbska)