信号量也是一种锁,相对于自旋锁,当资源不可用的时候,它会使进程挂起,进入睡眠。而自旋锁则是让等待者忙等。这意味着在使用自旋锁获得某一信号量的进程会出现对处理器拥有权的丧失,也即时进程切换出处理器。信号量一般用于进程上下文,自旋锁一般用于中断上下文。
信号量的定义如下:
/* Please don't access any members of this structure directly */
struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
lock: 是个自旋锁变量,用于实现对count变量的原子操作。
count: 无符号整型变量,用于表示通过该信号量允许进入临界区执行路径的个数。
wait_list: 用于管理所有在获取该信号量时候进入睡眠的进程,将这些进程加入到wait_list中。
从上述的信号量定义处看见注释: 不要直接的访问信号量的任何成员。
因此内核专门提供了初始化信号量的函数。如下:
static inline void sema_init(struct semaphore *sem, int val)
{
static struct lock_class_key __key;
*sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, 0);
}
#define __SEMAPHORE_INITIALIZER(name, n) \
{ \
.lock = __RAW_SPIN_LOCK_UNLOCKED((name).lock), \
.count = n, \
.wait_list = LIST_HEAD_INIT((name).wait_list), \
}
当使用sema_init(sem, val)初始化一个信号量的时候,主要是通过__SEMAPHORE_INITIALIZER将lock设置为unlock状态, count设置为val, 同时初始化wait_list链表头。
信号量上的DOWN操作,是用来获取信号量。DOWN操作有一类函数:
static noinline void __down(struct semaphore *sem); // 如果获取不到等待,一直到等待为止
static noinline int __down_interruptible(struct semaphore *sem); // 如果获取不到可以被中断
static noinline int __down_killable(struct semaphore *sem); // 如果获取不到可以被杀死
static noinline int __down_timeout(struct semaphore *sem, long timeout); // 在指定的时间内获取不到,超时退出
根据down函数的注释:
/**
* down - acquire the semaphore
* @sem: the semaphore to be acquired
*
* Acquires the semaphore. If no more tasks are allowed to acquire the
* semaphore, calling this function will put the task to sleep until the
* semaphore is released.
*
* Use of this function is deprecated, please use down_interruptible() or
* down_killable() instead.
*/
void down(struct semaphore *sem)
{
unsigned long flags;
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(sem->count > 0))
sem->count--;
else
__down(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
EXPORT_SYMBOL(down);
可以看到down函数已经不再推荐使用,请使用down_interruptible或者down_killable代替使用。接下来分析down_interruptible函数
/**
* down_interruptible - acquire the semaphore unless interrupted
* @sem: the semaphore to be acquired
*
* Attempts to acquire the semaphore. If no more tasks are allowed to
* acquire the semaphore, calling this function will put the task to sleep.
* If the sleep is interrupted by a signal, this function will return -EINTR.
* If the semaphore is successfully acquired, this function returns 0.
*/
int down_interruptible(struct semaphore *sem)
{
unsigned long flags;
int result = 0;
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(sem->count > 0))
sem->count--;
else
result = __down_interruptible(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);
return result;
}
EXPORT_SYMBOL(down_interruptible);
根据注释: 如果获取不到信号量,讲导致调用进程睡眠。 如果睡眠被中断信号唤醒,返回-EINTR, 如果获取成功,返回0
raw_spin_lock_irqsave(&sem->lock, flags);
这句操作主要是保证对sem->count的操作是原子操作,同时也许要关闭中断。如果不关闭中断,当中断触发后,也有可能会操作到sem->lock会形成死锁,所以需要关闭中断。
if (likely(sem->count > 0))
sem->count--;
如果sem->count大于0, 说明当前进程可以获取信号量,然后将sem->count减1
result = __down_interruptible(sem);
如果sem->count小于0,说明当前进程无法获取信号量,此时需要调用__down_interruptible函数,完成一个进程无法获取信号量的操作。
static noinline int __sched __down_interruptible(struct semaphore *sem)
{
return __down_common(sem, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
此函数传递的参数state是TASK_INTERRUPTIBLE, 代表当前进程是可以中断的。
/*
* Because this function is inlined, the 'state' parameter will be
* constant, and thus optimised away by the compiler. Likewise the
* 'timeout' parameter for the cases without timeouts.
*/
static inline int __sched __down_common(struct semaphore *sem, long state,
long timeout)
{
struct task_struct *task = current;
struct semaphore_waiter waiter;
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = task;
waiter.up = false;
for (;;) {
if (signal_pending_state(state, task))
goto interrupted;
if (unlikely(timeout <= 0))
goto timed_out;
__set_task_state(task, state);
raw_spin_unlock_irq(&sem->lock);
timeout = schedule_timeout(timeout);
raw_spin_lock_irq(&sem->lock);
if (waiter.up)
return 0;
}
timed_out:
list_del(&waiter.list);
return -ETIME;
interrupted:
list_del(&waiter.list);
return -EINTR;
}
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = task;
waiter.up = false;
将当前进程放入到wait_list链表中,同时waiter.up成员设置为0,代表的意思是: 该信号量没有被执行UP操作。
if (signal_pending_state(state, task))
goto interrupted;
if (unlikely(timeout <= 0))
goto timed_out;
如果该进程被信号唤醒,同时该进程的状态是: TASK_INTERRUPTIBLE | TASK_WAKEKILL, 则返回。
如果超时,也返回相应的返回码。
__set_task_state(task, state);
这是当前进程的状态为可中断状态:TASK_INTERRUPTIBLE。
raw_spin_unlock_irq(&sem->lock);
timeout = schedule_timeout(timeout);
在被调度之前先释放自旋锁, 然后使用schedule_timeout使当前进程进入睡眠状态,直到再次被调度执行。当该进程再一次被调度执行时,需要根据被再次调度的原因进行处理:
如果是witer.up不为0, 说明睡眠的该进程被信号量执行了UP操作所唤醒,进程可以获取信号量,返回0.
如果是被其他信号唤醒,或者超时引起的唤醒,则返回相应的错误码。
相对于众多的DOWN操作,UP操作只有一个:
/**
* up - release the semaphore
* @sem: the semaphore to release
*
* Release the semaphore. Unlike mutexes, up() may be called from any
* context and even by tasks which have never called down().
*/
void up(struct semaphore *sem)
{
unsigned long flags;
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(list_empty(&sem->wait_list)))
sem->count++;
else
__up(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
EXPORT_SYMBOL(up);
raw_spin_lock_irqsave(&sem->lock, flags);
此处获取自旋锁依然是保证sem->lock操作的原子性。
if (likely(list_empty(&sem->wait_list)))
sem->count++;
else
__up(sem);
如果sem->wait_list为空,则表明没有进程正在等待信号量,那么只需要将sem->count加1即可。 如果sem->wait_list不为空,说明有进程正在睡眠在wait_list上等待信号量,此时需要调用__up函数来唤醒进程。
static noinline void __sched __up(struct semaphore *sem)
{
struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
struct semaphore_waiter, list);
list_del(&waiter->list);
waiter->up = true;
wake_up_process(waiter->task);
}
struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,struct semaphore_waiter, list);
list_del(&waiter->list);
waiter->up = true;
首先从wait_list链表上的第一个wait节点,然后将其从wait_list上删除,同时wait->up设置为1, 表明该进程是通过UP操作唤醒的,而不是通过其他方式。
wake_up_process(waiter->task);
调用wake_up_process唤醒该进程。
当信号量的count=1的时候,这种情况下就可以实现互斥机制。为此内核专门提供了一个宏,专门用于这种用途的信号量定义和初始化
#define DEFINE_SEMAPHORE(name) \
struct semaphore name = __SEMAPHORE_INITIALIZER(name, 1)