前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Mutex的实现

Mutex的实现

原创
作者头像
用户7381369
发布2020-10-29 10:39:03
1.4K0
发布2020-10-29 10:39:03
举报
文章被收录于专栏:Go 并发Go 并发

Mutex的实现

1. Mutex的演进

2. 初版互斥锁

2.1 CAS

CAS 指令将给定的值和一个内存地址中的值进行比较,如果相等,则用新值替换内存地址中的值。

CAS操作是原子性的。

2.2 实现原理

代码语言:txt
复制
   // CAS操作,当时还没有抽象出atomic包
    func cas(val *int32, old, new int32) bool
    func semacquire(*int32)
    func semrelease(*int32)
    // 互斥锁的结构,包含两个字段
    type Mutex struct {
        key  int32 // 锁是否被持有的标识
        sema int32 // 信号量专用,用以阻塞/唤醒goroutine
    }
    
    // 保证成功在val上增加delta的值
    func xadd(val *int32, delta int32) (new int32) {
        for {
            v := *val
            if cas(val, v, v+delta) {
                return v + delta
            }
        }
        panic("unreached")
    }
    
    // 请求锁
    func (m *Mutex) Lock() {
        if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁
            return
        }
        semacquire(&m.sema) // 否则阻塞等待
    }
    
    func (m *Mutex) Unlock() {
        if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者
            return
        }
        semrelease(&m.sema) // 唤醒其它阻塞的goroutine
    }    
  • goroutine 调用Lock 请求锁。
    • 若当前锁没有被其他goroutine 持有,则获取锁成功,并返回。
    • 若当前锁被其他goroutine持有,则阻塞自己。
  • goroutine 调用unlock 释放锁
    • 若当前锁没有被人等待,则返回。
    • 若当前锁被人等待,则唤醒其他阻塞的goroutine。
  • lock 和 unlock 必须成对出现。

2.3 面临的问题

请求锁的goroutine会排队等待,在性能上不是最优的。优化点在于把锁交给正在持有CPU时间片的goroutine。

3. 第二代 - 给新人机会

3.1 Mutex的结构体

代码语言:txt
复制
type Mutex struct {
    state int32
    sema  uint32
}


const (
    mutexLocked = 1 << iota // mutex is locked  // = 1
    mutexWoken  // 2
    mutexWaiterShift = iota // 2
)

Mutex中有两个字段:

  • state 描述状态
  • sema 信号量

state字段分为三个部分:

  • mutexLocked 持有锁的标志位,0表示不持有锁,1表示持有锁。
  • mutexWoken 唤醒标志,0表示没有唤醒,1表示唤醒
  • mutexWaiters 表示 阻塞等待的waiter数量。

3.2 加锁过程

代码语言:txt
复制
  func (m *Mutex) Lock() {
        // Fast path: 幸运case,能够直接获取到锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }

        awoke := false
        for {
            old := m.state
            new := old | mutexLocked // 新状态加锁
            if old&mutexLocked != 0 {
                new = old + 1<<mutexWaiterShift //等待者数量加一
            }
            if awoke {
                // goroutine是被唤醒的,
                // 新状态清除唤醒标志
                new &^= mutexWoken
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
                if old&mutexLocked == 0 { // 锁原状态未加锁
                    break
                }
                runtime.Semacquire(&m.sema) // 请求信号量
                awoke = true
            }
        }
    }
  • 如果goroutine A 申请锁,如果当前锁没有被goroutine持有,则对直接获取锁。(3-5行)
  • 如果goroutine B 之情已经获取了锁,goroutine A 尝试获取锁 (7-26行)
    • goroutine B 的唤醒标志置false
    • 进入for循环
      • 对当前state (old state) 加锁 ,生成新状态(new state)(10行)
      • 判断old state 是否被锁住,若已加锁,则把新状态 (new state)的等待数量+1
      • 若果当前goroutine 是被唤醒的,则清除唤醒标志。
      • 设置锁的新状态
      • 若之前的状态 old state 没有加锁,则获取锁成功,返回。
      • 若之前的状态已经加锁,则等待信号量
      • 有锁被释放的时候,goroutine B会被唤醒
      • 此时置唤醒状态为true
      • 对当前state (old state) 加锁 ,生成新状态(new state)(10行)
      • 判断old state 是否被锁住,若已加锁,则把新状态 (new state)的等待数量+1
      • 若果当前goroutine 是被唤醒的,则清除唤醒标志。
      • 设置锁的新状态
      • 若之前的状态 old state 没有加锁,则获取锁成功,返回。
      • 否则等待信号量

对应下面两种处理逻辑:

请求锁的goroutine

当前锁被持有

当前锁未被持有

新来的goroutine

waiter++; 休眠

获取到锁

被唤醒的goroutine

