前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >手握源码,深入分析Linux互斥体

手握源码,深入分析Linux互斥体

作者头像
董哥聊技术
发布2023-10-25 17:08:06
3430
发布2023-10-25 17:08:06
举报
文章被收录于专栏:嵌入式艺术嵌入式艺术

【深入理解Linux内核锁】七、互斥体

尽管信号量已经可以实现互斥的功能,但是“正宗”的mutexLinux内核中还是真实地存在着。尤其是在Linux内核代码中,更多能看到mutex的身影。

1、互斥体API

代码语言:javascript
复制
struct mutex my_mutex;  // 定义互斥体
mutex_init(&my_mutex);  // 初始化互斥体

/* 获取互斥体 */
void mutex_lock(struct mutex *lock);
int mutex_lock_interruptible(struct mutex *lock);
int mutex_trylock(struct mutex *lock);

void mutex_unlock(struct mutex *lock); // 释放互斥体

2、API实现

2.1 mutex

代码语言:javascript
复制
/*
 * Simple, straightforward mutexes with strict semantics:
 *
 * - only one task can hold the mutex at a time
 * - only the owner can unlock the mutex
 * - multiple unlocks are not permitted
 * - recursive locking is not permitted
 * - a mutex object must be initialized via the API
 * - a mutex object must not be initialized via memset or copying
 * - task may not exit with mutex held
 * - memory areas where held locks reside must not be freed
 * - held mutexes must not be reinitialized
 * - mutexes may not be used in hardware or software interrupt
 *   contexts such as tasklets and timers
 *
 * These semantics are fully enforced when DEBUG_MUTEXES is
 * enabled. Furthermore, besides enforcing the above rules, the mutex
 * debugging code also implements a number of additional features
 * that make lock debugging easier and faster:
 *
 * - uses symbolic names of mutexes, whenever they are printed in debug output
 * - point-of-acquire tracking, symbolic lookup of function names
 * - list of all locks held in the system, printout of them
 * - owner tracking
 * - detects self-recursing locks and prints out all relevant info
 * - detects multi-task circular deadlocks and prints out all affected
 *   locks and tasks (and only those tasks)
 */
struct mutex {
 atomic_long_t  owner;
 spinlock_t  wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
 struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
 struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
 void   *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
 struct lockdep_map dep_map;
#endif
};

结构体名称mutex

文件位置include/linux/mutex.h

主要作用:互斥锁结构体,用于定义一个互斥锁

  • atomic_long_t owner:原子变量,表示互斥锁当前的持有者,可以安全地被多个线程同时访问,而不会导致数据破坏。
  • spinlock_t wait_lock:一个自旋锁,用于保护涉及 wait_list 的临界区
  • struct list_head wait_list:一个链表头,用于维护等待互斥锁释放的线程列表。

2.2 mutex_init

代码语言:javascript
复制
/**
 * mutex_init - initialize the mutex
 * @mutex: the mutex to be initialized
 *
 * Initialize the mutex to unlocked state.
 *
 * It is not allowed to initialize an already locked mutex.
 */
#define mutex_init(mutex)      \
do {         \
 static struct lock_class_key __key;    \
         \
 __mutex_init((mutex), #mutex, &__key);    \
} while (0)

void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
 atomic_long_set(&lock->owner, 0);
 spin_lock_init(&lock->wait_lock);
 INIT_LIST_HEAD(&lock->wait_list);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
 osq_lock_init(&lock->osq);
#endif

 debug_mutex_init(lock, name, key);
}
EXPORT_SYMBOL(__mutex_init);

结构体名称mutex_init

文件位置include/linux/mutex.h

主要作用:初始化互斥体

相关实现

  1. mutex_init调用__mutex_init接口,实现互斥锁的初始化
  2. atomic_long_set:初始化原子变量,设置值为0
  3. spin_lock_init:初始化自旋锁,设置其为UNLOCK
  4. INIT_LIST_HEAD:初始化链表
  5. debug_mutex_init:初始化调试相关信息

2.3 mutex_lock

代码语言:javascript
复制
/**
 * mutex_lock - acquire the mutex
 * @lock: the mutex to be acquired
 *
 * Lock the mutex exclusively for this task. If the mutex is not
 * available right now, it will sleep until it can get it.
 *
 * The mutex must later on be released by the same task that
 * acquired it. Recursive locking is not allowed. The task
 * may not exit without first unlocking the mutex. Also, kernel
 * memory where the mutex resides must not be freed with
 * the mutex still locked. The mutex must first be initialized
 * (or statically defined) before it can be locked. memset()-ing
 * the mutex to 0 is not allowed.
 *
 * (The CONFIG_DEBUG_MUTEXES .config option turns on debugging
 * checks that will enforce the restrictions and will also do
 * deadlock debugging)
 *
 * This function is similar to (but not equivalent to) down().
 */
