在之前的softirq中提到过,内核在中断的bottom half引入了softirq, tasklet, workqueue。 而softirq和tasklet只能用在中断上下文中,而且不可以睡眠。所以内核引入了workqueue,工作队列运行在进程上下文,同时可以睡眠。在以前版本的内核中workqueue的代码比较简单。在linux2.6.30代码量在1000行左右,而在linux3.18代码量在5000行左右。其中巨大的变化就是引入了Concurrency Managed Workqueue (cmwq)概念。不过本篇先学习以前版本的workqueue,因为它简单。在了解了简单版本的存在问题之后在学习cmwq就有更好的认识。
workqueue的工作原理和工厂流水线流程很相似。当流水线上没有任务的时候,就玩手机,睡觉等,当某某客户想加工爱疯不拉死的时候,客户只需要将一个一个任务放入到流水线上,这时候流水线上的工作就要拼命的加班,工作。当干完了爱疯不拉死之后的订单就休息。
如果将上述的整个流程对应到代码的世界的话。爱疯不拉死就是一个一个任务,俗称work_struct,而流水线就是一个容器,可以称为worklist,工作就是处理任务的单元,俗称worker_thread,将爱疯不拉死放入到流水线的过程可称为提交任务: queue_work。这样一来现实生活就对应到代码世界了,这也就是一切源于生活。
struct work_struct {
atomic_long_t data;
struct list_head entry;
work_func_t func;
};
客户需要提交订单,对于每一个爱疯不拉死需要定义为上述的结构,这就是规矩。
data: 可以使用data来传递某些信息到延迟函数中,也就是func。
entry: 将提交的节点,使用链表链接起来。
func: 工作节点的最终回调延迟函数。
typedef void (*work_func_t)(struct work_struct *work);
/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu).
*/
struct cpu_workqueue_struct {
spinlock_t lock;
struct list_head worklist;
wait_queue_head_t more_work;
struct work_struct *current_work;
struct workqueue_struct *wq;
struct task_struct *thread;
} ____cacheline_aligned;
根据注释可知cpu_workqueue_struct是per-cpu变量,也就是说每个cpu都维护一个这样的结构。
lock: 对变量worklist实现互斥保护。
worklist: 用来将提交的work节点形成链表。
more_work: 等待队列头节点,当工作队列的上没有任务需要处理时,工人线程(worker_thread)就需要睡眠。
current_work: 指向当前正在处理的work
wq: 指向workqueue_struct结构体
therad: 指向工人线程(worker_thread)所在的进程空间
struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq;
struct list_head list;
const char *name;
int singlethread;
int freezeable; /* Freeze threads during suspend */
int rt;
};
此结构体和cpu_workqueue_struct只相差一个前缀cpu,而两者的区别是cpu_workqueue_struct是per-cpu变量,用来管理该cpu上的workqueue, 而workqueue_struct是为创建的每个工作队列生成一个这样的结构,用来管理workqueue。
cpu_wq: 指向cpu_workqueue_struct结构的指针。根据该指针,每个cpu可以通过per_cpu_ptr来获得属于自己cpu的工作队列管理结构cpu_workqueue_struct
list: 将workqueue_stuct结构用链表链接起来,放入到全局变量workqueues中,而且只对非singletherad工作队列有效。
static LIST_HEAD(workqueues);
name: 工作队列的名称。
singletherad: singletherad 或者 multitherad。
freezeable: suspend流程中会使用到。
rt: 用来调整worker_therad线程所在进程的调度策略。
静态初始化一个work_struct
#define DECLARE_WORK(n, f) \
struct work_struct n = __WORK_INITIALIZER(n, f)
#define __WORK_INITIALIZER(n, f) { \
.data = WORK_DATA_INIT(), \
.entry = { &(n).entry, &(n).entry }, \
.func = (f), \
__WORK_INIT_LOCKDEP_MAP(#n, &(n)) \
}
同时也可以初始化一个定时器,在指定的时间之后去处理此work
#define DECLARE_DELAYED_WORK(n, f) \
struct delayed_work n = __DELAYED_WORK_INITIALIZER(n, f)
#define __DELAYED_WORK_INITIALIZER(n, f) { \
.work = __WORK_INITIALIZER((n).work, (f)), \
.timer = TIMER_INITIALIZER(NULL, 0, 0), \
}
既然支持静态创建,也可以动态创建一个work_struct
#define PREPARE_WORK(_work, _func) \
do { \
(_work)->func = (_func); \
} while (0)
#define INIT_WORK(_work, _func) \
do { \
(_work)->data = (atomic_long_t) WORK_DATA_INIT(); \
INIT_LIST_HEAD(&(_work)->entry); \
PREPARE_WORK((_work), (_func)); \
} while (0)
#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
#define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)
#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)
#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)
内核提供了四个创建不同类型的workqueue,而区别就是各个传递的参数不一样。create_workqueue就是创建普通的workqueue; create_rt_workqueue调度策略相关的; freezeable和singletherad都是创建single therad workqueue, 区别就在于freezebale。
#define __create_workqueue(name, singlethread, freezeable, rt) \
__create_workqueue_key((name), (singlethread), (freezeable), (rt), \
NULL, NULL)
struct workqueue_struct *__create_workqueue_key(const char *name,
int singlethread,
int freezeable,
int rt,
struct lock_class_key *key,
const char *lock_name)
{
struct workqueue_struct *wq;
struct cpu_workqueue_struct *cwq;
int err = 0, cpu;
wq = kzalloc(sizeof(*wq), GFP_KERNEL); //分配一个workqueue_struct结构
if (!wq)
return NULL;
wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); //分配一个cpu_workqueue_struct结构
if (!wq->cpu_wq) {
kfree(wq);
return NULL;
}
wq->name = name; //根据传递进来的参数赋值
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
wq->singlethread = singlethread;
wq->freezeable = freezeable;
wq->rt = rt;
INIT_LIST_HEAD(&wq->list);
if (singlethread) {
cwq = init_cpu_workqueue(wq, singlethread_cpu); ------------------------A
err = create_workqueue_thread(cwq, singlethread_cpu); ------------------------B
start_workqueue_thread(cwq, -1); ------------------------C
} else {
cpu_maps_update_begin();
/*
* We must place this wq on list even if the code below fails.
* cpu_down(cpu) can remove cpu from cpu_populated_map before
* destroy_workqueue() takes the lock, in that case we leak
* cwq[cpu]->thread.
*/
spin_lock(&workqueue_lock); //使用锁保护workqueue,然后将workqueue加入到全局链表workqueues中
list_add(&wq->list, &workqueues);
spin_unlock(&workqueue_lock);
/*
* We must initialize cwqs for each possible cpu even if we
* are going to call destroy_workqueue() finally. Otherwise
* cpu_up() can hit the uninitialized cwq once we drop the
* lock.
*/
for_each_possible_cpu(cpu) { -----------------------------D
cwq = init_cpu_workqueue(wq, cpu);
if (err || !cpu_online(cpu))
continue;
err = create_workqueue_thread(cwq, cpu);
start_workqueue_thread(cwq, cpu);
}
cpu_maps_update_done();
}
if (err) {
destroy_workqueue(wq);
wq = NULL;
}
return wq;
}
A: 初始化cpu_wokrqueue_struct结构。
static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
cwq->wq = wq; //初始化cpu_workqueue_struct中workqueue_struct
spin_lock_init(&cwq->lock); //初始化lock
INIT_LIST_HEAD(&cwq->worklist); //初始化worklist
init_waitqueue_head(&cwq->more_work); //初始化等待队列头
return cwq;
}
singlethread_cpu的意思是系统第一个cpu,定义和初始化如下:
static int singlethread_cpu __read_mostly;
cpumask_copy(cpu_populated_map, cpu_online_mask);
singlethread_cpu = cpumask_first(cpu_possible_mask)
B: 创建工人线程,也就是worker_thread
static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
struct workqueue_struct *wq = cwq->wq;
const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
struct task_struct *p;
p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu); //创建内核线程,返回task_struct
/*
* Nobody can add the work_struct to this cwq,
* if (caller is __create_workqueue)
* nobody should see this wq
* else // caller is CPU_UP_PREPARE
* cpu is not on cpu_online_map
* so we can abort safely.
*/
if (IS_ERR(p))
return PTR_ERR(p);
if (cwq->wq->rt)
sched_setscheduler_nocheck(p, SCHED_FIFO, ¶m);
cwq->thread = p; //将内核线程的进程空间保存在cpu_workqueue_struct的therad中
trace_workqueue_creation(cwq->thread, cpu);
return 0;
}
C: 将新创建的进程投入到系统,等待调度
static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct task_struct *p = cwq->thread;
if (p != NULL) {
if (cpu >= 0)
kthread_bind(p, cpu); //如果不是singletherad,就必须将进程bind到特定的cpu上运行
wake_up_process(p); //将进程投入到系统中,等待调度
}
}
D: 对每个cpu执行上述singletherad 的操作,这样一来每个cpu就创建自己的cpu_workqueue_struct(CPU工作管理队列)和worker_thread(工人线程)。
在驱动程序中初始化一个work之后,就需要将此work(爱疯不拉死)提交到worklist(流水线)中,让工人去处理。内核提供了queue_work用来提交对应的work到指定的workqueue中。
/**
* queue_work - queue work on a workqueue
* @wq: workqueue to use
* @work: work to queue
*
* Returns 0 if @work was already on a queue, non-zero otherwise.
*
* We queue the work to the CPU on which it was submitted, but if the CPU dies
* it can be processed by another CPU.
*/
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
int ret;
ret = queue_work_on(get_cpu(), wq, work);
put_cpu();
return ret;
}
提交一个work到指定的workqueue。当此work已经在队列中返回0,否则分会1。 如果提交work到某个cpu上,而此cpu真好已经死了,此work就会被别的cpu处理。
如果驱动程序创建的队列是非singlethread或者singlethread工作队列,那么两者的区别在于: singlethread模式下只在系统的第一个cpu(singlethread_cpu)上创建工作队列和工人线程,在用queue_work提交work的时候只能提交到第一个(singlethread_cpu)cpu上的worklist中。而在非singletherad模式下,会在每个cpu上都会创建工作队列和工人线程,同时在提交work的时候也会将work提交到指定的cpu的worklist中。
int queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
int ret = 0;
if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { -------------------A
BUG_ON(!list_empty(&work->entry)); -------------------B
__queue_work(wq_per_cpu(wq, cpu), work); -------------------C
ret = 1;
}
return ret;
}
A: 检测work_struct中的data的WORK_STRUCT_PENDING是否已经设置为1。如果已经设置为1说明此work已经提交了但是还没有处理,内核禁止再次提交一个还没有处理的work。如果没有设置为1,则设置work的data的WORK_STRUCT_PENDING位为1,返回以前的值。
#define work_data_bits(work) ((unsigned long *)(&(work)->data))
B: 判断work的entry链表是否为空,如果不为空就会发生kernel panic,从而ops。因为进入到这里的话一般work的entry链表是空的,不为空就有异常。
C: 将此work加入到指定的cpu上的worklist中,通过wq_per_cpu函数。
static struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
if (unlikely(is_wq_single_threaded(wq)))
cpu = singlethread_cpu;
return per_cpu_ptr(wq->cpu_wq, cpu);
}
如果是singlethread的模式话,返回称为第一个cpu的cwq上(singlethread_cpu),如果不是则需要通过per_cpu_ptr获得对应cpu的cpu_workqueue_struct结构。
下面继续看__queue_work函数的实现,
static void __queue_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{
unsigned long flags;
spin_lock_irqsave(&cwq->lock, flags);
insert_work(cwq, work, &cwq->worklist);
spin_unlock_irqrestore(&cwq->lock, flags);
}
可以看到其内部是通过insert_work函数实现插入操作。
static void insert_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work, struct list_head *head)
{
trace_workqueue_insertion(cwq->thread, work);
set_wq_data(work, cwq); //设置work_struct中data的pending状态
/*
* Ensure that we get the right work->data if we see the
* result of list_add() below, see try_to_grab_pending().
*/
smp_wmb();
list_add_tail(&work->entry, head); //将work加入到worklist的尾部
wake_up(&cwq->more_work); //调用wakeup函数唤醒在等待队列上睡眠的worker_thread,如果没有睡觉,什么都不需要做
}
在创建workqueue的时候,根据不同模式会创建对应的cpu_workqueue_struct和worker_therad, 当驱动程序使用queue_work提交work到系统中的时候,在合适的时机 就会调度worker_thread来处理提交的work,如果工作队列中没有work,那么worker_therad就会睡眠 在more_work的等待队列上。
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DEFINE_WAIT(wait);
if (cwq->wq->freezeable) -----------------——A
set_freezable();
for (;;) {
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
if (!freezing(current) && -------------------B
!kthread_should_stop() &&
list_empty(&cwq->worklist))
schedule();
finish_wait(&cwq->more_work, &wait);
try_to_freeze();
if (kthread_should_stop())
break;
run_workqueue(cwq);
}
return 0;
}
A: 如果设置了freezeable属性,则需要调用set_freezeable函数清除PF_NOFREEZEflag,这样在suspend流程中就会freezeable该内核线程
static inline void set_freezable(void)
{
current->flags &= ~PF_NOFREEZE;
}
B: 该task已TASK_INTERRUPTIBLE状态进入睡眠,让出处理器的条件有三个。
1. 该内核线程没有被freeze
2. 该内核线程被没有被别的模块要求停止 3. worklist没有work需要处理了 当驱动程序项worklist上提交一个新的节点并且唤醒worker_thread, 则最后就会调用到run_workqueue去处理新提交的工作节点。
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
spin_lock_irq(&cwq->lock);
while (!list_empty(&cwq->worklist)) { //worklist不为empty的时候对worklist中的每一项work调用回调函数
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
work_func_t f = work->func;
trace_workqueue_execution(cwq->thread, work);
cwq->current_work = work; //设置当前正在处理的work
list_del_init(cwq->worklist.next); //从worklist中删除
spin_unlock_irq(&cwq->lock);
BUG_ON(get_wq_data(work) != cwq);
work_clear_pending(work); //清楚pending标志位
lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
f(work); //调用到回调函数
lock_map_release(&lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);
spin_lock_irq(&cwq->lock);
cwq->current_work = NULL; //设置current_work为null
}
spin_unlock_irq(&cwq->lock);
}
当驱动程序不再使用workqueue的时候,就需要调用destory_workqueue函数销毁workqueue。
/**
* destroy_workqueue - safely terminate a workqueue
* @wq: target workqueue
*
* Safely destroy a workqueue. All work currently pending will be done first.
*/
void destroy_workqueue(struct workqueue_struct *wq)
{
const struct cpumask *cpu_map = wq_cpu_map(wq);
int cpu;
cpu_maps_update_begin();
spin_lock(&workqueue_lock);
list_del(&wq->list); //从workqueues中删除驱动程序自己创建的workqueue
spin_unlock(&workqueue_lock);
for_each_cpu(cpu, cpu_map)
cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu)); //对每一个cpu执行此函数来终结worker_thread
cpu_maps_update_done();
free_percpu(wq->cpu_wq); //释放per-cpu变量 cpu_workqueue_struct
kfree(wq); //释放申请的内存workqueue_struct
}
因为destory_queue在调用的时候,worker_thread可能会在处理worklist中的work,这时候就要小心处理。
static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
/*
* Our caller is either destroy_workqueue() or CPU_POST_DEAD,
* cpu_add_remove_lock protects cwq->thread.
*/
if (cwq->thread == NULL) //如果therad已经为空,则直接返回
return;
lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);
flush_cpu_workqueue(cwq); //等待所有的work全部处理完毕
/*
* If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
* a concurrent flush_workqueue() can insert a barrier after us.
* However, in that case run_workqueue() won't return and check
* kthread_should_stop() until it flushes all work_struct's.
* When ->worklist becomes empty it is safe to exit because no
* more work_structs can be queued on this cwq: flush_workqueue
* checks list_empty(), and a "normal" queue_work() can't use
* a dead CPU.
*/
trace_workqueue_destruction(cwq->thread);
kthread_stop(cwq->thread); //停止掉工人线程(worker_thread)
cwq->thread = NULL;
}
flush_cpu_workqueue的主要工作流程就是确保worklist上所有的节点都已经处理完毕,通过完成接口completion等待。如果worklist不为空或者当前工人线程正在处理work,这时候内核会插入一个俗称分界线的work,标志为最后一个work,只有等待此work完成就标志着所有的节点已经全部处理完毕。
static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
int active = 0;
struct wq_barrier barr;
WARN_ON(cwq->thread == current);
spin_lock_irq(&cwq->lock);
if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) { //如果不为空或者当前有work正在处理
insert_wq_barrier(cwq, &barr, &cwq->worklist); //插入一个barrier的work进入到worklist
active = 1;
}
spin_unlock_irq(&cwq->lock);
if (active)
wait_for_completion(&barr.done); //等待调用到barrier的work,之前在这里睡眠
return active;
}
接下来分析insert_wq_barrier
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
struct wq_barrier *barr, struct list_head *head)
{
INIT_WORK(&barr->work, wq_barrier_func); //初始化barrier工作队列,已经回调函数
__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work)); //设置pending状态位
init_completion(&barr->done); //初始化完成接口
insert_work(cwq, &barr->work, head); //将barrier插入到该cpu的worklist中
}
当工人线程处理到barirer work的时候,就会调用到barrier的回调函数wq_barrier_func
static void wq_barrier_func(struct work_struct *work)
{
struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
complete(&barr->done);
}
通知完成接口,然后会在flush_cpu_workqueue函数中唤醒。然后就会返回。工人线程就会被销毁了。
flush_cpu_workqueue函数只操作于单个cpu, 如果对于非singlethread的工作队列,则需要调用flush_workqueue函数,因为每个cpu上都有一个工作队列和worker_thread。此函数是对每个cpu都执行flush_cpu_workqueue的操作。
void flush_workqueue(struct workqueue_struct *wq)
{
const struct cpumask *cpu_map = wq_cpu_map(wq);
int cpu;
might_sleep();
lock_map_acquire(&wq->lockdep_map);
lock_map_release(&wq->lockdep_map);
for_each_cpu(cpu, cpu_map)
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
linux系统会在初始化阶段init_workqueues创建一个名字为event的工作队列。此工作队列是非singlethread的工作队列。
void __init init_workqueues(void)
{
....
keventd_wq = create_workqueue("events");
BUG_ON(!keventd_wq);
}
驱动程序如果不想自己创建工作队列就可以使用系统提供的,这时候就可以使用schedlue_work来提交对应的work
int schedule_work(struct work_struct *work)
{
return queue_work(keventd_wq, work);
}
可以看到schedule_work只是对queue_work的一个包装,同时指定的第一个参数,就是内核自己创建的工作队列keventd_wq。
使用内核提供的工作队列的好处是驱动程序无需自己创建一个工作队列,但是不好处也很明显,也就是系统其他模块也是用内核提供的工作队列,而且在其中做了耗时的操作,而且还有可能block住,导致我们自己提交的work需要等待比较长的时间才能处理到。同样如果每个驱动程序都创建自己的workqueue,导致系统worker_thread偏多,占用系统资源过多,从而会影响系统性能等问题,从而就引入了新的workqueue,也就是cmwq。