前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >万字图解| 深入揭秘Golang锁结构:Mutex(下)

万字图解| 深入揭秘Golang锁结构:Mutex(下)

作者头像
公众号 云舒编程
发布2024-01-25 16:06:19
1080
发布2024-01-25 16:06:19
举报
文章被收录于专栏:图解系列图解系列

大家好,我是「云舒编程」,今天我们来聊聊Golang锁结构:Mutex。

文章首发于微信公众号:云舒编程

一、前言

    书接上回,在万字图解| 深入揭秘Golang锁结构:Mutex(上)一文中,我们已经研究了Golang mutex V1和V2版本的实现。接下来我们继续研究V3和V4版本的实现。

二、面试中遇到Mutex

    为了让剧情顺利发展,我们依旧使用万字图解| 深入揭秘Golang锁结构:Mutex(上)一文中的面试对话模式。 面试官:你现在实现的锁的确给了新来的Goroutine直接获取锁的机会,但是还不够优雅。比如说,新Goroutine尝试获取锁失败的那一刻,锁就被释放了,但是新Goroutine需要等到下一次信号量唤醒加调度才有机会再次获取锁,这样其实浪费了新Goroutine的CPU时间,你可以再优化下吗? :考虑到这种情况,可以尝试给新的Goroutine多次获取锁的机会,说白了就是允许自旋,但是需要给自旋加一些限制条件,避免最开始提到的性能问题。 面试官:需要加哪些限制条件呢? :首先需要限制自旋的次数,其次操作系统的处理器个数和Golang 调度的P个数都必须大于1,否则就会是串行,自旋就没有意义了。 面试官:不错,怎么实现呢? :我来写下:

代码语言:javascript
复制
const (
    mutexLocked      = 1 // mutex is locked
    mutexWoken       = 2 
    mutexWaiterShift = 2
)

type Mutex struct {
    state int32
    sema  uint32
}

func (m *Mutex) Lock() {
    //给新来的协程直接加锁的机会
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    //上面没有加锁成功,尝试在接下来的唤醒中去竞争锁
    awoke := false //表示当前协程是不是被唤醒的
    iter := 0 //记录当前自旋的次数
    for {
        old := m.state
        new := old | mutexLocked // 设置锁标志位为1
        if old&mutexLocked != 0 {
            //判断是否满足自旋条件
            if runtime_canSpin(iter) {
    if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
     atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
     awoke = true
    }

                //内部调用procyield函数,该函数也是汇编语言实现。
                //函数内部循环调用PAUSE指令。减少cpu的消耗,节省电量。
                //指令的本质功能:让加锁失败时cpu睡眠30个(about)clock,从而使得读操作的频率低很多。流水线重排的代价也会小很多。
    runtime_doSpin() 
    iter++
    continue
   }
            
            new = old + 1<<mutexWaiterShift //锁没有释放,当前协程可能会阻塞在信号量上,先将waiter+1
        }
       ··· //剩下的不变
    }
}
代码语言:javascript
复制
//判断是否可以自旋,同时满足以下4个条件才能自旋:
//1、自旋次数小于4次
//2、cpu核数大于1
//3、GOMAXPROCS>1
//4、running P > 1 并且 P队列为空
func sync_runtime_canSpin(i int) bool {
 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
  return false
 }
 if p := getg().m.p; p.runqhead != p.runqtail {
  return false
 }
 return true
}

解锁部分没有变化 面试官:通过自旋,对于临界区代码执行非常短的场景来说,这是一个非常好的优化。因为临界区的代码耗时很短,锁很快就能释放,而抢夺锁的 goroutine不用通过休眠唤醒方式等待调度,直接自旋几次,可能就获得了锁。但是这里也存在一个问题,你知道什么是【饥饿】吗? :因为新来的 goroutine 也参与竞争,有可能每次都会被新来的 goroutine 抢到获取锁的机会,在极端情况下,等待中的goroutine可能会一直获取不到锁,就会导致【饥饿】。 面试官:那怎么解决呢? :可以考虑给锁加一个标识,比如我们可以检测当一个老goroutine超过一定时间都没有获取到锁,那么他就给锁打上一个【饥饿】的标识,新来的goroutine发现存在该标识就不再通过自旋抢锁,而是直接进入信号量的等待队列的队尾。同时把老goroutine移动到信号量等待队列的队头,这样老goroutine下次就可以直接获取到锁了。 面试官:思路不错,写下代码吧。 :我们还是把state在拿出来一位表示锁是不是处于饥饿状态。

