Go中semaphore功能与linux系统下futex的功能是一样的。它提供了sleep和wakeup原语,可以在同步原语中的竞争情况下使用。重点是Go中的semaphore结合了goroutine调度机制,当前的goroutine在获取不到锁时 ,将对其进行休眠,让出CPU的执行权,切换到其他goroutine运行。当条件满足时,有其他goroutine会唤醒阻塞休眠的goroutine,重新放入调度器中让它运行。
semaphore的实现参考了论文<<semaphores in plan 9>>,感兴趣的同学可以读读,下载地址见references. 这里对futex稍微做点介绍,futex是fast user space mutex的简称。在无竞争的情况下操作完全在user space进行,不需要系统调用,仅在发生竞争的时候进入内核去完成相应的处理。futex是一种user mode和kernel mode混合的同步机制,需要两种模式合作才能完成,futex变量必须位于user space。
例如下图中的户空间信号量Usem,由用户级信号量值u和内核值k组成。在没有竞争的情况下,完全可用在用户空间中运行,在有竞争时,回退到内核中处理竞争。
Go中semaphore实现的数据结构中,为了处理竞争的情况,用到了lock锁(lock mutex)。这里的锁并不是sync包中的Mutex, RWMutex之类的锁。而是更底层的锁,依赖于不同操作系统具体实现不同。在linux系统中,借助的就是前面介绍的futex。所以semaphore可以看做futex上层的一个封装。
Go中semaphore的实现用到了treap数据结构。treap=tree+heap, 所以它被称为树堆。treap兼具有二叉搜索树的性质和堆的性质。树中的每个节点有一个权重,根据这个权重调整为小根堆。这个权重值在代码中是一个随机数。所以这里的treap也就是A Randomized Binary Search Tree。为了方便理解代码。这里对Randomized Binary Search Tree做一些介绍。
下图就是一颗randomized binary search tree. 每个节点有一个值value和一个权值。7、5、6、1、3这些都是value。按value值构成了二叉搜索树。3、1、5、2、4是权值,按权值构成堆结构。即p(root)>p(child nodes)
下面是将value值4和权值6插入到树中的过程,先根据value找到待插入的叶子节点的位置。根据6->3->5,一路找到5的left节点。然后在根据权值6进行旋转调整,使之符合堆的性质。
semaphore实现在runtime包中的sema.go文件,整个逻辑有两个核心数据结构sematable和notifyList,以及围绕这些结构的函数实现。下面是数据结构依赖关系图和函数调用关系层次结构图。
semtable是信号量hash表,它是一个固定251个元素的数组,数组中每个元素是semaRoot类型,pad是用来填充一定的字节,使得CPU一次可以读取cacheline的长度。
const semTabSize = 251
// 信号量hash表
var semtable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
type semaRoot struct {
lock mutex
// 平衡树的树根
treap *sudog
// 保存有多少个goroutine阻塞在该semaRoot上
nwait uint32
}
semaRoot中有一个treap字段,它是一个*sudog类型,sudog类型定义如下,可以看到,它有多个指针域指向prev,next和parent. treap保存的是平衡树的根节点。以treap为根节点构成了一颗二叉查找树,同时它也是一个小根堆结构。所以它是一个树+堆的复合结构,这也是treap=tree+heap名字的由来。这颗树的具体结构如下,看懂这张图,阅读下面的代码就很轻松了。
sudog是运行时用来存放处于阻塞goroutine的一个上层抽象,是用来实现用户态信号量的主要机制之一。例如我们调用sync包中的Mutex加锁操作,当前的goroutine可能需要挂起,这时会将挂起的goroutine保存在sudog中,整个sudog对象构成一个平衡二叉搜索树。在goroutine恢复时能够快速定位到需要恢复的g,让其继续运行。
type sudog struct {
// 保存goroutine对象的指针
g *g
// isSelect是给channel操作用的,如果它为true,表示有g正在参与一个select
isSelect bool
// next和prev可以理解为树的右和左两个分支,sudog构成一个树形结构
next *sudog
prev *sudog
// 数据元素,treap树中根据elem的值构成二叉搜索树
elem unsafe.Pointer
// 获取时间,goroutine进行休眠前的时间
acquiretime int64
// 释放时间,goroutine被唤醒后的时间
releasetime int64
// 权重值,treap树根据ticket值构成一个小根堆结构
ticket uint32
// 指向父sudog节点
parent *sudog
// sudog列表,有相同elem值的g构成一个链表,它们之间通过waitlink串联起来
waitlink *sudog
// sudog列表中的最后一个元素,有字段,使得将goroutine插入链表尾的时间复杂度为O(1)
waittail *sudog
// 通道对象指针
c *hchan
}
notifyList结构和它相关的函数功能在条件变量Cond实现文章中已做了分析,本文不在对这部分进行介绍,感兴趣的同学可以看条件变量Cond实现。
对sematable来说,内部核心的方法是semacquire1和semrelease1。提供给外部使用的接口都是对这两个方法做的简单包装。下面提供给外部接口调用了semacquire1和semrelease1。
// 提供给sync包使用,例如在sync.WaithGroup用到了
func sync_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}
// 提供给网络轮询poll使用
func poll_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}
// 提供给sync包使用,例如在sync.Mutext,sync.RWMutex均有使用
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
// 也是提供给sync包使用,在sync.Mutex中
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
「semacquire1」
semacquire1操作就是信号量中的P操作,整个操作可以分为两种情况。一种是快速操作,另一种是慢操作。执行semacquire1操作,如果addr的值大于0,会将addr值减1,然后直接返回了,这种情况就是快速操作,信号量的值大于0,不用等待。如果*addr的值为0,则需要等待,这种就是慢操作,当前的goroutine会放入到treap中,让出调度权进入休眠状态。等待semrelease1操作唤醒之后,继续执行。
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
// 获取当前运行的g
gp := getg()
// 判断当前的g是否是用户程序g,如果不是抛出异常
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
// 当addr指向的值大于0时,直接返回,否则继续下面的逻辑
if cansemacquire(addr) {
return
}
// 获取一个sudog对象s
s := acquireSudog()
// 根据addr的值(地址值)确定落在哪个hash桶中,返回桶中的semaRoot值
root := semroot(addr)
t0 := int64(0)
// 初始化获取和释放goroutine时的时间,这两个值用于性能分析用,这里我们可以不关心
s.releasetime = 0
s.acquiretime = 0
// ticket是权重值,sudog组成了一个treap结构,即tree+heap。从t.elem角度来看,
// 此结构是一个二叉搜索树。从ticket角度看,此结构是一个小根堆。
s.ticket = 0
// 根据条件设置releasetime和acquiretime,用于pprof性能分析
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lock(&root.lock)
atomic.Xadd(&root.nwait, 1)
// 加锁后再次检查*addr的值是否大于0,如果大于0,直接唤醒
if cansemacquire(addr) {
// 可以唤醒,前面nwait加了1,这里在减去1
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// 将s加入到treap中,传入的参数有3个,根据addr的值决定挂载树上的哪个节点
// s为要挂载的对象,lifo是后进先出的缩写(lat in first out)
root.queue(addr, s, lifo)
// 调用gopark将当前的goroutine休眠
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
// 唤醒后检查,root.queue内部有修改s.ticket
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
// 将对象s归还到P中
releaseSudog(s)
}
queue会将当前goroutine入队,在上面的semacquire1满操作中,当前的goroutine被放入到一个sudog对象中,然后将这个sudog对象挂到treap中的合适位置。这里再介绍下treap结构,它是一个树形结构,按sudog中的elem值构成一颗二叉搜索树。按sudog中ticket的值,又构成一个小根堆。所以它即满足二叉搜索树的性质又满足小根堆的性质。相对于其他的平衡二叉搜索树,treap的特点是实现简单,并且能基本实现随机平衡的结构,它的插入、删除和查找的平均时间复杂度都为O(logN).
addr值会赋值给sudog对象s的elem。所以addr的值在treap中可能存在也可能不存在:
addr不存在加入对象s前
addr不存在加入对象s后
「queue」
func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
// 设置s的g为当前的goroutine
s.g = getg()
// 将地址addr的值给s的elem
s.elem = unsafe.Pointer(addr)
// 清理干净next和prev
s.next = nil
s.prev = nil
var last *sudog
// pt为 **sudog类型,二次指针
pt := &root.treap
// 遍历treap,将s放进树中的合适位置
for t := *pt; t != nil; t = *pt {
// 地址已经在treap中存在,相同地址的sudog对象构成了一个链表
if t.elem == unsafe.Pointer(addr) {
// lifo为true,表示插入到链表的表头
if lifo {
// 将treap中原来是t的节点替换为新的节点s
*pt = s
// 将原来t的节点的值拷贝给s
s.ticket = t.ticket
s.acquiretime = t.acquiretime
// 将s和treap互相关联绑定
s.parent = t.parent
s.prev = t.prev
s.next = t.next
if s.prev != nil {
s.prev.parent = s
}
if s.next != nil {
s.next.parent = s
}
// 以s为开头的sudog对象构成了一个链表,执行s.waitlink=t
// 将s和原来的链表串起来
s.waitlink = t
// 设置s的指向的尾节点值
s.waittail = t.waittail
// 如果s.waittail为nil,说明t.waittail的值为nil
// 即之前链表中只有一个元素就是t
if s.waittail == nil {
// 设置s的尾节点为t,现在只要2个元素s和t
s.waittail = t
}
// 解除旧节点t与treap的关联
t.parent = nil
t.prev = nil
t.next = nil
t.waittail = nil
} else {
// else逻辑将节点s添加到链表尾部,处理来说比上面加入到头节点简单很多
// 如果t.waittail为nil,说明链表中只有一个元素就是t
if t.waittail == nil {
// t的下一个节点指向s
t.waitlink = s
} else {
// t的最后一个节点的下一个节点指向s
t.waittail.waitlink = s
}
// 更新t的尾节点为当前新加入的节点s
// NOTE:这里只是更新了链表头节点指向的尾部节点为s,那链表中间的节点的waittail值不更新,
// 将它指向s吗?需要更新,不过是渐进式的,在每次链表头结点出队dequeue的时候,会更新头
// 节点下一个节点,将它的waittail指向正确的位置
t.waittail = s
// 将s的waitlink指向空
s.waitlink = nil
}
// 已将s加入到treap中,直接返回
return
}
last = t
// treap是根据elem构成的二叉搜索树,所以如果addr比当前t.elem小
// 向左t.prev搜索,否则向右搜索
if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
pt = &t.prev
} else {
pt = &t.next
}
}
// 走到这里,说明addr的值在treap中不存在,所以需要将当前的节点s插入到treap中合适的位置
// last是一个叶子节点,将当前的节点s挂载到last的子节点上(可能是last.prev也可能是last.next的)
// 产生一个随机值赋值给s.ticket,下面要根据ticket进行顺序调整,构成一个random search tree
s.ticket = fastrand() | 1
// 设置s的parent为last
s.parent = last
*pt = s
// 根据节点的权重ticket值,调整节点的位置
for s.parent != nil && s.parent.ticket > s.ticket {
if s.parent.prev == s {
// 右旋转
root.rotateRight(s.parent)
} else {
if s.parent.next != s {
panic("semaRoot queue")
}
// 左旋转
root.rotateLeft(s.parent)
}
}
}
对于ticket调整,分为左旋(root.rotateLeft)和右旋(root.rotateRight),旋转方法如下图:
右旋就是将当前root节点的prev节点调整为新的root节点,并将之前的root节点作为新root节点的next节点,将新root节点的next节点作为之前root节点的prev节点。左旋与上面的顺序相反,可以结合上图和选择代码理解。
这样每一次旋转调整,可以将一个节点顺序调整正确,整个调整放在大循环中,这样不断的进行调整,直到所有的子节点都满足小根堆的性质结束。
semrelease1功能与semacquire1相反,它就是信号量中的V操作。处理步骤如下:
这里在说下semrelease1函数入参handoff的意义,执行readyWithTime完之后,休眠的goroutine已经被唤醒,放入到了P的本地运行队列中的runnext中,如果设置handoff为true,则会执行goyield,让调度程序立即运行runnext中的g. runnext中的g运行会继承当前的时间片。handoff会在sync.Mutex中饥饿状态下会设置为true,避免高度竞争的信号量过度占用P.
「semrelease1」
func semrelease1(addr *uint32, handoff bool, skipframes int) {
// 更加addr的值定位hash表中桶的位置,并获得treap的根节点root
root := semroot(addr)
// 将 *addr的值加1,相当于V操作,在semacquire1中有减1,这里反过来加1
atomic.Xadd(addr, 1)
// nwait表示阻塞休眠的goroutine的数量,如果为0,说明没有goroutine休眠,
// 也就不用进行下面的唤醒逻辑
if atomic.Load(&root.nwait) == 0 {
return
}
lock(&root.lock)
// 这里加锁后再次检查nwait,如果为0,处理同上
if atomic.Load(&root.nwait) == 0 {
unlock(&root.lock)
return
}
// 找到与addr关联的sudog对象s,将其从treap中移除
s, t0 := root.dequeue(addr)
if s != nil {
// 休眠的goroutine数量减1
atomic.Xadd(&root.nwait, -1)
}
unlock(&root.lock)
// 如果s不等于nil,需要将s中绑定的g唤醒
if s != nil {
acquiretime := s.acquiretime
if acquiretime != 0 {
// 调试诊断
mutexevent(t0-acquiretime, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
// handoff为true表示需要剥离当前goroutine运行,立即运行P中runnext中的g
if handoff && cansemacquire(addr) {
s.ticket = 1
}
// 调用goready将休眠的goroutine唤醒
readyWithTime(s, 5+skipframes)
if s.ticket == 1 && getg().m.locks == 0 {
// 前面执行readyWithTime,已经将休眠的goroutine放入到P的runnext中了,这里调用
// goyield让调度程序立即运行runnext中的g. runnext中的g运行会继承当前的时间片。
// handoff会在sync.Mutex中饥饿状态下会设置为true,避免高度竞争的信号量过度占用P.
goyield()
}
}
}
dequeue操作将一个sudog对象从队列中移除,操作步骤如下:
1. 找到与addr相同的sudog对象
2. 对于1中的寻找结构,有3种情况,不同情况执行不同逻辑 2.1 查找的sudog对象不存在,直接返回 2.2 查找的sudog对象存在,并且只有一个sudog的elem值等于addr,这时将此sudog对象出队,可能需要进行选择旋转调整 2.3 查找的sudog对象存在,并且elem值与addr相同的sudog对象不止一个,这些相同的elem组成一个链表结构,将链表头节点出队。
「dequeue」
func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
// ps为二级指针,指向根*sudog
ps := &root.treap
// s指向根sudog对象
s := *ps
// 遍历treap树,查找是否有元素的elem值与addr相对
for ; s != nil; s = *ps {
// 找到了值相等的节点
if s.elem == unsafe.Pointer(addr) {
goto Found
}
// 二叉搜索,当前的值大于addr,向左查找,否则向右查找
if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
ps = &s.prev
} else {
ps = &s.next
}
}
// 走到这里,表示没有找到,返回nil
return nil, 0
Found:
// 找到了值相同的节点s
now = int64(0)
if s.acquiretime != 0 {
now = cputicks()
}
// 多个值相同的sudog对象会组成一个链表结构,这里判断以s开头的链接元素的个数
// s.waitlink不为nil,表示链表中有多个元素,s会从treap中移除,将链表中第二个节点
// 移动到s的位置
if t := s.waitlink; t != nil {
// 将原来s节点替换为t节点(链表的第二个节点)
*ps = t
// 将节点s的ticket值赋值给t
t.ticket = s.ticket
// 建立节点t与它父节点关系
t.parent = s.parent
// t的prev指向s.prev
t.prev = s.prev
if t.prev != nil {
// 更新t.prev的父节点为当前的t节点
t.prev.parent = t
}
// t的next指向s.next
t.next = s.next
if t.next != nil {
// 更新t.next的父节点为当前的t节点
t.next.parent = t
}
// 更新t节点指向链表的尾节点,这里采用了惰性更新,在入队enqueue的时候
// 链表中间的节点的waittail还没有设置指向链表尾节点
if t.waitlink != nil {
t.waittail = s.waittail
} else {
// 如果除了t链表中没有其他元素了,设置t.waittail为nil
t.waittail = nil
}
// 更新节点t的acquiretime值为当前时间
t.acquiretime = now
// 清理掉要移除的节点s的waitlink和waittail值
s.waitlink = nil
s.waittail = nil
} else {
// treap中只有一个节点的elem值与addr相等,则移除s,需要调整以s为根节点的treap
// 保持random search tree性质,通过左旋或右旋来实现调整
for s.next != nil || s.prev != nil {
if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
// 右旋
root.rotateRight(s)
} else {
// 左旋
root.rotateLeft(s)
}
}
// 将节点s从treap树中移除掉
if s.parent != nil {
if s.parent.prev == s {
s.parent.prev = nil
} else {
s.parent.next = nil
}
} else {
// 移除的s节点是treap的根节点,需要特殊处理
root.treap = nil
}
}
// 清理节点s的字段值
s.parent = nil
s.elem = nil
s.next = nil
s.prev = nil
s.ticket = 0
return s, now
}
「sudog缓存」
acquireSudog优先从本地P的sudogcache切片中摘取最后一个sudog对象返回,如果本地的sudogcache是空的,则尝试从全局中央缓存sched的sudogcache中获取一批sudog对象加入到本地缓存中。获取的数量为直到全局切片为空或者达到pp切片容量的一半为止。如果中央缓存也是空的,这时会new一个sudog对象加入到本地缓存中。
本地缓存sudogcache(pp.sudogcache)的大小是多少呢?是128个。p的sudogcache是通过sudogbuf赋值的。如下,sudogbuf是大小为128的数组,所以sudogcache的容量为128.
type p struct {
...
sudogcache []*sudog
sudogbuf [128]*sudog
...
}
func acquireSudog() *sudog {
// Delicate dance: 信号量的实现调用acquireSudog,然后acquireSudog调用new(sudog)
// new调用malloc, malloc调用垃圾收集器,垃圾收集器在stopTheWorld调用信号量
// 通过在new(sudog)周围执行acquirem/releasem来打破循环
// acquirem/releasem在new(sudog)期间增加m.locks,防止垃圾收集器被调用
// 获取当前运行的工作线程M
mp := acquirem()
// 获取M绑定的P(pp)
pp := mp.p.ptr()
// 检查pp的sudogcache 切片是否为空
if len(pp.sudogcache) == 0 {
lock(&sched.sudoglock)
// First, try to grab a batch from central cache.
// 从全局sched的sudogcache切片中获取sudog加入到pp中,直到全局切片为空或者达到
// pp切片容量的一半为止
for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
s := sched.sudogcache
sched.sudogcache = s.next
s.next = nil
// 将从全局shed中获取的sudog对象加入到本地pp的sudogcache中
pp.sudogcache = append(pp.sudogcache, s)
}
unlock(&sched.sudoglock)
// 走到这里还是空的,说明全局缓存中也没有sudog对象,这时new一个新的
if len(pp.sudogcache) == 0 {
pp.sudogcache = append(pp.sudogcache, new(sudog))
}
}
// 将pp.sudogcache切片中最后一个sudog对象摘取出去返回
n := len(pp.sudogcache)
s := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
if s.elem != nil {
throw("acquireSudog: found s.elem != nil in cache")
}
releasem(mp)
return s
}
releaseSudog将s放入到本地缓存中,如果放入时本地缓存已经满(128个)了,则会将本地缓存中后一半的sudog对象取出放入到全局缓存的链表头部。
func releaseSudog(s *sudog) {
... 异常检查
// 获取当前的工作线程M
mp := acquirem() // avoid rescheduling to another P
// 获取与M绑定的P
pp := mp.p.ptr()
// 如果本地缓存切片满了,则会将本地缓存后一半的sudog对象转移到全局缓存中
if len(pp.sudogcache) == cap(pp.sudogcache) {
// Transfer half of local cache to the central cache.
var first, last *sudog
// 先将本地缓存中后一半的sudog串成一个链表
for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
n := len(pp.sudogcache)
p := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
if first == nil {
first = p
} else {
last.next = p
}
last = p
}
// 直接将上面的链表加入到全局缓存的头部,这里一次性加入,锁的粒度小
lock(&sched.sudoglock)
last.next = sched.sudogcache
sched.sudogcache = first
unlock(&sched.sudoglock)
}
// 将s加入到本地缓存中
pp.sudogcache = append(pp.sudogcache, s)
releasem(mp)
}
Semaphores in Plan 9[1]Futex设计与实现[2]A futex overview and update[3]Randomized Search Tree[4]
[1]
Semaphores in Plan 9: https://swtch.com/semaphore.pdf
[2]
Futex设计与实现: https://www.jianshu.com/p/d17a6152740c
[3]
A futex overview and update: https://lwn.net/Articles/360699/
[4]
Randomized Search Tree: http://www.ist.tugraz.at/_attach/Publish/Eaa19/Chapter_05_RandomizedSearchTree_handout.pdf