本文为社区粉丝原创投稿,再次感谢作者南瓜waniu的分享,欢迎大家在评论区留言和作者讨论,同时也欢迎大家踊跃投稿,分享您的golang语言学习经验!投稿邮箱地址为tougao@golang.ltd
原创投稿:南瓜waniu
并发已经成为现代程序设计中的重要考虑内容,但是并发涉及到一个很重要的内容就是资源同步。当两个或者多个线程访问同样的资源的时候,运行的结果取决于线程运行时精确的时序。这样导致结果与期望的结果大相径庭,因此我们需要对资源的访问顺序进行控制,已达到资源同步的目的。
将共享资源(也就是共享内存)的程序片段成为临界区域
,通过适当安排,使得两个者线程同时位于临界区域。对于临界区域
访问解决方案,需要满足如下4个条件
常见的互斥解决方案:
屏蔽中断线程的切换是由CPU中断机制提供的,如果一个线程进入临界区域后,CPU关闭中断响应;在离开临界区域后,再打开中断机制。那么在临界区域将不会有其他线程来竞争资源。 当时将屏蔽中断权利交给用户空间执行是不明智的,而且对于多核CPU而言没有效果。
锁变量几乎每一个编程语言都提供了资源同步方式:锁机制。该机制通过对资源进行Lock
和Unlock
,以达到对关键资源有序访问。
严格轮换法线程不停的执行CPU时间,连续测试某一个值是否出现。但是如果认为等待的时间非常短,可以使用该方式浪费CPU时间,用于等待的锁也成为自旋锁
。
在理解信号量之前,先了解采用共享变量使用多线程会出现什么问题。下面是一个C代码片段
1for (i=0; i<niters; i++){
2 cnt ++;
3}
cnt
为全局变量,一个线程执行该代码片段的时候的汇编代码如下:
1 movq (%rdi), %rcx
2 testq %rcx, %rcx
3 jle .L2
4 movl $0, %eax
5.L3:
6 movq cnt(%rip), %rdx
7 addq %eax
8 movq %eax, cnt(%rip)
9 addq $1, %rax
10 cmpq %rcx, %rax
11 jne .L3
12.L2
2.2 信号量其中6-8
行分别对应对应着加载cnt
,更新cnt
和存储cnt
。将cnt
变量从内存位置读出,加载到CPU寄存器中,在CPU运算器中加1,然后存储到cnt
的内存位置。虽然代码中cnt++
只有一行,但是转换为汇编代码的时候不只有一个操作,也就是说该语句不是原子操作。如果多个线程同时执行代码,按照之前的条件,不对CPU的执行顺序做任何假设,如果其中线程a
在执行7
行汇编代码,而线程b
执行6
行汇编代码,那么b
将"看不到"线程a
对全局变量cnt
加1的操作,那么每次执行的结果cnt
也不完全一致。
计算机领域先驱Dijkstra
提出经典的解决上述问题的方法:信号量(semaphore)。它是一个非负整数的全局变量。而且该变量只能有两个特殊操作来处理: P
和V
。
s
非零,那么P
将s
减1
,并且立即返回。如果s
为零,那么就挂起这个线程,知道s
为非零。V
操作将s
加1
。如果有任何线程阻塞在P
操作等待s
非零,那么V
将重启其中线程中的一个。Posix
标准定义需要操作信号量的函数
1#include <semaphore.h>
2int sem_init(sem_t *sem, 0, unsigned int value);
3int sem_wait(sem_t *s); /*P(s)*/
4int sem_post(sem_t *s); /*P(s)*/
那么如何使用信号量是的2.1小节出现同步问题解决呢?首先定义全局信号量
1volatile long cnt = 0; /* global variable */
2sem_t mutex; /*global semaphore*/
初始化信号量,在这里初始值为1
1sem_init(&mutex, 0, 1);
最后使用信号量操作函数将临界区域代码包含起来
1for (i =0; i<niters; i++){
2 sem_wait(&mutex);
3 cnt++;
4 sem_post(&mutex);
5}
首先看一下死锁的规范定义:
如果一个线程(进程)集合中的每一个线程(进程)都在等待只能由该线程(进程)集合中的其他线程(进程)才能引发的事件,那么该线程(进程)集合是死锁的。
举一个例子,如果线程 a
和线程 b
同是执行,线程a
获取了资源r1
,等待获取资源r2
;而线程b
获取了资源r2
,等待获取资源r1
。那么线程a
和线程b
组成的集合是死锁的。
预防死锁
在某些情况下,当线程意识它不能获下一个资源的时候,它会“礼貌性”地释放已经获得的资源,然后等待1ms
,在尝试一次。如果另一个线程也在相同的时候做了相同的操作, 那么同步的步调将导致两个线程都无法前进。
在信号量小节中,当执行V
操作后,将恢复挂起线程中的一个,那么问题出现了:如果有多个线程被挂起,那么选择哪个线程恢复呢?如果随机选择一个线程恢复,如果源源不断的线程到达临界区域并且挂起,那么很有可能出现某一个线程一直等待资源,而导致"饥饿"。当然也有好的FILO
调度策略来解决调用问题。当时问题在于刚刚到达的线程有很好的局部性,也就是CPU的寄存器、缓存等包含了该线程的局部变量,如果程获得资源锁,很好的避免了线程上下文切换,对性能提高很有帮助。
在go
语言的互斥锁中采用结合上述两种策略,接下来小节中,将会仔细分析源码。
包含了Locker
接口和Mutex
结构:
1type Locker interface {
2 Lock()
3 Unlock()
4}
5type Mutex struct {
6 state int32
7 sema uint32
8}
Mutex
实现了Locker
接口,该结构包含了state
的字段,用来表示该锁当前状态;sema
则为一个信号量。state
是一个32位的整数,不同比特位包含了不同的意义,其中源码中的有很详细的注释,该注释很好解释mutex
如何工作:
互斥锁有两种状态:正常状态和饥饿状态。在正常状态下,所有挂起等待的goroutine按照
FIFO
顺序等待。唤醒的goroutine将会和刚刚到达的goroutine竞争互斥锁的拥有权,因为刚刚到达的goroutine具有优势:它刚刚正在CPU上执行,所以刚刚唤醒的goroutine有很大可能在锁竞争中失败。如果一个等待的goroutine超过1ms没有获取互斥锁,那么它将会把互斥锁转变为饥饿模式。在饥饿模式下,互斥锁的所有权将移交给等待队列中的第一个。新来的goroutine将不会尝试去获得互斥锁,也不会去尝试自旋操作,而是放在队列的最后一个。如果一个等待的goroutine获取的互斥锁,如何它满足一下其中的任何一个条件:(1)它是队列中的最后一个;(2)它等待的时候小于1ms。它会将互斥锁的转台转换为正常状态。正常状态有很好的性能表现,饥饿模式也是非常重要的的,因为它能阻止尾部延迟的现象。
1const (
2 mutexLocked = 1 << iota // mutex is locked
3 mutexWoken
4 mutexStarving
5 mutexWaiterShift = iota
6 starvationThresholdNs = 1e6
7)
mutexLocked
该值为1
, 第一位比特位1
,代表了该是否该互斥锁已经被锁住。mutex.state
与它进行&
操作,如果为1
表示已经锁住,0
则表示未被锁住。
mutexWoken
该值为2
,第二位比特位1
,代表了该互斥锁是否被唤醒,mutex.state
与它进行&
操作,如果为1
表示已经被唤醒,0
代表未被唤醒
mutexStarving
该值为4
,第三位比特为1
,代表了该互斥锁是否处于饥饿状态,mutex.state
与它进行&
操作,如果为1
表示处于饥饿转态,0
表示处于正常状态。
mutexWaiterShift
该值为3
,表示mutex.state
右移3位后为等待的goroutine
的数量。
starvationThresholdNs
goroutine
将互斥锁转换状态的时间等待的临界值:一百万纳秒,也就是1ms。
1if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
2 if race.Enabled {
3 race.Acquire(unsafe.Pointer(m))
4 }
5 return
6}
CompareAndSwapInt32
是一个原子操作,它判断是一个参数的值是否等于第二个参数,如果相等,则将第一个参数设置为第三个参数,并返回true
;否则对一个参数不做任何操作并且返回false
。这一段是代码是处理第一次goroutine
进行尝试Lock
操作,如果一切都是初始状态,则m.state
为.....0000001
并且返回,进入临界区域代码,否则代码继续往下走。
1var waitStartTime int64
2starving := false
3awoke := false
4iter := 0
5old := m.state
首先定义了一下变量:goroutine
等待时间,是否饥饿转台,是否唤醒和自旋迭代次数和保存当前互斥锁状态。接下来是一个for
循环,只有退出循环才能进入临界区域代码,纵观代码只有两处使用break
来退出循环。
1for {
2 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
3 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
4 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
5 awoke = true
6 }
7 runtime_doSpin()
8 iter++
9 old = m.state
10 continue
11 }
12}
首先判断锁状态是被锁而且不是处于饥饿模式,加上还能自旋额次数,进入下一层判断。如果当前goroutine没有被唤醒,
其他goroutine也没有被唤醒,
等待的goroutine超过1和
可以将m.state设置为唤醒转态四个条件同时满足,将
awoke设置
true`。然后进行自旋操作,进行一轮循环。
1new := old
2if old&mutexStarving == 0 {
3 new |= mutexLocked
4}
5if old&(mutexLocked|mutexStarving) != 0 {
6 new += 1 << mutexWaiterShift
7}
8if starving && old&mutexLocked != 0 {
9 new |= mutexStarving
10}
这三个判断条件做了如下工作:如果当前的mutex.state
处于正常模式,则将new
的锁位设置为1,如果当前锁锁状态为锁定状态或者处于饥饿模式,则将等待的线程数量+1。如果starving
变量为true
并且处于锁定状态,则new
的饥饿状态位打开。
1if awoke {
2 if new&mutexWoken == 0 {
3 throw("sync: inconsistent mutex state")
4 }
5 new &^= mutexWoken
6}
如果 goroutine
已经被唤醒,则清空new
的唤醒位。
1if atomic.CompareAndSawpInt32(&m.state, old, new){
2 //...
3}else{
4 //...
5}
如果更新m.state
成功
1if old&(mutexLocked|mutexStarving) == 0 {
2 break
3}
如果未被锁定并且并不是出于饥饿状态,到达第一个break
,进入代码临界区域。
1queueLifo := waitStartTime != 0
2if waitStartTime == 0 {
3 waitStartTime = runtime_nanotime()
4}
5runtime_SemacquireMutex(&m.sema, queueLifo)
runtime_SemacquireMutex(s *uint32, lifo bool)
函数类似P
操作,如果lifo
为true
则将等待goroutine
插入到队列的前面。在这里,对于每一个到达的goroutine
,如果CompareAndSawpInt32
成功,并且到达时候如果锁出于锁定状态,那么将该goroutine
插入到等待队列的最后,否则插入到最前面。此时goroutine
将会被挂起,等待Unlock
的V
操作,将唤醒goroutines
1starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
2old = m.state
3if old&mutexStarving != 0 {
4 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
5 throw("sync: inconsistent mutex state")
6 }
7 delta := int32(mutexLocked - 1<<mutexWaiterShift)
8 if !starving || old>>mutexWaiterShift == 1 {
9 delta -= mutexStarving
10 }
11 atomic.AddInt32(&m.state, delta)
12 break
13}
14
判断被唤醒的线程是否为达到饥饿状态,也就是等待时间超过1ms
,如果之前的m.state
不是饥饿状态,继续循环,给新到来goroutine
让出互斥锁。如果已经饥饿状态,则修改等待goroutine
数量和饥饿状态位,并返回进入临界代码区域。
1new := atomic.AddInt32(&m.state, -mutexLocked)
首先创建变量new
,该变量的锁位为0
。接下来是饥饿状态判断
1if new&mutexStarving == 0 {
2 old := new
3 for {
4 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
5 return
6 }
7 new = (old - 1<<mutexWaiterShift) | mutexWoken
8 if atomic.CompareAndSwapInt32(&m.state, old, new) {
9 runtime_Semrelease(&m.sema, false)
10 return
11 }
12 old = m.state
13 }
14 } else {
15 runtime_Semrelease(&m.sema, true)
16 }
如果是正常状态,则判断如果等待的goroutine
为零,或者已经被锁定、唤醒、或者已经变成饥饿状态,返回,不需要唤醒任何其他被挂起的goroutine
,因为互斥锁已经被其他goroutine
抢占。否则更新new
值(修改等待的goroutine数量)并设置唤醒为,如果CompareAndSwapInt32
成功,则通过runtime_Semrelease(&m.sema, false)
恢复挂起的goroutine.r如果为 true
表明将唤醒第一个阻塞的goroutine
,这第一点在else
饥饿的分支中体现。
读写锁也是一种常见的锁机制,它允许多个线程读共享资源,只有一个线程写共享资源,接下来看看go中如何实现读写锁。
1type RWMutex struct {
2 w Mutex
3 writerSem uint32
4 readerSem uint32
5 readerCount int32
6 readerWait int32
7}
8const rwmutexMaxReaders = 1 << 30
RWMutex
结构包含了如下的字段
goroutine
数量。
1func (rw *RWMutex) RLock() {
2 // [...]
3 if atomic.AddInt32(&rw.readerCount, 1) < 0 {
4 runtime_Semacquire(&rw.readerSem)
5 }
6//[...]
7}
首先是readerCount
值+1, 如果小于零,则挂起goroutine
等待readerSem
。是不是很奇怪,为什么会小于零判断呢?在这里先卖一个关子,接下来会看到为什么是这样的设计逻辑。
1func (rw *RWMutex) RUnlock() {
2 //[...]
3 if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
4 if r+1 == 0 || r+1 == -rwmutexMaxReaders {
5 race.Enable()
6 throw("sync: RUnlock of unlocked RWMutex")
7 }
8 if atomic.AddInt32(&rw.readerWait, -1) == 0 {
9 runtime_Semrelease(&rw.writerSem, false)
10 }
11 }
12 //[...]
13}
首先将readerCount
减去1,如果小于零,再讲readWait
减去1,如果是离开读的goroutine
数量为零,则对writerSem
信号量进行V
操作。
1func (rw *RWMutex) Lock() {
2 //[...]
3 rw.w.Lock()
4 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
5 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
6 runtime_Semacquire(&rw.writerSem)
7 }
8 //[...]
9}
首先rw.w.Lock
操作,来防止其他goroutine
对共享资源的写访问。然后将readerCount
减去rwmutexMaxReaders
,表明还剩多少goroutine
可以进行读访问,这也解释在RLock
中小于零的判断,如果还可以还可以进行读访问,则必须获得readerSem
信号量。在Lock
中接下来是对readWait
判断,如果该数量不为零,则需要对writerSem
进行P
操作,而V
操作只在RUnlock
方法中,如果最后一个读goroutine
离开,则进行V
操作。
1func (rw *RWMutex) Unlock() {
2 //[...]
3 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
4 if r >= rwmutexMaxReaders {
5 race.Enable()
6 throw("sync: Unlock of unlocked RWMutex")
7 }
8 for i := 0; i < int(r); i++ {
9 runtime_Semrelease(&rw.readerSem, false)
10 }
11 //[...]
12}
首先恢复readCounter
为正数,然后对readerSem
信号量进行r
次V
操作,唤醒在RLock
中被挂起的goroutine
。
WaitGroup
通常用在等待一组goroutine
全部完成。调用Add
方法指明要等待的goroutine
的数量,调用Done
方法说明该goroutine
已经完成,而Wait
方法是阻塞等待的goroutine
。
1type WaitGroup struct {
2 noCopy noCopy
3 state1 [12]byte
4 sema uint32
5}
noCopy
字段说明WaitGroup
不允许拷贝,而state1
字段是一个非常tricky
的方法,用其中的8
个字节(64bit)来保存一些状态。高位的32bit用来表示需要等待的goroutine
的数量,地位的32
bit用来表示被挂起的goroutine
的数量。至于为什么不直接使用64bit
的数据主要是为了考虑32为编译器无法保证64位对齐。sema
则是一个信号量。
1func (wg *WaitGroup) state() *uint64 {
2 if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
3 return (*uint64)(unsafe.Pointer(&wg.state1))
4 } else {
5 return (*uint64)(unsafe.Pointer(&wg.state1[4]))
6 }
7}
该方法是一个辅助方法,用来获取state
,一个64为的无符号整数。
1func (wg *WaitGroup) Done() {
2 wg.Add(-1)
3}
Done方法其实调用了Add(-1)
方法,所以我们着重讨论Add
方法。
1func (wg *WaitGroup) Add(delta int) {
2 statep := wg.state()
3 //[...]
4 state := atomic.AddUint64(statep, uint64(delta)<<32)
5 v := int32(state >> 32)
6 w := uint32(state)
7 if race.Enabled && delta > 0 && v == int32(delta) {
8 race.Read(unsafe.Pointer(&wg.sema))
9 }
10 if v < 0 {
11 panic("sync: negative WaitGroup counter")
12 }
13 if w != 0 && delta > 0 && v == int32(delta) {
14 panic("sync: WaitGroup misuse: Add called concurrently with Wait")
15 }
16 if v > 0 || w == 0 {
17 return
18 }
19 if *statep != state {
20 panic("sync: WaitGroup misuse: Add called concurrently with Wait")
21 }
22 // Reset waiters count to 0.
23 *statep = 0
24 for ; w != 0; w-- {
25 runtime_Semrelease(&wg.sema, false)
26 }
27}
首先是获取state
,然后将delta
右移32位,加上等待的goroutine
数量。v
和w
分别代表了需要等待的goroutine
和被阻塞的goroutine
的数量。接下来v==int32(delta)
判断条件表明如果是第一次Add
操作,则必须与等待的goroutine
同步,在Wait
方法中可以看到同样的操作。接下来是一些抛异常操作,如果等待的数量为负数,如何第一次Add
操作没有同步。if >0 || w==0
条件表明如何v
没有降到零,或者被阻塞的goroutine
数量为零,直接返回。如何v
为零,则按照w
的数量,依次对信号量ws.sema
进行V
操作。
1func (wg *WaitGroup) Wait() {
2 //[...]
3 for {
4 state := atomic.LoadUint64(statep)
5 v := int32(state >> 32)
6 w := uint32(state)
7 //[...]
8 // Increment waiters count.
9 if atomic.CompareAndSwapUint64(statep, state, state+1) {
10 if race.Enabled && w == 0 {
11 race.Write(unsafe.Pointer(&wg.sema))
12 }
13 runtime_Semacquire(&wg.sema)
14 if *statep != 0 {
15 panic("sync: WaitGroup is reused before previous Wait has returned")
16 }
17 //[...]
18 return
19 }
20 }
21}
Wait
方法同样也是CAS算法,首先获取需要等待的goroutine
的数量v
和阻塞的goroutine
数量w
, 然后将阻塞的goroutine
数量+1,如果之前的w
为零,表示是第一次等待,则与Add
操作进行同步,最后后对信号量wg.sema
进行P
操作。
在编程中使用Cond
也叫管程(monitor)
,它可以用来使不同线程完成互斥条件,也可以使某个线程等待某个条件的发生。常见的使用模式如下:
1var locker = new(sync.Mutex)
2var cond = sync.NewCond(locker)
3var condition = true
4// goroutine A
5cond.L.Lock()
6for condition {
7 cond.Wait()
8}
9// ...
10cond.L.Unlock()
11
12//goroutine B
13condiiton = false
14cond.Signal()
为什么使用for
循环作为判断进入Wait
的条件而不是if
呢?主要是防止为被唤醒的goroutine
在返回Wait
调用的时候,恰好有别的goroutine
修改了conditon
的值,所以需要使用for
循环作为条件判断。
1type Cond struct {
2 noCopy noCopy
3 L Locker
4 notify notifyList
5 checker copyChecker
6}
Cond
结构不允许拷贝,包含了Locker
的接口字段,和一个notifyList
的集合字段。
1func NewCond(l Locker) *Cond {
2 return &Cond{L: l}
3}
实现Locker
接口的类型都可以,一般为Mutex
和RWMutex
1func (c *Cond) Wait() {
2 c.checker.check()
3 t := runtime_notifyListAdd(&c.notify)
4 c.L.Unlock()
5 runtime_notifyListWait(&c.notify, t)
6 c.L.Lock()
7}
在使用Wait
方法之前,要调用c.L.Lock
来进入临界区域,将当前等待的goroutine
加入到通知队列中,然后调用c.L.Unlock()
来退出临界区域,以便让其他goroutine
可以进入等待区域。紧接着挂起goroutine
,等待消息。
1func (c *Cond) Signal() {
2 c.checker.check()
3 runtime_notifyListNotifyOne(&c.notify)}
runtime_notifyListNotifyOne
唤起其中的等待的goroutine
。
1func (c *Cond) Broadcast() {
2 c.checker.check()
3 runtime_notifyListNotifyAll(&c.notify)
4}
runtime_notifyListNotifyAll
唤起全部等待的goroutine
。
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。