前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang sync.Cond使用和实现原理

golang sync.Cond使用和实现原理

原创
作者头像
fnatic
发布2023-06-14 15:30:08
7.4K0
发布2023-06-14 15:30:08
举报
文章被收录于专栏:fnatic的区块链fnatic的区块链

Cond

sync.Cond 是基于互斥锁/读写锁实现的条件变量,用来协调想要访问共享资源的那些 Goroutine。当共享资源状态发生变化时,sync.Cond 可以用来通知等待条件发生而阻塞的 Goroutine。

假如有一个协程正在接收数据,其他协程必须等待这个协程接收完数据,才能读取到正确的数据。上述情形下,如果单纯的使用 channel 或者互斥锁,只能有一个协程可以等待,并读取到数据,没办法通知其他协程也读取数据。这个时候怎么办?

1)可以用一个全局变量标识第一个协程是否接收数据完毕,剩下的协程反复检查该变量的值,直到读取到数据。

2)也可创建多个 channel, 每个协程阻塞在一个 Channel 上,由接收数据的协程在数据接收完毕后,挨个通知,类似于msgbus实现。

然后 Go 中其实内置来一个 sync.Cond 来解决这个问题。

使用

下面的例子实现了通Cond实现通知协程的流程:

代码语言:go
复制
func TestCond(t *testing.T) {
	var locker = new(sync.Mutex)
	var cond = sync.NewCond(locker)
	for i := 0; i < 10; i++ {
		go func(x int) {
			cond.L.Lock()         //获取锁
			defer cond.L.Unlock() //释放锁
			cond.Wait()           //等待通知,阻塞当前goroutine
			fmt.Println(x)
		}(i)
	}

	time.Sleep(time.Second * 1)
	fmt.Println("Signal...")
	cond.Signal() // 下发一个通知给已经获取锁的goroutine
	time.Sleep(time.Second * 1)
	cond.Signal() // 3秒之后 下发一个通知给已经获取锁的goroutine
	time.Sleep(time.Second * 3)
	cond.Broadcast() //3秒之后 下发广播给所有等待的goroutine
	fmt.Println("Broadcast...")
	
	select {}
}

运行结果:
=== RUN   TestCond
Signal...
0
1
Broadcast...
2
6
4
5
7
3
9
8

Cond的主要方法有:

1)NewCond(l Locker) *Cond

NewCond 创建实例需要关联一个锁,使用方式为:

cond := sync.NewCond(&sync.Mutex{})

2)Wait()

Wait阻塞当前的 goroutine,等待唤起,在调用Wait前需要Lock,执行完Wait后的逻辑之后需要调用Unlock。

3)Signal()

Signal唤醒一个阻塞的goroutine,执行前不需要调用Lock。

4)Broadcast()

Broadcast唤起所有阻塞的 goroutine,执行前不需要调用Lock。

实现原理

数据结构

我们来看下sync.Cond的结构体,它的代码在 /sr/sync/cond.go下:

代码语言:go
复制
type Cond struct {
    noCopy noCopy       // 不可复制
    L Locker            // 锁
    notify  notifyList  // 通知唤起列表
    checker copyChecker // 复制检测,禁止第一次使用的Cond被复制拷贝
}

type notifyList struct {
	wait   uint32         // 当前wait的index
	notify uint32         // 当前notify的index
	lock   uintptr        // 锁
	head   unsafe.Pointer // 队头
	tail   unsafe.Pointer // 队尾
}

每个Cond实例都会关联一个锁 L(互斥锁 Mutex,或读写锁 RWMutex),当调用Wait方法时,必须加锁。

Wait方法

Wait方法的实现为:

代码语言:go
复制
func (c *Cond) Wait() {
	c.checker.check() // copy 检查
	t := runtime_notifyListAdd(&c.notify) // 向通知列表列表中加入该通知
	c.L.Unlock() // 暂时解锁
	runtime_notifyListWait(&c.notify, t) //通知操作
	c.L.Lock() //加锁,还原状态
}

//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
	// 更新wait的index
	return atomic.Xadd(&l.wait, 1) - 1
}

//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// 判断需要等待notify的index是否合法
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// 获取当前的sudog
	s := acquireSudog()
	s.g = getg()
    // sudog设置ticket
    s.ticket = t
    // 加入到队尾
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
    // gopark阻塞等待被唤醒
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	releaseSudog(s)
}

Wait方法首先调用runtime_notifyListAdd方法将wait索引+1,然后调用runtime_notifyListWait将自己加入到等待队列中,然后释放锁,等待其他协程的唤醒。需要注意的是,Wait的使用方式最好是:

代码语言:go
复制
c.L.Lock()
for !condition() {
    c.Wait()
}
//    ... make use of condition ...
c.L.Unlock()

强制调用Wait方法前需要先获取该锁。这里的原因在于调用Wait方法如果不加锁,有可能会出现竞态条件。这里假设多个协程都处于等待状态,然后一个协程调用了Broadcast唤醒了其中一个或多个协程,此时这些协程都会被唤醒。

如下,假设调用Wait方法前没有加锁的话,那么所有协程都会去调用condition方法去判断是否满足条件,然后都通过验证,执行后续操作:

代码语言:go
复制
for !condition() {
    c.Wait()
}
c.L.Lock()
// 满足条件情况下,执行的逻辑
c.L.Unlock()

此时会出现的情况为,本来是需要在满足condition方法的前提下,才能执行的操作。现在有可能的效果,为前面一部分协程执行时,还是满足condition条件的;但是后面的协程,尽管不满足condition条件,还是执行了后续操作,可能导致程序出错。

正常的用法应该是,在调用Wait方法前便加锁,只会有一个协程判断是否满足condition条件,然后执行后续操作。这样子就不会出现即使不满足条件,也会执行后续操作的情况出现。

Signal方法

Signal的实现为:

代码语言:go
复制
func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// 所有wait已经全部被notify
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// Lock之后再次检查是否还有wait需要notify.
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// 更新notify的index.
	atomic.Store(&l.notify, t+1)
    // 遍历notifyList中的sudog队列
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        // 根据wait设置的ticket找到对应的sudog
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
            // 执行goready,唤醒对应的sudog
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}

Signal方法notify一个wait的goroutine,获取当前的notify的index,然后遍历队列,根据index找到对应的sudog,唤醒这个sudog,wait等待的goroutine就可以继续执行了。

Broadcast方法

Broadcast方法的实现为:

代码语言:go
复制
func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// 检查是否所有的wait都已经被唤醒
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)
    // broadcast是唤醒所有的waits,可以清除等待的队列
	s := l.head
	l.head = nil
	l.tail = nil

	// 直接更新notify的index为wait,表示全部已经被notify
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// 唤醒所有的sudog
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

broadcast方法是唤醒全部wait的goroutine,实现也比较简单,就是直接循环wait队列,全部执行goready,更新notify的index为wait的index,表示全部wait的goroutine都被唤醒了。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Cond
  • 使用
  • 实现原理
    • 数据结构
      • Wait方法
        • Signal方法
          • Broadcast方法
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档