前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >互斥锁Mutex实现

互斥锁Mutex实现

作者头像
数据小冰
发布2022-08-15 14:53:41
1.5K0
发布2022-08-15 14:53:41
举报
文章被收录于专栏:数据小冰
mutex是什么

Mutex即我们常说的互斥锁,也称为排他锁。使用互斥锁,可以限定临界区只能同时有一个goroutine持有。当临界区已经有一个goroutine持有的时候,其他goroutine如果想进入此临界区,会等待或者返回失败。直到持有的goroutine退出临界区,等待goroutine中的某一个才有机会进入临界区执行代码。

对于如下代码,同时开启两个goroutine执行c.add操作,假设运行运行的机器是多核的,那这两个goroutine很可能是并行执行的,但是在执行c.count++时是串行的,只有获取到锁的goroutine才会执行,另一个会等待。

代码语言:javascript
复制
package main

import (
 "fmt"
 "sync"
)

type counter struct {
 count int
 sync.Mutex
}

func (c *counter) add() {
 c.Lock()
 defer c.Unlock()

 c.count++
}

func (c *counter) value() int {
 c.Lock()
 defer c.Unlock()

 return c.count
}

func main() {
 var wg sync.WaitGroup
 var c counter

 wg.Add(2)

 // goroutine 1
 go func() {
  defer wg.Done()

  for i := 0; i < 5000; i++ {
   c.add()
  }
 }()

 // goroutine 2
 go func() {
  defer wg.Done()

  for i := 0; i < 5000; i++ {
   c.add()
  }
 }()

 wg.Wait()

 fmt.Println(c.value())
}
mutex数据结构

Mutex结构定义如下,它由state和sema两个字段组成,state表示当前互斥锁的状态,sema是用来控制锁状态的信号量。

代码语言:javascript
复制
type Mutex struct {
 state int32
 sema  uint32
}

const (
 // state的第一个bit位,表示是否加锁
 mutexLocked = 1 << iota // mutex is locked
 // state的第二个bit位,表示是否被唤醒
 mutexWoken
 // state的第三个bit位,表示是否处于饥饿模式
 mutexStarving
 // state的[4,32]bit位,表示等待锁的goroutine的数量
 mutexWaiterShift = iota
 // 饥饿时间1毫秒
 starvationThresholdNs = 1e6
)

state是一个int32整数,它的最低3个bit分别表示是否已经加锁、是否唤醒、是否处于饥饿状态,剩余bit位的数值表示有多少个goroutine在等待,如下图所示。

mutex实现原理

为了保证锁的公平性,mutex有两种模式:正常模式和饥饿模式。正常模式下所有等待锁的goroutine按照队列的先进先出顺序等待。被唤醒的goroutine并不是直接拥有锁,而是与新请求锁的goroutine竞争。为啥要让新请求锁的goroutine也来竞争锁,而不是直接放到队列尾部呢?因为这看起来很公平,但从性能角度上来看,并不最优的。如果我们把锁交给正在占用CPU时间片的goroutine,这样就不需要做上下文切换,在高并发的情况下可能有更好的性能。

将唤醒的goroutine和新来请求锁的goroutine竞争锁,很可能导致唤醒的goroutine在竞争锁的时候失败,即新来的goroutine抢到了锁。唤醒的goroutine又被重新加入到队列的队头。那如果极端情况下会出现唤醒的goroutine一致抢不到锁,所以为了处理这种情况,设置了饥饿模式,如果唤醒goroutine等待超过1毫秒没有获取到锁,将会进入饥饿模式。

饥饿模式将锁的所有权直接从释放锁的goroutine移交给等待队列中的第一个goroutine。新来请求锁的goroutine不会尝试获取锁,即使锁看起来处于解锁状态,也不会进行自旋,而是直接放到队尾。

什么时候又从饥饿模式切换到正常模式呢?如果一个等待中的goroutine获得了锁并且满足下面两个条件之一,会从饥饿模式切换到正常模式。

  1. 当前的goroutine是队列中最后一个goroutine
  2. 当前的goroutine等待时间小于1ms

