前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >内核低分辨率定时器实现

内核低分辨率定时器实现

作者头像
DragonKingZhu
发布2020-03-24 10:58:45
6150
发布2020-03-24 10:58:45
举报

什么是低分辨率定时器

低分辨率定时器可以分为周期性和动态性,这里只讨论周期性。在jiffies小节中知道,linux系统会在每个时钟中断会增加jiffies的值,同时还会去处理到期的定时器。而系统时钟中断的速度取决于HZ的值,如果HZ配置为1000,则每秒会产生1000次时钟中断。如果按照样的话,是不是HZ的值越大越好,其实不然。如果HZ的值越大,则会造成系统的负载也会越大。所以HZ的值一般在每个平台是不一样的。假设HZ=250,那么系统会在每4ms会产生一个时钟中断,然后会去处理超时的定时器。但是4ms对有些设备是可以满足的,对一些要求延迟到us的设备是不满足的,所以linux设计者就推出了高精度定时器Hrtimer,所以把之前依赖HZ的值的定时器称为低分辨率定时器。

低分辨率定时器示例

代码语言:javascript
复制
#include <linux/init.h>                                                                         
#include <linux/module.h>
#include <linux/time.h>


static struct timer_list test_time;
static int count=0;

static void test_time_function(unsigned long data)
{
    printk(KERN_EMERG "timer: test_time_function: count=%d, data=%lu\n", count,data);
    printk(KERN_EMERG "timer: jiffies = %lu\n", jiffies);
    count++;

    test_time.expires = jiffies + HZ;           ----------------------------------E
    add_timer(&test_time);                      ----------------------------------F
}

static int time_test_open(void)
{
    printk(KERN_EMERG "timer: time_test_open\n");
    init_timer(&test_time);                     -------------------------------------A
    test_time.expires = jiffies + HZ;           -------------------------------------B
    test_time.function = &test_time_function;   -------------------------------------C
    test_time.data = 123;                       -------------------------------------D    
    add_timer(&test_time);                      -------------------------------------H

    return 0;
}

static void time_test_close(void)
{
    printk(KERN_EMERG "timer: time_test_close\n");
    del_timer(&test_time);                    -------------------------------------G
}

module_init(time_test_open);
module_exit(time_test_close);
MODULE_LICENSE("GPL");

如下是对应的运行结果:

代码语言:javascript
复制
timer: time_test_open
timer: test_time_function: count=0, data=123
timer: jiffies = 4294950523
timer: test_time_function: count=1, data=123
timer: jiffies = 4294950775
timer: test_time_function: count=2, data=123
timer: jiffies = 4294951026
timer: test_time_function: count=3, data=123
timer: jiffies = 4294951277
timer: test_time_function: count=4, data=123
timer: jiffies = 4294951528
timer: test_time_function: count=5, data=123
timer: jiffies = 4294951779
timer: test_time_function: count=6, data=123
timer: jiffies = 4294952030
timer: test_time_function: count=7, data=123
timer: jiffies = 4294952282
timer: test_time_function: count=8, data=123
timer: jiffies = 4294952534
timer: time_test_close

在分析简单的定时器之前,有必要看一下struct timer_list结构体。

代码语言:javascript
复制
struct timer_list {
	/*
	 * All fields that change during normal runtime grouped to the
	 * same cacheline
	 */
	struct list_head entry;
	unsigned long expires;
	struct tvec_base *base;

	void (*function)(unsigned long);
	unsigned long data;

	int slack;
};

为了更好的分析,去掉其中的一些调试选项之后如上。

entry: 双链表结构,将系统中的定时器彼此连接起来。

expires: 定时器的到期时间,单位是jiffies。

function: 定时器到期后的回调处理函数。

data: 是传递给回调处理函数的参数。

base: 是一个tvec_base类型的指针,系统中每个cpu都对应一个tvec_base结构。而base指向定时器所属cpu的tvec_base结构。

slack: 对于有些不要求很精确的定时器,可以使用该字段计算出新的到期时间。

还需要分析下tvec_base结构体构成。

代码语言:javascript
复制
struct tvec_base {
	spinlock_t lock;
	struct timer_list *running_timer;
	unsigned long timer_jiffies;
	unsigned long next_timer;
	unsigned long active_timers;
	unsigned long all_timers;
	int cpu;
	struct tvec_root tv1;
	struct tvec tv2;
	struct tvec tv3;
	struct tvec tv4;
	struct tvec tv5;
} ____cacheline_aligned;

running_timer: 该字段指向当前cpu正在执行的定时器对应的timer_list结构。