void __sched mutex_lock(struct mutex *lock)
{
 might_sleep();

 if (!__mutex_trylock_fast(lock))
  __mutex_lock_slowpath(lock);
}
EXPORT_SYMBOL(mutex_lock);
#endif

# define might_sleep() do { might_resched(); } while (0)

/*
 * Optimistic trylock that only works in the uncontended case. Make sure to
 * follow with a __mutex_trylock() before failing.
 */
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
 unsigned long curr = (unsigned long)current;
 unsigned long zero = 0UL;

 if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))
  return true;

 return false;
}


static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{
 __mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}


static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
      struct lockdep_map *nest_lock, unsigned long ip)
{
 return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}


/*
 * Lock a mutex (possibly interruptible), slowpath:
 */
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
      struct lockdep_map *nest_lock, unsigned long ip,
      struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
 struct mutex_waiter waiter;
 bool first = false;
 struct ww_mutex *ww;
 int ret;

 might_sleep();

 ww = container_of(lock, struct ww_mutex, base);
 if (use_ww_ctx && ww_ctx) {
  if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
   return -EALREADY;

  /*
   * Reset the wounded flag after a kill. No other process can
   * race and wound us here since they can't have a valid owner
   * pointer if we don't have any locks held.
   */
  if (ww_ctx->acquired == 0)
   ww_ctx->wounded = 0;
 }

 preempt_disable();
 mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

 if (__mutex_trylock(lock) ||
     mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
  /* got the lock, yay! */
  lock_acquired(&lock->dep_map, ip);
  if (use_ww_ctx && ww_ctx)
   ww_mutex_set_context_fastpath(ww, ww_ctx);
  preempt_enable();
  return 0;
 }

 spin_lock(&lock->wait_lock);
 /*
  * After waiting to acquire the wait_lock, try again.
  */
 if (__mutex_trylock(lock)) {
  if (use_ww_ctx && ww_ctx)
   __ww_mutex_check_waiters(lock, ww_ctx);

  goto skip_wait;
 }

 debug_mutex_lock_common(lock, &waiter);

 lock_contended(&lock->dep_map, ip);

 if (!use_ww_ctx) {
  /* add waiting tasks to the end of the waitqueue (FIFO): */
  __mutex_add_waiter(lock, &waiter, &lock->wait_list);


#ifdef CONFIG_DEBUG_MUTEXES
  waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
 } else {
  /*
   * Add in stamp order, waking up waiters that must kill
   * themselves.
   */
  ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
  if (ret)
   goto err_early_kill;

  waiter.ww_ctx = ww_ctx;
 }

 waiter.task = current;

 set_current_state(state);
 for (;;) {
  /*
   * Once we hold wait_lock, we're serialized against
   * mutex_unlock() handing the lock off to us, do a trylock
   * before testing the error conditions to make sure we pick up
   * the handoff.
   */
  if (__mutex_trylock(lock))
   goto acquired;

  /*
   * Check for signals and kill conditions while holding
   * wait_lock. This ensures the lock cancellation is ordered
   * against mutex_unlock() and wake-ups do not go missing.
   */
  if (unlikely(signal_pending_state(state, current))) {
   ret = -EINTR;
   goto err;
  }

  if (use_ww_ctx && ww_ctx) {
   ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
   if (ret)
    goto err;
  }

  spin_unlock(&lock->wait_lock);
  schedule_preempt_disabled();

  /*
   * ww_mutex needs to always recheck its position since its waiter
   * list is not FIFO ordered.
   */
  if ((use_ww_ctx && ww_ctx) || !first) {
   first = __mutex_waiter_is_first(lock, &waiter);
   if (first)
    __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
  }

  set_current_state(state);
  /*
   * Here we order against unlock; we must either see it change
   * state back to RUNNING and fall through the next schedule(),
   * or we must see its unlock and acquire.
   */
  if (__mutex_trylock(lock) ||
      (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
   break;

  spin_lock(&lock->wait_lock);
 }
 spin_lock(&lock->wait_lock);
acquired:
 __set_current_state(TASK_RUNNING);

 if (use_ww_ctx && ww_ctx) {
  /*
   * Wound-Wait; we stole the lock (!first_waiter), check the
   * waiters as anyone might want to wound us.
   */
  if (!ww_ctx->is_wait_die &&
      !__mutex_waiter_is_first(lock, &waiter))
   __ww_mutex_check_waiters(lock, ww_ctx);
 }

 mutex_remove_waiter(lock, &waiter, current);
 if (likely(list_empty(&lock->wait_list)))
  __mutex_clear_flag(lock, MUTEX_FLAGS);

 debug_mutex_free_waiter(&waiter);

skip_wait:
 /* got the lock - cleanup and rejoice! */
 lock_acquired(&lock->dep_map, ip);

 if (use_ww_ctx && ww_ctx)
  ww_mutex_lock_acquired(ww, ww_ctx);

 spin_unlock(&lock->wait_lock);
 preempt_enable();
 return 0;

err:
 __set_current_state(TASK_RUNNING);
 mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
 spin_unlock(&lock->wait_lock);
 debug_mutex_free_waiter(&waiter);
 mutex_release(&lock->dep_map, 1, ip);
 preempt_enable();
 return ret;
}

结构体名称mutex_lock

文件位置kernel/locking/mutex.c

主要作用:用于获取一个互斥锁

调用流程

代码语言:javascript
复制
mutex_lock(kernel/locking/mutex.c)
    |--> might_sleep
    |--> __mutex_trylock_fast
        |--> atomic_long_try_cmpxchg_acquire        //  尝试获取互斥锁
    |--> __mutex_lock_slowpath
        |--> __mutex_lock
            |--> __mutex_lock_common                //  获取锁并等待

相关实现

might_sleep:用于标记可能在非原子上下文(允许睡眠)中执行的操作,以确保不会在不允许睡眠的上下文中调用这段代码。

__mutex_trylock_fast:尝试获取互斥锁(mutex),如果获取成功则范围true,获取失败,返回false

获取失败后,调用__mutex_lock_slowpath接口,最终调用__mutex_lock_common接口,该接口才是重头戏

__mutex_lock_common接口主要有几个作用:死锁避免策略、

下面我们看第一部分:死锁避免策略

代码语言:javascript
复制
 struct ww_mutex *ww;
 int ret;

 might_sleep();

 ww = container_of(lock, struct ww_mutex, base);
 if (use_ww_ctx && ww_ctx) {
  if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
   return -EALREADY;

  /*
   * Reset the wounded flag after a kill. No other process can
   * race and wound us here since they can't have a valid owner
   * pointer if we don't have any locks held.
   */
  if (ww_ctx->acquired == 0)
   ww_ctx->wounded = 0;
 }
  • 这段代码片段是针对带有死锁避免机制(Wound-Wait,WW)的互斥锁的操作,是一种多线程环境中避免死锁的策略。让我们逐行分析这段代码的作用
  • struct ww_mutex *ww:声明了一个指向 struct ww_mutex 类型的指针 ww,这是一种带有死锁避免机制的互斥锁的数据结构。
  • might_sleep:用于标记可能在非原子上下文(允许睡眠)中执行的操作,以确保不会在不允许睡眠的上下文中调用这段代码。
  • ww = container_of(lock, struct ww_mutex, base):使用 container_of 宏将传递进来的 lock 指针转换为 struct ww_mutex 结构体的指针。这意味着 lock 指向的是 struct ww_mutex 结构体中的 base 成员。这样可以将更高级别的数据结构与较低级别的数据结构关联起来。
  • if (use_ww_ctx && ww_ctx):如果 use_ww_ctx 为真且 ww_ctx 不为空(即使用死锁避免上下文并且上下文存在)。
  • if (unlikely(ww_ctx == READ_ONCE(ww->ctx))):检查是否已经在同一个上下文中尝试获取锁。如果是,则返回错误码 -EALREADY,表示已经在当前上下文中获取了锁。(避免死锁)
  • if (ww_ctx->acquired == 0):检查是否当前上下文已经没有持有任何锁。如果是,则将 ww_ctxwounded 标志重置为0。

下面我们看第二部分:获取锁阶段

代码语言:javascript
复制
 preempt_disable();
 mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

 if (__mutex_trylock(lock) ||
     mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
  /* got the lock, yay! */
  lock_acquired(&lock->dep_map, ip);
  if (use_ww_ctx && ww_ctx)
   ww_mutex_set_context_fastpath(ww, ww_ctx);
  preempt_enable();
  return 0;
 }
  • preempt_disable:禁用抢占,确保在获取锁期间不会被抢占。
  • 通过调用__mutex_trylockmutex_optimistic_spin多次尝试获取锁,如果获取锁成功,说明没有其他进程占用,直接返回,否则接着往下执行
  • ww_mutex_set_context_fastpath(ww, ww_ctx):如果使用了"wait-wound"上下文,这个函数会将上下文设置为快速路径,以便在解锁时进行优化。
  • preempt_enable():重新启用抢占,允许系统在后续操作中进行抢占。

下面我们看第三部分:将等待进程置入睡眠状态

代码语言:javascript
复制
 spin_lock(&lock->wait_lock);
 /*
  * After waiting to acquire the wait_lock, try again.
  */
 if (__mutex_trylock(lock)) {
  if (use_ww_ctx && ww_ctx)
   __ww_mutex_check_waiters(lock, ww_ctx);

  goto skip_wait;
 }

 debug_mutex_lock_common(lock, &waiter);

 lock_contended(&lock->dep_map, ip);

 if (!use_ww_ctx) {
  /* add waiting tasks to the end of the waitqueue (FIFO): */
  __mutex_add_waiter(lock, &waiter, &lock->wait_list);


#ifdef CONFIG_DEBUG_MUTEXES
  waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
 } else {
  /*
   * Add in stamp order, waking up waiters that must kill
   * themselves.
   */
  ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
  if (ret)
   goto err_early_kill;

  waiter.ww_ctx = ww_ctx;
 }

 waiter.task = current;

 set_current_state(state);
  • 如果不使用"wait-wound"上下文,会将等待任务添加到等待队列的末尾(FIFO顺序)。
  • 如果使用"wait-wound"上下文,会按照时间戳的顺序将等待任务添加到等待队列,并可能唤醒需要终止的等待者。
  • spin_lock(&lock->wait_lock):获取wait_lock自旋锁,这是为了在等待期间操作等待队列时防止竞争。
  • if (__mutex_trylock(lock)):再次尝试获取锁,如果成功获取,说明此时没有竞争,会跳到acquired标签。
  • 根据是否使用"wait-wound"上下文进行不同的等待队列操作:
  • set_current_state(state):设置等待任务的上下文和当前状态

下面我们看第四部分:等待状态处理

代码语言:javascript
复制
 for (;;) {
  /*
   * Once we hold wait_lock, we're serialized against
   * mutex_unlock() handing the lock off to us, do a trylock
   * before testing the error conditions to make sure we pick up
   * the handoff.
   */
  if (__mutex_trylock(lock))
   goto acquired;

  /*
   * Check for signals and kill conditions while holding
   * wait_lock. This ensures the lock cancellation is ordered
   * against mutex_unlock() and wake-ups do not go missing.
   */
  if (unlikely(signal_pending_state(state, current))) {
   ret = -EINTR;
   goto err;
  }

  if (use_ww_ctx && ww_ctx) {
   ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
   if (ret)
    goto err;
  }

  spin_unlock(&lock->wait_lock);
  schedule_preempt_disabled();

  /*
   * ww_mutex needs to always recheck its position since its waiter
   * list is not FIFO ordered.
   */
  if ((use_ww_ctx && ww_ctx) || !first) {
   first = __mutex_waiter_is_first(lock, &waiter);
   if (first)
    __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
  }

  set_current_state(state);
  /*
   * Here we order against unlock; we must either see it change
   * state back to RUNNING and fall through the next schedule(),
   * or we must see its unlock and acquire.
   */
  if (__mutex_trylock(lock) ||
      (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
   break;

  spin_lock(&lock->wait_lock);
 }
 spin_lock(&lock->wait_lock);
acquired:
 __set_current_state(TASK_RUNNING);

 if (use_ww_ctx && ww_ctx) {
  /*
   * Wound-Wait; we stole the lock (!first_waiter), check the
   * waiters as anyone might want to wound us.
   */
  if (!ww_ctx->is_wait_die &&
      !__mutex_waiter_is_first(lock, &waiter))
   __ww_mutex_check_waiters(lock, ww_ctx);
 }

 mutex_remove_waiter(lock, &waiter, current);
 if (likely(list_empty(&lock->wait_list)))
  __mutex_clear_flag(lock, MUTEX_FLAGS);

 debug_mutex_free_waiter(&waiter);

skip_wait:
 /* got the lock - cleanup and rejoice! */
 lock_acquired(&lock->dep_map, ip);

 if (use_ww_ctx && ww_ctx)
  ww_mutex_lock_acquired(ww, ww_ctx);

 spin_unlock(&lock->wait_lock);
 preempt_enable();
 return 0;

err:
 __set_current_state(TASK_RUNNING);
 mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
 spin_unlock(&lock->wait_lock);
 debug_mutex_free_waiter(&waiter);
 mutex_release(&lock->dep_map, 1, ip);
 preempt_enable();
 return ret;

这部分代码是在等待期间的处理:

如果获取到锁,跳转到acquired标签。

  • __set_current_state(TASK_RUNNING):设置当前线程的状态为TASK_RUNNING
  • 如果使用"wait-wound"上下文,检查是否需要继续检查等待者。
  • mutex_remove_waiter:从等待队列中移除等待任务。
  • list_empty:判断是否等待队列为空,如果是,则清除锁的标志。
  • 实时调用__mutex_trylock判断是否获取到锁,如果成功获取,跳转到acquired标签。
  • signal_pending_state:检查是否有信号到达,如果有,返回错误码-EINTR
  • 如果使用"wait-wound"上下文,检查是否需要终止等待者。
  • spin_unlock(&lock->wait_lock):解锁wait_lock,禁用抢占并进入调度等待状态。
  • schedule_preempt_disabled:进入调度等待状态
  • set_current_state:设置当前线程的状态为等待状态。
  • __mutex_trylockmutex_optimistic_spin:尝试获取锁,或者如果是第一个等待任务且可以进行自旋优化,则尝试自旋等待。
  • spin_lock(&lock->wait_lock):获取锁,继续自旋等待

这段代码是内核中用于互斥锁获取的一个复杂实现,其中包含了自旋、等待队列、信号处理和死锁避免等多个关键概念和操作,它旨在在高并发的多线程环境中提供高性能的互斥锁实现。

2.4 mutex_unlock

代码语言:javascript
复制
/**
 * mutex_unlock - release the mutex
 * @lock: the mutex to be released
 *
 * Unlock a mutex that has been locked by this task previously.
 *
 * This function must not be used in interrupt context. Unlocking
 * of a not locked mutex is not allowed.
 *
 * This function is similar to (but not equivalent to) up().
 */
void __sched mutex_unlock(struct mutex *lock)
{
#ifndef CONFIG_DEBUG_LOCK_ALLOC
 if (__mutex_unlock_fast(lock))
  return;
#endif
 __mutex_unlock_slowpath(lock, _RET_IP_);
}
EXPORT_SYMBOL(mutex_unlock);

/*
 * Release the lock, slowpath:
 */
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
 struct task_struct *next = NULL;
 DEFINE_WAKE_Q(wake_q);
 unsigned long owner;

 mutex_release(&lock->dep_map, 1, ip);

 /*
  * Release the lock before (potentially) taking the spinlock such that
  * other contenders can get on with things ASAP.
  *
  * Except when HANDOFF, in that case we must not clear the owner field,
  * but instead set it to the top waiter.
  */
 owner = atomic_long_read(&lock->owner);
 for (;;) {
  unsigned long old;

#ifdef CONFIG_DEBUG_MUTEXES
  DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
  DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif

  if (owner & MUTEX_FLAG_HANDOFF)
   break;

  old = atomic_long_cmpxchg_release(&lock->owner, owner,
        __owner_flags(owner));
  if (old == owner) {
   if (owner & MUTEX_FLAG_WAITERS)
    break;

   return;
  }

  owner = old;
 }

 spin_lock(&lock->wait_lock);
 debug_mutex_unlock(lock);
 if (!list_empty(&lock->wait_list)) {
  /* get the first entry from the wait-list: */
  struct mutex_waiter *waiter =
   list_first_entry(&lock->wait_list,
      struct mutex_waiter, list);

  next = waiter->task;

  debug_mutex_wake_waiter(lock, waiter);
  wake_q_add(&wake_q, next);
 }

 if (owner & MUTEX_FLAG_HANDOFF)
  __mutex_handoff(lock, next);

 spin_unlock(&lock->wait_lock);

 wake_up_q(&wake_q);
}

结构体名称mutex_unlock

文件位置kernel/locking/mutex.c

主要作用:用于释放一个互斥锁,并唤醒等待这个锁的任务

调用流程

代码语言:javascript
复制
mutex_unlock(kernel/locking/mutex.c)
    |--> __mutex_unlock_slowpath
        |--> DEFINE_WAKE_Q          //  定义唤醒队列
        |--> atomic_long_read       //  获取原子变量,即获取当前锁的持有者
        |--> atomic_long_cmpxchg_release    //  判断ock->owner和owner是否相等,如果有,则跳出循环,没有则直接返回
        |--> spin_lock              //  处理等待队列
        |--> list_empty             //  判断等待队列是否为空
        |--> list_first_entry       //  获取等待队列的第一个任务
        |--> wake_q_add             //  加入到唤醒队列    
        |--> spin_unlock

实现流程

mutex_unlock主要有两个步骤,其一是释放互斥锁,其二是唤醒等待队列的任务,核心实现在__mutex_unlock_slowpath

__mutex_unlock_slowpath中主要负责实现这两个工作,第一个释放锁部分如下:

代码语言:javascript
复制
/*
  * Release the lock before (potentially) taking the spinlock such that
  * other contenders can get on with things ASAP.
  *
  * Except when HANDOFF, in that case we must not clear the owner field,
  * but instead set it to the top waiter.
  */
 owner = atomic_long_read(&lock->owner);
 for (;;) {
  unsigned long old;

#ifdef CONFIG_DEBUG_MUTEXES
  DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
  DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif

  if (owner & MUTEX_FLAG_HANDOFF)
   break;

  old = atomic_long_cmpxchg_release(&lock->owner, owner,
        __owner_flags(owner));
  if (old == owner) {
   if (owner & MUTEX_FLAG_WAITERS)
    break;

   return;
  }

  owner = old;
 }
  • 调用atomic_long_read来获取当前锁的拥有者
  • 然后调用atomic_long_cmpxchg_release来比较当前锁的拥有者是否与之前获取的相同,以此来判断是否有其他线程操作了锁
  • old == owner:如果相同,则表示没有其他线程获取锁;如果有,则将owner = old旧的锁的持有者赋值给当前继续判断。
  • 进而判断是否有等待者if (owner & MUTEX_FLAG_WAITERS),如果有,就退出循环,执行下面的唤醒等待者的任务,如果没有就直接return

__mutex_unlock_slowpath中第二个唤醒等待线程部分如下:

代码语言:javascript
复制
 spin_lock(&lock->wait_lock);
 debug_mutex_unlock(lock);
 if (!list_empty(&lock->wait_list)) {
  /* get the first entry from the wait-list: */
  struct mutex_waiter *waiter =
   list_first_entry(&lock->wait_list,
      struct mutex_waiter, list);

  next = waiter->task;

  debug_mutex_wake_waiter(lock, waiter);
  wake_q_add(&wake_q, next);
 }

 if (owner & MUTEX_FLAG_HANDOFF)
  __mutex_handoff(lock, next);

 spin_unlock(&lock->wait_lock);

 wake_up_q(&wake_q);
  • 开始调用spin_lockspin_unlock来避免线程的并发,保护临界资源
  • if (!list_empty(&lock->wait_list)):判断等待队列是否为空
  • list_first_entry:获取等待队列的第一个任务
  • wake_q_add:将该任务加入到唤醒队列中
  • if (owner & MUTEX_FLAG_HANDOFF):判断当前锁是否被标记为HANDOFF。如果是,它会调用__mutex_handoff函数,将锁的所有权交给等待者。
  • wake_up_q:唤醒等待队列

互斥锁的操作确实比较复杂,其底层牵涉到了原子操作,自旋锁,唤醒队列等操作

3、自旋锁与互斥体区别

实现上的区别

  • 自旋锁:当锁不能获取到时,一直在原地自旋等待,直到锁变为可用,自旋锁等待期间,不会让线程处于睡眠状态,而是一直忙等。
  • 互斥锁:当锁不能获取到时,将该线程直接置入睡眠状态,直到互斥体可用被唤醒。

时间开销

  • 自旋锁:在等待锁的过程中,自旋锁所浪费的时间为等待获取锁的时间+执行临界区的时间
  • 互斥锁:其时间开销为等待锁的时间+进程上下文切换的时间+执行临界区的时间

总结:如果临界区很小,锁的持有时间非常短,那么使用自旋锁可能更为高效。反之,如果临界区大,锁的持有时间较长,或者涉及到I/O操作等可能导致线程睡眠的操作,那么使用互斥体可能更为合适。

嵌入式艺术

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-10-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 嵌入式艺术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 【深入理解Linux内核锁】七、互斥体
    • 1、互斥体API
      • 2、API实现
        • 2.1 mutex
        • 2.2 mutex_init
        • 2.3 mutex_lock
        • 2.4 mutex_unlock
      • 3、自旋锁与互斥体区别
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档