下面结合源码(Go1.14版本)看Mutex的实现细节。先来看加锁处理逻辑,实现如下。处理两个步骤:

  1. 如果是空锁状态,即当前还没有任何goroutine进行加锁,则直接cas将state设置为加锁状态,
  2. 否则走lockSlow逻辑。

步骤1也就是happy path,处理简单的情景,复杂的逻辑封装到一个单独的含失踪,对应到这里的步骤2. 这样步骤1可以内联,Go源码中很多地方都用到了这种处理方式,值得我们学习使用。

代码语言:javascript
复制
func (m *Mutex) Lock() {
 // happy path, 还没有人获取锁,即只有当前程序在获取锁,可以直接获取到
 // 通过CAS操作设置m.state值,从0修改为1
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  if race.Enabled {
   race.Acquire(unsafe.Pointer(m))
  }
  return
 }
 m.lockSlow()
}

lockSlow处理逻辑很复杂,有太多的状态需要判断。整个核心处理就是一个for{}大循环,内部有很多处理细节。这里我们for循环中的逻辑分为3个逻辑块,方便理解,这三个逻辑块分别是自旋处理、计算目标状态值和设置目标状态。另外我们关注循环出口地方只有两处,其他地方都会继续执行循环。

自旋处理

自旋就是忙等待空转,让线程在某段时间内一直保持执行,从而避免线程上下文切换带来的开销。所以对于线程阻塞很短时间的场景是非常合适的。循环开始首先检查是否可以进行自旋的条件。条件1:锁已经被锁定 条件2:锁不处于饥饿模式,即处于正常模式 条件3:满足自旋条件runtime_canSpin

只有上述条件都满足才会开始自旋,自旋处理在runtime_doSpin

代码语言:javascript
复制
func (m *Mutex) lockSlow() {
 // 记录goroutine等待的时间
 var waitStartTime int64
 // 标记当前的goroutine是否处于饥饿状态
 starving := false
 // goroutine是否被唤醒
 awoke := false
 // 自旋的次数
 iter := 0
 old := m.state
 for {
  // 满足这里的3个条件进行自旋:
  // 1. 互斥锁已经被锁定,即有goroutine正在占用锁
  // 2. 互斥锁当前不处于饥饿模式
  // 3. 满足尝试自旋的条件runtime_canSpin
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   // 走到这里,说明当前的互斥锁处于正常模式,如果当前互斥锁还没有被唤醒,则标记为唤醒状态
   // 唤醒的goroutine就是当前的goroutine. 通过CAS操作,将互斥锁更新为唤醒状态
   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
   }
   // 进行自旋
   runtime_doSpin()
   // 自旋次数+1
   iter++
   // 再次获取锁的状态
   old = m.state
   continue
  }
   ...
 }

 if race.Enabled {
  race.Acquire(unsafe.Pointer(m))
 }
}

我们来看runtime_canSpin可以进行自旋的条件,该函数实现在runtime包中的proc.go文件,实现函数为下面的sync_runtime_canSpin函数。根据实现总结出以下情况会终止自旋:

  1. 已经自旋执行了多次,具体执行自旋超过4次会停止
  2. 单核CPU也不会自旋,在单核CPU下因为没有其他goroutine运行,持有锁的goroutine没有运行,当前抢锁的goroutine自旋也是不抢到的
  3. 没有其他正在运行的P
  4. 当前P的G队列为空 上述情况1避免长时间自旋浪费CPU的情况,情况2和3用来保证除了当前在运行的Goroutine之外,还有其他Goroutine在运行。情况4避免自旋锁等待的条件是由当前P的其他G来触发,这样会导致自旋变得没有意义,因为条件永远无法触发。
代码语言:javascript
复制
func sync_runtime_canSpin(i int) bool {
  // sync_runtime_canSpin函数中在以下四种情况返回false

  // 1. 已经执行了很多次
  // 2. 是单核CPU
  // 3. 没有其他正在运行的P
  // 4. 当前P的G队列为空
 // active_spin为4
 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
  return false
 }
 if p := getg().m.p.ptr(); !runqempty(p) {
  return false
 }
 return true
}