图片
图片

加锁部分:

代码语言:javascript
复制
type Mutex struct {
 state int32
 sema  uint32
}

const (
 mutexLocked      = 1 // mutex is locked
 mutexWoken       = 2
 mutexWaiterShift = 3
 mutexStarving    = 4 //新增字段,标识
)

func (m *Mutex) Lock() {
 //给新来的协程直接加锁的机会
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  return
 }

 m.lockSlow()
}

func (m *Mutex) lockSlow() {
 var waitStartTime int64 //表示等待了多久还没获取到锁
 starving := false       //表示当前锁是否处于饥饿状态
 awoke := false          //表示当前协程是不是被唤醒的
 iter := 0               //自旋次数
 old := m.state

 for {
  //满足以下条件才能进入自旋:
  //1、锁不是饥饿状态,并且没有获取到锁
  //2、满足自旋条件runtime_canSpin
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
   }

   //内部调用procyield函数,该函数也是汇编语言实现。
   //函数内部循环调用PAUSE指令。减少cpu的消耗,节省电量。
   //指令的本质功能:让加锁失败时cpu睡眠30个(about)clock,从而使得读操作的频率低很多。流水线重排的代价也会小很多。
   runtime_doSpin()
   iter++
   old = m.state
   continue
  }

  new := old

  //如果当前锁不是饥饿状态,尝试将加锁标志置为1
  if old&mutexStarving == 0 {
   new |= mutexLocked
  }

  //锁没有释放,或者是饥饿模式,当前协程会阻塞在信号量上,将waiter+1
  if old&(mutexLocked|mutexStarving) != 0 {
   new = old + 1<<mutexWaiterShift
  }

  //已经等待超过了1ms,且锁被其他协程占用,则进入饥饿模式
  if starving && old&mutexLocked != 0 {
   new |= mutexStarving
  }

  if awoke { //尝试清除唤醒标志
   if new&mutexWoken == 0 {
    throw("sync: inconsistent mutex state")
   }
   new &^= mutexWoken
  }

  //这里尝试将state从old设置为new。如果代码能够执行到这步,代表了可能发生以下几种情况的一种或者多种
  //1、当前协程尝试加锁
  //2、waiter+1
  //3、清除唤醒标志
  //4、想将锁设置为饥饿模式
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
   if old&(mutexLocked|mutexStarving) == 0 {
    //不是饥饿状态,并且成功获取到锁了,返回
    break
   }

   //是否已经等待过了
   queueLifo := waitStartTime != 0
   if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
   }

   // 阻塞等待
   // queueLifo 为 true,表示加入到队列头。否则,加入到队列尾。
   // (首次加入队列加入到队尾,不是首次加入则加入队头,这样等待最久的 goroutine 优先能够获取到锁。)
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)

   // 从等待队列中唤醒,根据等待时间,判断锁是否应该进入饥饿模式。
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

   //如果锁已经是饥饿状态了,直接抢锁返回
   if old&mutexStarving != 0 {
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
     throw("sync: inconsistent mutex state")
    }

    //加锁并且等待者数量减一
    delta := int32(mutexLocked - 1<<mutexWaiterShift)

    // 清除饥饿状态的两种情况:
    //. 如果不需要进入饥饿模式(当前被唤醒的 goroutine 的等待时间小于 1ms)
    //. 原来的等待者数量为 1,说明是最后一个被唤醒的 goroutine。
    if !starving || old>>mutexWaiterShift == 1 {
     //清除饥饿状态
     delta -= mutexStarving
    }

    //设置状态,加锁
    atomic.AddInt32(&m.state, delta)
    break
   }
   // 设置唤醒标记,重新抢占锁(会与那些运行中的 goroutine 一起竞争锁)
   awoke = true
   iter = 0
  } else {
   old = m.state
  }
 }
}

