Mutex即我们常说的互斥锁,也称为排他锁。使用互斥锁,可以限定临界区只能同时有一个goroutine持有。当临界区已经有一个goroutine持有的时候,其他goroutine如果想进入此临界区,会等待或者返回失败。直到持有的goroutine退出临界区,等待goroutine中的某一个才有机会进入临界区执行代码。
对于如下代码,同时开启两个goroutine执行c.add操作,假设运行运行的机器是多核的,那这两个goroutine很可能是并行执行的,但是在执行c.count++时是串行的,只有获取到锁的goroutine才会执行,另一个会等待。
package main
import (
"fmt"
"sync"
)
type counter struct {
count int
sync.Mutex
}
func (c *counter) add() {
c.Lock()
defer c.Unlock()
c.count++
}
func (c *counter) value() int {
c.Lock()
defer c.Unlock()
return c.count
}
func main() {
var wg sync.WaitGroup
var c counter
wg.Add(2)
// goroutine 1
go func() {
defer wg.Done()
for i := 0; i < 5000; i++ {
c.add()
}
}()
// goroutine 2
go func() {
defer wg.Done()
for i := 0; i < 5000; i++ {
c.add()
}
}()
wg.Wait()
fmt.Println(c.value())
}
Mutex结构定义如下,它由state和sema两个字段组成,state表示当前互斥锁的状态,sema是用来控制锁状态的信号量。
type Mutex struct {
state int32
sema uint32
}
const (
// state的第一个bit位,表示是否加锁
mutexLocked = 1 << iota // mutex is locked
// state的第二个bit位,表示是否被唤醒
mutexWoken
// state的第三个bit位,表示是否处于饥饿模式
mutexStarving
// state的[4,32]bit位,表示等待锁的goroutine的数量
mutexWaiterShift = iota
// 饥饿时间1毫秒
starvationThresholdNs = 1e6
)
state是一个int32整数,它的最低3个bit分别表示是否已经加锁、是否唤醒、是否处于饥饿状态,剩余bit位的数值表示有多少个goroutine在等待,如下图所示。
为了保证锁的公平性,mutex有两种模式:正常模式和饥饿模式。正常模式下所有等待锁的goroutine按照队列的先进先出顺序等待。被唤醒的goroutine并不是直接拥有锁,而是与新请求锁的goroutine竞争。为啥要让新请求锁的goroutine也来竞争锁,而不是直接放到队列尾部呢?因为这看起来很公平,但从性能角度上来看,并不最优的。如果我们把锁交给正在占用CPU时间片的goroutine,这样就不需要做上下文切换,在高并发的情况下可能有更好的性能。
将唤醒的goroutine和新来请求锁的goroutine竞争锁,很可能导致唤醒的goroutine在竞争锁的时候失败,即新来的goroutine抢到了锁。唤醒的goroutine又被重新加入到队列的队头。那如果极端情况下会出现唤醒的goroutine一致抢不到锁,所以为了处理这种情况,设置了饥饿模式,如果唤醒goroutine等待超过1毫秒没有获取到锁,将会进入饥饿模式。
饥饿模式将锁的所有权直接从释放锁的goroutine移交给等待队列中的第一个goroutine。新来请求锁的goroutine不会尝试获取锁,即使锁看起来处于解锁状态,也不会进行自旋,而是直接放到队尾。
什么时候又从饥饿模式切换到正常模式呢?如果一个等待中的goroutine获得了锁并且满足下面两个条件之一,会从饥饿模式切换到正常模式。
下面结合源码(Go1.14版本)看Mutex的实现细节。先来看加锁处理逻辑,实现如下。处理两个步骤:
步骤1也就是happy path,处理简单的情景,复杂的逻辑封装到一个单独的含失踪,对应到这里的步骤2. 这样步骤1可以内联,Go源码中很多地方都用到了这种处理方式,值得我们学习使用。
func (m *Mutex) Lock() {
// happy path, 还没有人获取锁,即只有当前程序在获取锁,可以直接获取到
// 通过CAS操作设置m.state值,从0修改为1
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}
lockSlow处理逻辑很复杂,有太多的状态需要判断。整个核心处理就是一个for{}大循环,内部有很多处理细节。这里我们for循环中的逻辑分为3个逻辑块,方便理解,这三个逻辑块分别是自旋处理、计算目标状态值和设置目标状态。另外我们关注循环出口地方只有两处,其他地方都会继续执行循环。
自旋就是忙等待空转,让线程在某段时间内一直保持执行,从而避免线程上下文切换带来的开销。所以对于线程阻塞很短时间的场景是非常合适的。循环开始首先检查是否可以进行自旋的条件。条件1:锁已经被锁定 条件2:锁不处于饥饿模式,即处于正常模式 条件3:满足自旋条件runtime_canSpin
只有上述条件都满足才会开始自旋,自旋处理在runtime_doSpin
func (m *Mutex) lockSlow() {
// 记录goroutine等待的时间
var waitStartTime int64
// 标记当前的goroutine是否处于饥饿状态
starving := false
// goroutine是否被唤醒
awoke := false
// 自旋的次数
iter := 0
old := m.state
for {
// 满足这里的3个条件进行自旋:
// 1. 互斥锁已经被锁定,即有goroutine正在占用锁
// 2. 互斥锁当前不处于饥饿模式
// 3. 满足尝试自旋的条件runtime_canSpin
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 走到这里,说明当前的互斥锁处于正常模式,如果当前互斥锁还没有被唤醒,则标记为唤醒状态
// 唤醒的goroutine就是当前的goroutine. 通过CAS操作,将互斥锁更新为唤醒状态
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 进行自旋
runtime_doSpin()
// 自旋次数+1
iter++
// 再次获取锁的状态
old = m.state
continue
}
...
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
我们来看runtime_canSpin可以进行自旋的条件,该函数实现在runtime包中的proc.go文件,实现函数为下面的sync_runtime_canSpin函数。根据实现总结出以下情况会终止自旋:
func sync_runtime_canSpin(i int) bool {
// sync_runtime_canSpin函数中在以下四种情况返回false
// 1. 已经执行了很多次
// 2. 是单核CPU
// 3. 没有其他正在运行的P
// 4. 当前P的G队列为空
// active_spin为4
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
接着来看执行自旋的逻辑runtime_doSpin,runtime_doSpin实现也在runtime/proc.go文件,它调用了procyield函数,procyield是汇编实现的,处理逻辑是调用PAUSE指令30次,通过该指令消耗CPU时间。
func sync_runtime_doSpin() {
// active_spin_cnt为30
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
互斥锁经过特殊的自旋逻辑处理后,会根据上下文计算当前互斥量的最新状态new。根据不同的条件更新状态字段mutexlocked、mutexstarting、mutexwoken和mutexwaitershift中存储的不同信息。
func (m *Mutex) lockSlow() {
...
for {
...
// 走到这里有四种情况:
// 1. 当前互斥锁处于正常模式,并且锁还没有被释放
// 2. 当前互斥锁处于饥饿模式,并且锁还没有被释放
// 3. 当前互斥锁处于正常模式,并且锁已经被释放
// 4. 当前互斥锁处于饥饿模式,并且锁已经被释放
new := old
// 之前处于非饥饿状态,新状态进行加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 之前处于加锁状态或是饥饿状态,当前的goroutine需要等待
// 之前处于加锁状态,则当前的goroutine还是不会获取到锁的,所以waiter+1
// 之前是饥饿状态,则当前的goroutine不会设置新状态锁,因为锁被转移给队列中的第一个goroutine.
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前的goroutine正处于饥饿模式并且旧状态中锁还是处于锁定状态
// 则将新状态标记为饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果当前的goroutine已经设置为唤醒状态,需要清除新状态中的唤醒标记,因为走到这里,
// 当前的goroutine要么拿到锁,要么会进入休眠状态,反正不在是唤醒状态
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 当前的goroutine已经唤醒了,重置清理掉new中的唤醒标志
new &^= mutexWoken
}
...
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
计算最新的目标值new后,通过CAS更新state的值。如果更新成功,则旧状态未加锁,且锁不处于饥饿状态,说明当前goroutine竞争成功,获得锁返回。这就是为什么goroutine在正常模式下竞争时更有可能获得锁的原因。如果当前goroutine竞争失败,则使用runtime_SemacquireMutex来保证资源不被两个goroutines获取。runtime_SemacquireMutex 实现在runtime包中的sema.go文件。它是semacquire1的简单封装,里面最后会调用goPark让当前goroutine让出执行权限,同时设置当前goroutine为睡眠状态,即不参与调度。休眠之后某个时刻锁被释放此goroutine被唤醒,计算它是否处处于饥饿状态,如果锁已经处于饥饿状态,抢到锁返回。
func (m *Mutex) lockSlow() {
...
for {
...
// 成功设置为新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 原来锁的状态已经释放并且不是处于饥饿状态,当前的goroutine请求到了锁的所有权直接返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// queueLifo表示当前的goroutine以前是否在队列里面,queueLifo为true表示以前就在队列里面
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
// 当前的goroutine以前不在队列里面,将当前的时间保存起来
waitStartTime = runtime_nanotime()
}
// 休眠当前goroutine等待,如果是新的goroutine,它的queueLifo为false,会加入到
// 等待队列的尾部 ;如果是唤醒的goroutine,它的queueLifo为true,会加入到等待队列的队头
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 休眠之后被唤醒,计算它是否处处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果锁已经处于饥饿状态,抢到锁返回
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// delta为7,二级制为0111,此时old二进制最低3位为100,所以old-delta,
// 相当于waiter减1并加锁
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果没有处于饥饿模式或者是最后一个waiter,需要将锁设置为正常模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 设置state为新状态,因为已经获得了锁,直接break返回
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
上面分析完了加锁的处理流程,现在我们一起来看解锁处理流程。Unlock会先检查最简单的情况,持有锁的只有1个goroutine,没有其他goroutine抢锁。这种情况直接修改state的值进行解锁。其他情况走unlockSlow逻辑。如果锁处于饥饿模式,直接唤醒等待队列中的goroutine.如果锁处于正常状态,如果没有waiter,或者已经有在处理的情况了,那么释放就好,不用做额外的处理。否则,waiter数减 1,并设置唤醒标志,通过CAS操作更新state的值,然后执行runtime_Semrelease唤醒一个goroutine返回。
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
// 如果new为0,说明只有当前的执行体持有锁,不用做其他的处理
// 如果new非0,需要执行其他操作,尝试唤醒等待者
if new != 0 {
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 没有处于饥饿模式,即处于正常模式
if new&mutexStarving == 0 {
old := new
for {
// 下面四个条件满足任一一个就直接返回,不做唤醒其他goroutine处理:
// 1. 没有waiter了,所以也就没有唤醒的对象了
// 2. 锁处于锁定状态,表明被其他goroutine抢到了
// 3. 锁处于唤醒状态,表明有其他等待的goroutine被唤醒
// 4. 锁处于饥饿模式下,将会解锁队列头的goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 将等待的waiter数减一,并设置为唤醒状态
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 原子操作设置m.state为新状态new
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 唤醒一个waiter然后返回
runtime_Semrelease(&m.sema, false, 1)
return
}
// 执行CAS操作失败会走到这里,操作失败的原因是锁的状态已经被别的goroutine改变了
// 这里更新状态后执行下一个循环
old = m.state
}
} else {
// 处于饥饿状态,直接唤醒队列头部的goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
Mutex实现使用了CAS+自旋操作+信号量技术,通过正常模式和饥饿模式兼顾公平和性能。在正常模式下,主打性能,被唤醒的goroutine并不是直接拥有锁,而是与新请求锁的goroutine竞争。把锁交给正在占用CPU时间片的goroutine,这样就不需要做上下文切换。在饥饿模式下,保证等待时间最久的goroutine在锁被释放时优先执行,保证goroutine不会因等锁而饿死。