接着来看执行自旋的逻辑runtime_doSpin,runtime_doSpin实现也在runtime/proc.go文件,它调用了procyield函数,procyield是汇编实现的,处理逻辑是调用PAUSE指令30次,通过该指令消耗CPU时间。

代码语言:javascript
复制
func sync_runtime_doSpin() {
 // active_spin_cnt为30
 procyield(active_spin_cnt)
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ again
    RET
计算目标状态值

互斥锁经过特殊的自旋逻辑处理后,会根据上下文计算当前互斥量的最新状态new。根据不同的条件更新状态字段mutexlocked、mutexstarting、mutexwoken和mutexwaitershift中存储的不同信息。

代码语言:javascript
复制
func (m *Mutex) lockSlow() {
 ...
 for {
   ...
  // 走到这里有四种情况:
  // 1. 当前互斥锁处于正常模式,并且锁还没有被释放
  // 2. 当前互斥锁处于饥饿模式,并且锁还没有被释放
  // 3. 当前互斥锁处于正常模式,并且锁已经被释放
  // 4. 当前互斥锁处于饥饿模式,并且锁已经被释放
  new := old
 
  // 之前处于非饥饿状态,新状态进行加锁
  if old&mutexStarving == 0 {
   new |= mutexLocked
  }
  // 之前处于加锁状态或是饥饿状态,当前的goroutine需要等待
  // 之前处于加锁状态,则当前的goroutine还是不会获取到锁的,所以waiter+1
  // 之前是饥饿状态,则当前的goroutine不会设置新状态锁,因为锁被转移给队列中的第一个goroutine.
  if old&(mutexLocked|mutexStarving) != 0 {
   new += 1 << mutexWaiterShift
  }
  
  // 如果当前的goroutine正处于饥饿模式并且旧状态中锁还是处于锁定状态
  // 则将新状态标记为饥饿模式
  if starving && old&mutexLocked != 0 {
   new |= mutexStarving
  }
  // 如果当前的goroutine已经设置为唤醒状态,需要清除新状态中的唤醒标记,因为走到这里,
  // 当前的goroutine要么拿到锁,要么会进入休眠状态,反正不在是唤醒状态
  if awoke {
   if new&mutexWoken == 0 {
    throw("sync: inconsistent mutex state")
   }
   // 当前的goroutine已经唤醒了,重置清理掉new中的唤醒标志
   new &^= mutexWoken
  }
   ...
 }

 if race.Enabled {
  race.Acquire(unsafe.Pointer(m))
 }
}

设置目标状态

计算最新的目标值new后,通过CAS更新state的值。如果更新成功,则旧状态未加锁,且锁不处于饥饿状态,说明当前goroutine竞争成功,获得锁返回。这就是为什么goroutine在正常模式下竞争时更有可能获得锁的原因。如果当前goroutine竞争失败,则使用runtime_SemacquireMutex来保证资源不被两个goroutines获取。runtime_SemacquireMutex 实现在runtime包中的sema.go文件。它是semacquire1的简单封装,里面最后会调用goPark让当前goroutine让出执行权限,同时设置当前goroutine为睡眠状态,即不参与调度。休眠之后某个时刻锁被释放此goroutine被唤醒,计算它是否处处于饥饿状态,如果锁已经处于饥饿状态,抢到锁返回。

代码语言:javascript
复制
func (m *Mutex) lockSlow() {
    ...
 for {
  ...
  // 成功设置为新状态
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
   // 原来锁的状态已经释放并且不是处于饥饿状态,当前的goroutine请求到了锁的所有权直接返回
   if old&(mutexLocked|mutexStarving) == 0 {
    break // locked the mutex with CAS
   }
   
   // queueLifo表示当前的goroutine以前是否在队列里面,queueLifo为true表示以前就在队列里面
   queueLifo := waitStartTime != 0
   if waitStartTime == 0 {
    // 当前的goroutine以前不在队列里面,将当前的时间保存起来
    waitStartTime = runtime_nanotime()
   }
   // 休眠当前goroutine等待,如果是新的goroutine,它的queueLifo为false,会加入到
   // 等待队列的尾部 ;如果是唤醒的goroutine,它的queueLifo为true,会加入到等待队列的队头
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)
   // 休眠之后被唤醒,计算它是否处处于饥饿状态
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
   old = m.state
   // 如果锁已经处于饥饿状态,抢到锁返回
   if old&mutexStarving != 0 {
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
     throw("sync: inconsistent mutex state")
    }
    // delta为7,二级制为0111,此时old二进制最低3位为100,所以old-delta,
    // 相当于waiter减1并加锁
    delta := int32(mutexLocked - 1<<mutexWaiterShift)
    // 如果没有处于饥饿模式或者是最后一个waiter,需要将锁设置为正常模式
    if !starving || old>>mutexWaiterShift == 1 {
     delta -= mutexStarving
    }
    // 设置state为新状态,因为已经获得了锁,直接break返回
    atomic.AddInt32(&m.state, delta)
    break
   }
   awoke = true
   iter = 0
  } else {
   old = m.state
  }
 }

 if race.Enabled {
  race.Acquire(unsafe.Pointer(m))
 }
}

