前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >我怎么从来没见过 sync.Cond

我怎么从来没见过 sync.Cond

作者头像
LinkinStar
发布2022-09-01 15:08:51
1850
发布2022-09-01 15:08:51
举报
文章被收录于专栏:LinkinStar's Blog

sync.Cond 作为 go 标准库提供的一个并发原语,但是可能你从来没听过,可见它使用场景挺少的,但是我们需要有这个知识储备,只有储备了之后才能在需要用的时候用出来。

其实如果你之前和我一样接触过 java,那么其实对于这个并发原语其实应该很熟悉,其实就是常说的等待通知机制,也就是 wait 方法和 notify 方法。

使用

我们首先从使用的角度的出发,先来看看 cond 是如何使用的

三个方法

首先我用最白话的方式描述一下 cond 的三个方法

  • Wait 当前调用者等待执行,直到被唤醒,调用该方法时需要加锁
  • Signal 唤醒一个调用者
  • Broadcast 唤醒所有调用者

一把锁一个队列

cond 初始化需要传入一个锁,用于并发控制,调用 wait 的时候需要加锁

cond 内部维护着一个队列,等待调用者排队等待

使用

我们创建两个 goroutine 使用 cond 等待执行任务,然后使用 signal 方法唤醒试试

代码语言:javascript
复制
package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   cond := sync.NewCond(&sync.Mutex{})
   go func() {
      cond.L.Lock()
      fmt.Println("a is waiting...")
      cond.Wait()
      fmt.Println("a was awakened")
      cond.L.Unlock()
   }()
   go func() {
      cond.L.Lock()
      fmt.Println("b is waiting...")
      cond.Wait()
      fmt.Println("b was awakened")
      cond.L.Unlock()
   }()
   time.Sleep(time.Second)
   cond.Signal()
   time.Sleep(time.Second)
   cond.Signal()
   time.Sleep(time.Second)
}
代码语言:javascript
复制
output: 
a is waiting...
b is waiting...
a was awakened
b was awakened

当然你也可以使用功能 Broadcast 方法全部一次性唤醒,输出也是一样的。

这里埋一个伏笔,我们这里两个 goroutine 都 阻塞在了 wait 方法,都没有 unlock 这里的互斥锁,但是我们看到 waiting 都打印出来了,那为什么可以这样做呢?

这个使用的给你的感觉是什么?我第一次看到 cond 的时候就给我的感觉是 waitgroup 的反向操作。

我们知道 waitgroup 可以描述为将一个大任务拆分成多个小任务,每次拆成一个任务就 add 一次,每一次任务完成就 done 一次,然后有人 wait 直到所有的任务都完成。而 cond 是不是刚好反了一下,是一堆人在等着执行,等着被唤醒执行,但是好像又不太一样。

源码分析

在看源码之前还是带着几个问题去看:

  1. wait 之前为什么需要 lock?
  2. signal 次数大于当前等待对象数量会有问题吗?
  3. broadcast 之后还能继续 wait 吗?

结构

代码语言:javascript
复制
type Cond struct {
   noCopy noCopy

   // L is held while observing or changing the condition
   L Locker

   notify  notifyList
   checker copyChecker
}

type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait uint32

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}

可以看到结构非常简单,noCopy 和 checker 保证 cond 不能被 copy,否则会 panic,而且是个运行时检查。

剩下的就是一把锁一个队列了

方法

代码语言:javascript
复制
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
   return &Cond{L: l}
}

创建没啥好说的,就是传入一个锁赋值就可以了

代码语言:javascript
复制
func (c *Cond) Signal() {
   c.checker.check()
   runtime_notifyListNotifyOne(&c.notify)
}
代码语言:javascript
复制
func (c *Cond) Broadcast() {
   c.checker.check()
   runtime_notifyListNotifyAll(&c.notify)
}

Signal 和 Broadcast 都是 check 一下 cond 有没有被复制,然后就直接通过 sema 的 notify 方法将队列传入唤醒了

代码语言:javascript
复制
func (c *Cond) Wait() {
   c.checker.check()
   t := runtime_notifyListAdd(&c.notify)
   c.L.Unlock()
   runtime_notifyListWait(&c.notify, t)
   c.L.Lock()
}

wait 方法也是类似,不过这里需要注意的一点是,这里首先 unlock 了一次,然后再开始 wait,这也就是解释了之前那个伏笔,并且也引出了为什么 wait 之前必须 lock,因为不 lock 的话直接 unlock 肯定报错

runtime_notifyListWait

首先我们来看 runtime_notifyListAdd

代码语言:javascript
复制
// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
   // This may be called concurrently, for example, when called from
   // sync.Cond.Wait while holding a RWMutex in read mode.
   return atomic.Xadd(&l.wait, 1) - 1
}

非常简单就是将 notifyList 的中的 wait + 1,并且这是一个原子操作

runtime_notifyListWait

然后来看 runtime_notifyListWait 这里的第二个参数 t 就是上一个 Xadd 之后 -1 返回的结果