清除mutexWoken标志;重新休眠,加入等待队列

清楚mutexWoken;获取到锁

4. 多给些机会

4.1 加锁过程

在上一版的基础上增加了自旋操作,如果新来的goroutine 或者被唤醒的goroutine 首次获取不到锁,它们会自旋,尝试检查锁是否被释放。

代码语言:txt
复制
  func (m *Mutex) Lock() {
        // Fast path: 幸运之路,正好获取到锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }

        awoke := false
        iter := 0
        for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
            old := m.state // 先保存当前锁的状态
            new := old | mutexLocked // 新状态设置加锁标志
            if old&mutexLocked != 0 { // 锁还没被释放
                if runtime_canSpin(iter) { // 还可以自旋
                    if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                        awoke = true
                    }
                    runtime_doSpin()
                    iter++
                    continue // 自旋,再次尝试请求锁
                }
                new = old + 1<<mutexWaiterShift
            }
            if awoke { // 唤醒状态
                if new&mutexWoken == 0 {
                    panic("sync: inconsistent mutex state")
                }
                new &^= mutexWoken // 新状态清除唤醒标记
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接返回
                    break
                }
                runtime_Semacquire(&m.sema) // 阻塞等待
                awoke = true // 被唤醒
                iter = 0
            }
        }
    }

13行- 21行,如果可以自旋,若当前goroutine 没有被唤醒,mutex 的等待者不为0,则唤醒当前goroutine。

执行runtime_doSpin(). 当前进程进入自旋后,就一直保持CPU占有,持续检查某个条件为真。在多核的CPU上,自旋可以避免Goroutine切换。

5. 解决饥饿问题

5.1 state 字段

5.2 加锁过程

代码语言:txt
复制
   type Mutex struct {
        state int32  // 状态
        sema  uint32 // 信号量
    }
    
    const (
        mutexLocked = 1 << iota // mutex is locked  // 1
        mutexWoken // 2
        mutexStarving // 从state字段中分出一个饥饿标记   =4
        mutexWaiterShift = iota // 3
    
        starvationThresholdNs = 1e6
    ) 

    func (m *Mutex) Lock() {
        // Fast path: 幸运之路,一下就获取到了锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }
        // Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
        m.lockSlow()
    }
    
    func (m *Mutex) lockSlow() {
        var waitStartTime int64
        starving := false // 此goroutine的饥饿标记
        awoke := false // 唤醒标记
        iter := 0 // 自旋次数
        old := m.state // 当前的锁的状态
        for {
            // 锁是非饥饿状态,锁还没被释放,尝试自旋
            if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
                continue
            }
            new := old
            if old&mutexStarving == 0 {
                new |= mutexLocked // 非饥饿状态,加锁
            }
            if old&(mutexLocked|mutexStarving) != 0 {
                new += 1 << mutexWaiterShift // waiter数量加1
            }
            if starving && old&mutexLocked != 0 {
                new |= mutexStarving // 设置饥饿状态
            }
            if awoke {
                if new&mutexWoken == 0 {
                    throw("sync: inconsistent mutex state")
                }
                new &^= mutexWoken // 新状态清除唤醒标记
            }
            // 成功设置新状态
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
                if old&(mutexLocked|mutexStarving) == 0 {
                    break // locked the mutex with CAS
                }
                // 处理饥饿状态

                // 如果以前就在队列里面,加入到队列头
                queueLifo := waitStartTime != 0
                if waitStartTime == 0 {
                    waitStartTime = runtime_nanotime()
                }
                // 阻塞等待
                runtime_SemacquireMutex(&m.sema, queueLifo, 1)
                // 唤醒之后检查锁是否应该处于饥饿状态
                starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                old = m.state
                // 如果锁已经处于饥饿状态,直接抢到锁,返回
                if old&mutexStarving != 0 {
                    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                        throw("sync: inconsistent mutex state")
                    }
                    // 有点绕,加锁并且将waiter数减1
                    delta := int32(mutexLocked - 1<<mutexWaiterShift)
                    if !starving || old>>mutexWaiterShift == 1 {
                        delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清除饥饿标记
                    }
                    atomic.AddInt32(&m.state, delta)
                    break
                }
                awoke = true
                iter = 0
            } else {
                old = m.state
            }
        }
    }

加入饥饿模式之后,优先让饥饿的等待者获取锁。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Mutex的实现
    • 1. Mutex的演进
      • 2. 初版互斥锁
        • 2.1 CAS
          • 2.2 实现原理
          • 2.3 面临的问题
        • 3. 第二代 - 给新人机会
          • 3.1 Mutex的结构体
          • 3.2 加锁过程
        • 4. 多给些机会
          • 4.1 加锁过程
        • 5. 解决饥饿问题
          • 5.1 state 字段
          • 5.2 加锁过程
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档