上面分析完了加锁的处理流程,现在我们一起来看解锁处理流程。Unlock会先检查最简单的情况,持有锁的只有1个goroutine,没有其他goroutine抢锁。这种情况直接修改state的值进行解锁。其他情况走unlockSlow逻辑。如果锁处于饥饿模式,直接唤醒等待队列中的goroutine.如果锁处于正常状态,如果没有waiter,或者已经有在处理的情况了,那么释放就好,不用做额外的处理。否则,waiter数减 1,并设置唤醒标志,通过CAS操作更新state的值,然后执行runtime_Semrelease唤醒一个goroutine返回。

代码语言:javascript
复制
func (m *Mutex) Unlock() {
 if race.Enabled {
  _ = m.state
  race.Release(unsafe.Pointer(m))
 }

 // 解锁
 new := atomic.AddInt32(&m.state, -mutexLocked)
 // 如果new为0,说明只有当前的执行体持有锁,不用做其他的处理
 // 如果new非0,需要执行其他操作,尝试唤醒等待者
 if new != 0 {
  m.unlockSlow(new)
 }
}

func (m *Mutex) unlockSlow(new int32) {
 if (new+mutexLocked)&mutexLocked == 0 {
  throw("sync: unlock of unlocked mutex")
 }
 // 没有处于饥饿模式,即处于正常模式
 if new&mutexStarving == 0 {
  old := new
  for {
   // 下面四个条件满足任一一个就直接返回,不做唤醒其他goroutine处理:
   // 1. 没有waiter了,所以也就没有唤醒的对象了
   // 2. 锁处于锁定状态,表明被其他goroutine抢到了
   // 3. 锁处于唤醒状态,表明有其他等待的goroutine被唤醒
   // 4. 锁处于饥饿模式下,将会解锁队列头的goroutine
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    return
   }
   // 将等待的waiter数减一,并设置为唤醒状态
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   // 原子操作设置m.state为新状态new
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // 唤醒一个waiter然后返回
    runtime_Semrelease(&m.sema, false, 1)
    return
   }
   // 执行CAS操作失败会走到这里,操作失败的原因是锁的状态已经被别的goroutine改变了
   // 这里更新状态后执行下一个循环
   old = m.state
  }
 } else {
  // 处于饥饿状态,直接唤醒队列头部的goroutine
  runtime_Semrelease(&m.sema, true, 1)
 }
}
总结

Mutex实现使用了CAS+自旋操作+信号量技术,通过正常模式和饥饿模式兼顾公平和性能。在正常模式下,主打性能,被唤醒的goroutine并不是直接拥有锁,而是与新请求锁的goroutine竞争。把锁交给正在占用CPU时间片的goroutine,这样就不需要做上下文切换。在饥饿模式下,保证等待时间最久的goroutine在锁被释放时优先执行,保证goroutine不会因等锁而饿死。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据小冰 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • mutex是什么
  • mutex数据结构
  • mutex实现原理
    • 自旋处理
      • 计算目标状态值
        • 设置目标状态
        • 总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档