timer_jiffies: 它的单位是jiffies, 代表的是一个时间点,此时间点前面的该定时器已经执行完毕。通常timer_jiffies的值和jiffies的值是相同的,但是在系统负载过高的时候,jiffies的值就会和timer_jiffies的值差开。

next_timer: 该字段代表下一个到期的定时器。

active_timer: 该字段代表已经激活的定时器。

all_timer: 该字段代表所有的定时器。

tv1-tv5: 该字段用于对加入到系统中的定时器分组,每个成员都是一个链表数组。其中tv1的大小为TVR_SIZE, tv2-tv5的大小为TVN_SIZE。该大小是根据CONFIG_BASE_SMALL是否配置来决定的。

代码语言:javascript
复制
#define TVN_BITS (CONFIG_BASE_SMALL ? 4 : 6)
#define TVR_BITS (CONFIG_BASE_SMALL ? 6 : 8)
#define TVN_SIZE (1 << TVN_BITS)
#define TVR_SIZE (1 << TVR_BITS)
#define TVN_MASK (TVN_SIZE - 1)
#define TVR_MASK (TVR_SIZE - 1)
#define MAX_TVAL ((unsigned long)((1ULL << (TVR_BITS + 4*TVN_BITS)) - 1))

struct tvec {
	struct list_head vec[TVN_SIZE];
};

struct tvec_root {
	struct list_head vec[TVR_SIZE];
};

在默认的情况下CONFIG_BASE_SMALL没有使能,也就是CONFIG_BASE_SMALL=0,这时候TVN_SIZE=64, TVR_SIZE=256。而如果CONFIG_BASE_SMALL使能的话,CONFIG_BASE_SMALL=1,则TVN_SIZE=64, TVR_SIZE=16。至于如何分组,会到后面详细说明。 同时,系统为每个cpu定义一个tvec_base的结构体指针。

代码语言:javascript
复制
struct tvec_base boot_tvec_bases;
static DEFINE_PER_CPU(struct tvec_base *, tvec_bases) = &boot_tvec_bases;

接下来分析上述的示例。

A: 调用init_timer初始化一个定时器。最终调用到do_init_timer

代码语言:javascript
复制
static void do_init_timer(struct timer_list *timer, unsigned int flags,
			  const char *name, struct lock_class_key *key)
{
	struct tvec_base *base = raw_cpu_read(tvec_bases);

	timer->entry.next = NULL; 
	timer->base = (void *)((unsigned long)base | flags);
	timer->slack = -1;
	lockdep_init_map(&timer->lockdep_map, name, key, 0);
}

初始化timer_list结构体中的各个成员。

B: 设置超时时间,超时时间设置为1s

C: 设置超时回调函数,当超时时间1s到后,会调用test_time_function函数。

D: 设置回调函数的参数,设置参数为123

H: 使用函数add_timer将定时器加入到系统中。关于实现在后面分析。

E: 在超时函数中在设置超时时间为1s

F: 在次使用add_timer将定时器加入到系统中,这样可以实现循环定时。

G: 从系统中删除定时器。

从测试结果中可以看出,jiffies的差值大约是250左右,因为我的机器配置的HZ数就是250,正好是1s。

低分辨率定时器实现原理

低分辨率定时器使用的timer wheel机制来管理系统中定时器,在timer wheel的机制下,系统中的定时器不是使用单一的链表进行管理,因为单一的链表管理起来比较查找,插入,排序效率地下,为了达到高效访问,并且消耗极小的cpu资源,linux系统采用了五个链表数组来进行管理。五个数组之间就像手表一样存在分秒进位的操作。在tv1中存放这timer_jiffies到timer_jiffies+256,也就是说tv1存放定时器范围为0-255。如果在每一个tick上可能有多个相同的定时器都要处理,这时候就使用链表将相同的定时器串在一起超时的时候处理。tv2有64个单元,每个单元有256个tick,因此tv2的超时范围为256-256*64-1(2^14 -1), 就这样一次类推到tv3, tv4, tv5上。每个

数组

idx范围

tv1

0--2^8-1

tv2

2^8--2^14-1

tv3

2^14--2^20-1

tv4

2^20--2^26-1

tv5

2^26--2^32-1

timer wheel的结构图如下:

确定后五个链表之后,就需要将通过add_timer添加到系统的定时器加入到系统当中,加入到tv1-tv5的核心代码如下:

代码语言:javascript
复制
static void __internal_add_timer(struct tvec_base *base, struct timer_list *timer)
{
	unsigned long expires = timer->expires;
	unsigned long idx = expires - base->timer_jiffies;    ----------------------A
	struct list_head *vec;

	if (idx < TVR_SIZE) {                                 ---------------------B
		int i = expires & TVR_MASK;
		vec = base->tv1.vec + i;
	} else if (idx < 1 << (TVR_BITS + TVN_BITS)) {         ---------------------C
		int i = (expires >> TVR_BITS) & TVN_MASK;
		vec = base->tv2.vec + i;
	} else if (idx < 1 << (TVR_BITS + 2 * TVN_BITS)) {
		int i = (expires >> (TVR_BITS + TVN_BITS)) & TVN_MASK;
		vec = base->tv3.vec + i;
	} else if (idx < 1 << (TVR_BITS + 3 * TVN_BITS)) {
		int i = (expires >> (TVR_BITS + 2 * TVN_BITS)) & TVN_MASK;
		vec = base->tv4.vec + i;
	} else if ((signed long) idx < 0) {
		/*
		 * Can happen if you add a timer with expires == jiffies,
		 * or you set a timer to go off in the past
		 */
		vec = base->tv1.vec + (base->timer_jiffies & TVR_MASK);
	} else {
		int i;
		/* If the timeout is larger than MAX_TVAL (on 64-bit
		 * architectures or with CONFIG_BASE_SMALL=1) then we
		 * use the maximum timeout.
		 */
		if (idx > MAX_TVAL) {
			idx = MAX_TVAL;
			expires = idx + base->timer_jiffies;
		}
		i = (expires >> (TVR_BITS + 3 * TVN_BITS)) & TVN_MASK;
		vec = base->tv5.vec + i;
	}
	/*
	 * Timers are FIFO:
	 */
	list_add_tail(&timer->entry, vec);
}

A: 确认超时的时间差,通过超期时间减去time_jiffies的值

B: 如果index小于256,说明超时时间小于256个jiffies,需要放入到tv1中。例如expires=10005 time_jiffies=10000, 则index=5, i=5, 则将此定时器放入到tv1的第5位置中。

C: 如果index小于1<<14, 则将此定时器放入到tv2中。

剩余的依次类推,这样一下,将所有的定时器都放入了指定的位置。就等到超时的时候去处理。

定时器的到时处理

Linux内核中时钟中断的softirq为TIMER_SOFTIRQ, 对应的中断处理函数为run_timer_softirq,该过程发生在init_timer函数中,在该函数中系统会注册对应TIMER_SOFTIRQ对于的中断处理函数。

代码语言:javascript
复制
void __init init_timers(void)
{
	int err;

	/* ensure there are enough low bits for flags in timer->base pointer */
	BUILD_BUG_ON(__alignof__(struct tvec_base) & TIMER_FLAG_MASK);

	err = timer_cpu_notify(&timers_nb, (unsigned long)CPU_UP_PREPARE,
			       (void *)(long)smp_processor_id());
	BUG_ON(err != NOTIFY_OK);

	init_timer_stats();
	register_cpu_notifier(&timers_nb);
	open_softirq(TIMER_SOFTIRQ, run_timer_softirq);
}

当每次时钟中断发生后,系统就会调用到run_local_timers, 在run_local_timers函数中就会调用TIMER_SOFTIRQ对于的中断处理函数。

代码语言:javascript
复制
void run_local_timers(void)
{
	hrtimer_run_queues();
	raise_softirq(TIMER_SOFTIRQ);
}

对于的中断处理函数为:

代码语言:javascript
复制
static void run_timer_softirq(struct softirq_action *h)
{
	struct tvec_base *base = __this_cpu_read(tvec_bases);

	hrtimer_run_pending();

	if (time_after_eq(jiffies, base->timer_jiffies))
		__run_timers(base);
}

在每次中断处理函数中,系统会处理timer_jiffies到期的定时器,调用__run_timer

代码语言:javascript
复制
static inline void __run_timers(struct tvec_base *base)
{
	struct timer_list *timer;

	spin_lock_irq(&base->lock);
	if (catchup_timer_jiffies(base)) { -----------------------------------A
		spin_unlock_irq(&base->lock);
		return;
	}
	while (time_after_eq(jiffies, base->timer_jiffies)) { ----------------B
		struct list_head work_list;
		struct list_head *head = &work_list;
		int index = base->timer_jiffies & TVR_MASK; ------------------C

		/*
		 * Cascade timers:
		 */
		if (!index &&                               -------------------D
			(!cascade(base, &base->tv2, INDEX(0))) &&
				(!cascade(base, &base->tv3, INDEX(1))) &&
					!cascade(base, &base->tv4, INDEX(2)))
			cascade(base, &base->tv5, INDEX(3));
		++base->timer_jiffies;
		list_replace_init(base->tv1.vec + index, head);        ------------------E
		while (!list_empty(head)) {
			void (*fn)(unsigned long);
			unsigned long data;
			bool irqsafe;

			timer = list_first_entry(head, struct timer_list,entry);
			fn = timer->function;
			data = timer->data;                                        -----------------------------F
			irqsafe = tbase_get_irqsafe(timer->base);

			timer_stats_account_timer(timer);

			base->running_timer = timer;
			detach_expired_timer(timer, base);                           ----------------------------------G

			if (irqsafe) {
				spin_unlock(&base->lock);
				call_timer_fn(timer, fn, data);                         ----------------------————H
				spin_lock(&base->lock);
			} else {
				spin_unlock_irq(&base->lock);
				call_timer_fn(timer, fn, data);
				spin_lock_irq(&base->lock);
			}
		}
	}
	base->running_timer = NULL;
	spin_unlock_irq(&base->lock);
}