解锁部分:

代码语言:javascript
复制
func (m *Mutex) Unlock() {
 new := atomic.AddInt32(&m.state, -mutexLocked)
 //还有人阻塞在信号量上,需要进行唤醒
 if new != 0 {
  m.unlockSlow(new)
 }
}

func (m *Mutex) unlockSlow(new int32) {
 if (new+mutexLocked)&mutexLocked == 0 {
  fatal("sync: unlock of unlocked mutex")
 }

 //非饥饿模式
 if new&mutexStarving == 0 {
  old := new
  for {
   //以下情况都直接结束,不继续往下:
   //1、如果没有人阻塞在信号量上了
   //2、其他人加锁了
   //3、已经有人唤醒协程了
   //4、锁又切换成饥饿模式了
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    return
   }

   //waiter-1 并且将唤醒标志置为1
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
    //如果cas执行成功就唤醒队头协程
    runtime_Semrelease(&m.sema, false, 1)
    return
   }
   old = m.state
  }
 } else {
  //饥饿模式下直接唤醒队头,这里分为两种情况:
  //1、如果“mutexLocked”未设置,等待者在唤醒后会获取到锁。
  //2、如果只设置了“mutexStarving”,则仍然认为互斥锁已被锁定,因此新到来的goroutine不会获取它,唤醒的协程会获取到锁。
  runtime_Semrelease(&m.sema, true, 1)
 }
}

关于信号量相关接口:

代码语言:javascript
复制
//lifo:true 加入等待队列的队头,否则加入队尾
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

//handoff=true:将当前G直接放到runq,调度器可以立即该G,并且会继承上一个G的时间片,
//这样的目的是为了使当前G可以得到更快的调度。
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

面试官:不错,这就是Golang mutex目前的设计,兼容了公平与效率的方案。

三、总结

1、互斥锁是一种常见的控制并发读写资源的手段,go 中的互斥锁实现是 sync.Mutex。 2、Mutex 的对外的接口:

  • Lock:同一时刻只能有一个 goroutine 可以获取到锁,其他goroutine会先通过自旋抢占锁,抢不到则阻塞等待。
  • Unlock:释放锁,释放锁之后,会唤醒等待队列中的下一个 goroutine。

3、使用 Mutex 需要注意以下几点:

  • Lock与Unlock需要成对出现,在 Unlock 之前,必须已经调用了 Lock,否则会 panic。
  • 不要将 Mutex 作为函数或方法的参数传递。
  • 不要在锁内部执行阻塞或耗时操作。
  • 可以通过 vet 这个命令行来检查上面的锁 copy 的问题。

4、go 的 Mutex 基于以下技术实现:

  • 信号量:操作系统层面的同步机制。
  • 队列:在协程抢锁失败后,会将这些协程放入一个 FIFO 队列中,下次唤醒会唤醒队列头的协程。
  • 原子操作:通过cas操作状态字段state,可以保证数据的完整性。

5、go Mutex 的两种模式:

  • 正常模式:运行中的 goroutine 有一定机会比等待队列中的 goroutine 先获取到锁,这种模式有更好的性能。
  • 饥饿模式:所有后来的 goroutine 都直接进入等待队列,会依次从等待队列头唤醒 goroutine。可以有效避免【饥饿】。等待队列中的 goroutine 超过了 1ms 还没有获取到锁,那么会进入饥饿模式

四、最后

    “长城不是一天建成的”,Mutex 的设计也是从简单设计到复杂处理逐渐演变的。最初Mutex设计非常简洁,不到20行代码。但是,随着使用者越来越多,需要处理的场景也越来越复杂,逐渐暴露出一些缺陷,为了弥补这些缺陷,Mutex不得不加入越来越多的设计,也逐步变得越来越复杂。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2024-01-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、面试中遇到Mutex
  • 三、总结
  • 四、最后
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档