代码语言:javascript
复制
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
   lockWithRank(&l.lock, lockRankNotifyList)

   // Return right away if this ticket has already been notified.
   if less(t, l.notify) {
      unlock(&l.lock)
      return
   }

   // Enqueue itself.
   s := acquireSudog()
   s.g = getg()
   s.ticket = t
   s.releasetime = 0
   t0 := int64(0)
   if blockprofilerate > 0 {
      t0 = cputicks()
      s.releasetime = -1
   }
   if l.tail == nil {
      l.head = s
   } else {
      l.tail.next = s
   }
   l.tail = s
   goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
   if t0 != 0 {
      blockevent(s.releasetime-t0, 2)
   }
   releaseSudog(s)
}

不难,说几个要点:

  • 如果当前传入的 t < notify 的话,证明已经被唤醒了,所以直接解锁返回
  • 获取一个 sudog 用于挂起
  • s.ticket = t 注意这里后面会用到,这里将 sudog 里面的 ticket 标记为当前队列长度
  • 当 tail 为 nil 证明是空队列,直接 head 赋值为 s;如果 tail 不为 nil 证明队列有元素直接链到队尾,并且将当前节点作为新的队尾
  • 然后 gopark 等着被唤醒就可以
runtime_notifyListNotifyOne
代码语言:javascript
复制
// notifyListNotifyOne notifies one entry in the list.
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
   // Fast-path: if there are no new waiters since the last notification
   // we don't need to acquire the lock at all.
   if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
      return
   }

   lockWithRank(&l.lock, lockRankNotifyList)

   // Re-check under the lock if we need to do anything.
   t := l.notify
   if t == atomic.Load(&l.wait) {
      unlock(&l.lock)
      return
   }

   // Update the next notify ticket number.
   atomic.Store(&l.notify, t+1)

   // Try to find the g that needs to be notified.
   // If it hasn't made it to the list yet we won't find it,
   // but it won't park itself once it sees the new notify number.
   //
   // This scan looks linear but essentially always stops quickly.
   // Because g's queue separately from taking numbers,
   // there may be minor reorderings in the list, but we
   // expect the g we're looking for to be near the front.
   // The g has others in front of it on the list only to the
   // extent that it lost the race, so the iteration will not
   // be too long. This applies even when the g is missing:
   // it hasn't yet gotten to sleep and has lost the race to
   // the (few) other g's that we find on the list.
   for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
      if s.ticket == t {
         n := s.next
         if p != nil {
            p.next = n
         } else {
            l.head = n
         }
         if n == nil {
            l.tail = p
         }
         unlock(&l.lock)
         s.next = nil
         readyWithTime(s, 4)
         return
      }
   }
   unlock(&l.lock)
}
  • wait 和 notify 数量一致就没有人等着了,直接返回
  • lock 之后 double check 一次,并发编程的常规操作了
  • notify 的数量在原有数量上+1,因为这次唤醒一个新的了
  • 只有当 ticket 为 t 的时候证明才是下一个需要被唤醒的 sudog (上面的注释解释了这里为什么使用循环,大多数情况下就是 head 就是需要被唤醒的 sudog 了)
  • 然后就是队列出队的基本操作了
  • 最后 readyWithTime 调用 goready 唤醒对应的 sudog 执行就可以了
runtime_notifyListNotifyAll
代码语言:javascript
复制
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
   // Fast-path: if there are no new waiters since the last notification
   // we don't need to acquire the lock.
   if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
      return
   }

   // Pull the list out into a local variable, waiters will be readied
   // outside the lock.
   lockWithRank(&l.lock, lockRankNotifyList)
   s := l.head
   l.head = nil
   l.tail = nil

   // Update the next ticket to be notified. We can set it to the current
   // value of wait because any previous waiters are already in the list
   // or will notice that they have already been notified when trying to
   // add themselves to the list.
   atomic.Store(&l.notify, atomic.Load(&l.wait))
   unlock(&l.lock)

   // Go through the local list and ready all waiters.
   for s != nil {
      next := s.next
      s.next = nil
      readyWithTime(s, 4)
      s = next
   }
}

看完 notify 方法然后再看 notifyAll 方法就很简单了,其实就是遍历了整个队列,对每一个 sudog 都 ready 一次就可以了

总结

总的来说 cond 的实现还是很容易理解的,并没有想的很复杂,只需要在使用的时候多加注意:wait 之前需要加锁。

和 java 比较起来,我记得一开始学的时候 notify 还是随机唤醒一个,然后后来根据不同的 jvm 有了不同的实现,hotspot 实现还是队列。

最后是使用,为什么我这么晚才写这个 cond 呢..其实拖延了很久了,因为在实际中没用过,就在最近在处理一个并发场景的时候偶发的用上了一下,就想着来补一下了。所以在实际中,可能你永远也用不到它,但是知道它,当个知识储备以防不时之需吧。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-09-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用
    • 三个方法
      • 一把锁一个队列
        • 使用
        • 源码分析
          • 结构
            • 方法
              • runtime_notifyListWait
              • runtime_notifyListWait
              • runtime_notifyListNotifyOne
              • runtime_notifyListNotifyAll
          • 总结
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档