A: 判断all_timer是否还有值,如果为0,说明没有要处理的定时器,直接返回退出。

B: 处理timer_jiffies到期的定时器,这时候jiffies已经在timer_jiffies之后

C: 计算出索引值,通过timer_jiffies的低8位做&运算,就可以得到index

D: 假设这时候是一个超时为10s的定时器, 也就是index=10, 不为空,跳过if语句判断

E: 将timer_jiffies的值加1, 然后取出到期的定时器放入到head中。如果有多个定时器都是10s超时,都会放入到head中。

F: 每次取出链表中的第一个元素,然后设置回调函数和参数

G: 设置当前定时器,以及将当前定时器从定时器链表中删除

H: 调用定时器的回调函数。

至此10s超时的定时器就会调用到处理函数中去。

当处理tv1的链表从0处理到255之后,index就会变为0,因为这时候tv1中的定时器已经处理完毕,需要处理tv2中的。

这时候计算tv2中的索引值可以使用kernel中提供的宏定义,如下:

代码语言:javascript
复制
#define INDEX(N) ((base->timer_jiffies >> (TVR_BITS + (N) * TVN_BITS)) & TVN_MASK)

比如这时候系统中有10个定时器,超时时间分别是256 - 266, 这10个定时器都会存放到tv2中的index1的链表中。因为tv2有64组,每组有256项,刚好这10项是存放在第一项中。

这时候就需要将tv2中的第一组数据搬移到tv1中的各项中,如果tv2的第一组刚好是256项,则正好可以全部放入到tv1中。

搬移操作代码核心代码如下:

代码语言:javascript
复制
static int cascade(struct tvec_base *base, struct tvec *tv, int index)
{
	/* cascade all the timers from tv up one level */
	struct timer_list *timer, *tmp;
	struct list_head tv_list;

	list_replace_init(tv->vec + index, &tv_list);    ---------------------------A

	/*
	 * We are removing _all_ timers from the list, so we
	 * don't have to detach them individually.
	 */
	list_for_each_entry_safe(timer, tmp, &tv_list, entry) {
		BUG_ON(tbase_get_base(timer->base) != base);
		/* No accounting, while moving them */
		__internal_add_timer(base, timer); --------------------------------B
	}

	return index;
}

A: 将tv2中的马上超时定时器放入到tv_list中

B: 然后对tv_list的每一项又通过__internal_add_timer添加到tv1中。

这样一来再经过256个tick, 就会将tv1处理完毕。当tv2处理完毕之后,就会将tv3的处理转移到tv2中。依次类推就可以一直处理下去。

这样级联的timer list的实现机制可以大大降低每次检查超时timer的时间,而且每次中断只需要对tv1进行检测,只有必要的时才进行cascade。

内核的低分辨率定时器的实现非常精妙,既实现了大量定时器的管理,又实现了快速的O(1)查找到期定时器的能力,利用巧妙的数组结构,使得只需在间隔256个tick时间才处理一次迁移操作,5个数组就好比是5个齿轮,它们随着base->timer_jifffies的增长而不停地转动,每次只需处理第一个齿轮的某一个齿节,低一级的齿轮转动一圈,高一级的齿轮转动一个齿,同时自动把即将到期的定时器迁移到上一个齿轮中,所以低分辨率定时器通常又被叫做时间轮:time wheel。

但是即使这样timer wheel仍然存在很大的弊端,在极端的条件下,同时会有多个进行cascade处理,这样就产生了很大的延迟。同时timer wheel是基于HZ的基础上的,因此精度也无法提升,如果一味的提升HZ的值,就会导致系统产生过多的中断,增加系统的负载。这就导致了后面高精度定时器的hrtimer的出现。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是低分辨率定时器
  • 低分辨率定时器示例
  • 低分辨率定时器实现原理
  • 定时器的到时处理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档