专栏首页码农桃花源semaphore 的原理与实现

semaphore 的原理与实现

Semaphore

数据结构

// Go 语言中暴露的 semaphore 实现
// 具体的用法是提供 sleep 和 wakeup 原语
// 以使其能够在其它同步原语中的竞争情况下使用
// 因此这里的 semaphore 和 Linux 中的 futex 目标是一致的
// 只不过语义上更简单一些
//
// 也就是说,不要认为这些是信号量
// 把这里的东西看作 sleep 和 wakeup 实现的一种方式
// 每一个 sleep 都会和一个 wakeup 配对
// 即使在发生 race 时,wakeup 在 sleep 之前时也是如此
//
// See Mullender and Cox, ``Semaphores in Plan 9,''
// http://swtch.com/semaphore.pdf

// 为 sync.Mutex 准备的异步信号量

// semaRoot 持有一棵 地址各不相同的 sudog(s.elem) 的平衡树
// 每一个 sudog 都反过来指向(通过 s.waitlink)一个在同一个地址上等待的其它 sudog 们
// 同一地址的 sudog 的内部列表上的操作时间复杂度都是 O(1)。顶层 semaRoot 列表的扫描
// 的时间复杂度是 O(log n),n 是被哈希到同一个 semaRoot 的不同地址的总数,每一个地址上都会有一些 goroutine 被阻塞。
// 访问 golang.org/issue/17953 来查看一个在引入二级列表之前性能较差的程序样例,test/locklinear.go
// 中有一个复现这个样例的测试
type semaRoot struct {
    lock  mutex
    treap *sudog // root of balanced tree of unique waiters.
    nwait uint32 // Number of waiters. Read w/o the lock.
}

// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
    root semaRoot
    pad  [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte
}

func semroot(addr *uint32) *semaRoot {
    return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}
┌─────┬─────┬─────┬─────┬─────┬────────────────────────┬─────┐                 
│  0  │  1  │  2  │  3  │  4  │         .....          │ 250 │                 
└─────┴─────┴─────┴─────┴─────┴────────────────────────┴─────┘                 
   │                                                      │                    
   │                                                      │                    
   └──┐                                                   └─┐                  
      │                                                     │                  
      │                                                     │                  
      ▼                                                     ▼                  
 ┌─────────┐                                           ┌─────────┐             
 │ struct  │                                           │ struct  │             
 ├─────────┴─────────┐                                 ├─────────┴─────────┐   
 │   root semaRoot   │──┐                              │   root semaRoot   │──┐
 ├───────────────────┤  │                              ├───────────────────┤  │
 │        pad        │  │                              │        pad        │  │
 └───────────────────┘  │                              └───────────────────┘  │
                        │                                                     │
       ┌────────────────┘                                    ┌────────────────┘
       │                                                     │                 
       │                                                     │                 
       ▼                                                     ▼                 
 ┌──────────┐                                          ┌──────────┐            
 │ semaRoot │                                          │ semaRoot │            
 ├──────────┴────────┐                                 ├──────────┴────────┐   
 │    lock mutex     │                                 │    lock mutex     │   
 ├───────────────────┤                                 ├───────────────────┤   
 │   treap *sudog    │                                 │   treap *sudog    │   
 ├───────────────────┤                                 ├───────────────────┤   
 │   nwait uint32    │                                 │   nwait uint32    │   
 └───────────────────┘                                 └───────────────────┘   

treap 结构:

                                 ┌──────────┐                                    
                            ┌─┬─▶│  sudog   │                                    
                            │ │  ├──────────┴────────────┐                       
      ┌─────────────────────┼─┼──│      prev *sudog      │                       
      │                     │ │  ├───────────────────────┤                       
      │                     │ │  │      next *sudog      │────┐                  
      │                     │ │  ├───────────────────────┤    │                  
      │                     │ │  │     parent *sudog     │    │                  
      │                     │ │  ├───────────────────────┤    │                  
      │                     │ │  │  elem unsafe.Pointer  │    │                  
      │                     │ │  ├───────────────────────┤    │                  
      │                     │ │  │     ticket uint32     │    │                  
      │                     │ │  └───────────────────────┘    │                  
      │                     │ │                               │                  
      │                     │ │                               │                  
      │                     │ │                               │                  
      │                     │ │                               │                  
      │                     │ │                               │                  
      │                     │ │                               │                  
      ▼                     │ │                               ▼                  
┌──────────┐                │ │                         ┌──────────┐             
│  sudog   │                │ │                         │  sudog   │             
├──────────┴────────────┐   │ │                         ├──────────┴────────────┐
│      prev *sudog      │   │ │                         │      prev *sudog      │
├───────────────────────┤   │ │                         ├───────────────────────┤
│      next *sudog      │   │ │                         │      next *sudog      │
├───────────────────────┤   │ │                         ├───────────────────────┤
│     parent *sudog     │───┘ └─────────────────────────│     parent *sudog     │
├───────────────────────┤                               ├───────────────────────┤
│  elem unsafe.Pointer  │                               │  elem unsafe.Pointer  │
├───────────────────────┤                               ├───────────────────────┤
│     ticket uint32     │                               │     ticket uint32     │
└───────────────────────┘                               └───────────────────────┘

在这个 treap 结构里,从 elem 的视角(其实就是 lock 的 addr)来看,这个结构是个二叉搜索树。从 ticket 的角度来看,整个结构就是一个小顶堆。

所以才叫树堆(treap)。

相同 addr,即对同一个 mutex 上锁的 g,会阻塞在同一个地址上。这些阻塞在同一个地址上的 goroutine 会被打包成 sudog,组成一个链表。用 sudog 的 waitlink 相连:

┌──────────┐                         ┌──────────┐                          ┌──────────┐             
│  sudog   │                  ┌─────▶│  sudog   │                   ┌─────▶│  sudog   │             
├──────────┴────────────┐     │      ├──────────┴────────────┐      │      ├──────────┴────────────┐
│    waitlink *sudog    │─────┘      │    waitlink *sudog    │──────┘      │    waitlink *sudog    │
├───────────────────────┤            ├───────────────────────┤             ├───────────────────────┤
│    waittail *sudog    │            │    waittail *sudog    │             │    waittail *sudog    │
└───────────────────────┘            └───────────────────────┘             └───────────────────────┘

中间的元素的 waittail 都会指向最后一个元素:

┌──────────┐                                                                                           
│  sudog   │                                                                                           
├──────────┴────────────┐                                                                              
│    waitlink *sudog    │                                                                              
├───────────────────────┤                                                                              
│    waittail *sudog    │───────────────────────────────────────────────────────────┐                  
└───────────────────────┘                                                           │                  
                                  ┌──────────┐                                      │                  
                                  │  sudog   │                                      │                  
                                  ├──────────┴────────────┐                         │                  
                                  │    waitlink *sudog    │                         │                  
                                  ├───────────────────────┤                         │                  
                                  │    waittail *sudog    │─────────────────────────┤                  
                                  └───────────────────────┘                         ▼                  
                                                                              ┌──────────┐             
                                                                              │  sudog   │             
                                                                              ├──────────┴────────────┐
                                                                              │    waitlink *sudog    │
                                                                              ├───────────────────────┤
                                                                              │    waittail *sudog    │
                                                                              └───────────────────────┘

对外封装

在 sema.go 里实现的内容,用 go:linkname 导出给 sync、poll 库来使用,也是在链接期做了些手脚:

//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
    semacquire1(addr, false, semaBlockProfile)
}

//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
    semacquire1(addr, false, semaBlockProfile)
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool) {
    semrelease1(addr, handoff)
}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
    semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
}

//go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
func poll_runtime_Semrelease(addr *uint32) {
    semrelease(addr)
}

实现

sem 本身支持 acquire 和 release,其实就是 OS 里常说的 P 操作和 V 操作。

公共部分

func cansemacquire(addr *uint32) bool {
    for {
        v := atomic.Load(addr)
        if v == 0 {
            return false
        }
        if atomic.Cas(addr, v, v-1) {
            return true
        }
    }
}

acquire 过程

type semaProfileFlags int

const (
    semaBlockProfile semaProfileFlags = 1 << iota
    semaMutexProfile
)

// Called from runtime.
func semacquire(addr *uint32) {
    semacquire1(addr, false, 0)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
    gp := getg()
    if gp != gp.m.curg {
        throw("semacquire not on the G stack")
    }

    // 低成本的情况
    if cansemacquire(addr) {
        return
    }

    // 高成本的情况:
    //    增加 waiter count 的值
    //    再尝试调用一次 cansemacquire,成本了就直接返回
    //    没成功就把自己作为一个 waiter 入队
    //    sleep
    //    (之后 waiter 的 descriptor 被 signaler 用 dequeue 踢出)
    s := acquireSudog()
    root := semroot(addr)
    t0 := int64(0)
    s.releasetime = 0
    s.acquiretime = 0
    s.ticket = 0

    for {
        lock(&root.lock)
        // 给 nwait 加一,这样后来的就不会在 semrelease 中进低成本的路径了
        atomic.Xadd(&root.nwait, 1)
        // 检查 cansemacquire 避免错过了唤醒
        if cansemacquire(addr) {
            atomic.Xadd(&root.nwait, -1)
            unlock(&root.lock)
            break
        }
        // 在 cansemacquire 之后的 semrelease 都可以知道我们正在等待
        // (上面设置了 nwait),所以会直接进入 sleep
        // 注: 这里说的 sleep 其实就是 goparkunlock
        root.queue(addr, s, lifo)
        goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
        if s.ticket != 0 || cansemacquire(addr) {
            break
        }
    }
    if s.releasetime > 0 {
        blockevent(s.releasetime-t0, 3)
    }
    releaseSudog(s)
}

release 过程

func semrelease(addr *uint32) {
    semrelease1(addr, false)
}

func semrelease1(addr *uint32, handoff bool) {
    root := semroot(addr)
    atomic.Xadd(addr, 1)

    // 低成本情况: 没有 waiter?
    // 这个 atomic 的检查必须发生在 xadd 之前,以避免错误唤醒
    // (具体参见 semacquire 中的循环)
    if atomic.Load(&root.nwait) == 0 {
        return
    }

    // 高成本情况: 搜索 waiter 并唤醒它
    lock(&root.lock)
    if atomic.Load(&root.nwait) == 0 {
        // count 值已经被另一个 goroutine 消费了
        // 所以我们不需要唤醒其它 goroutine 了
        unlock(&root.lock)
        return
    }
    s, t0 := root.dequeue(addr)
    if s != nil {
        atomic.Xadd(&root.nwait, -1)
    }
    unlock(&root.lock)
    if s != nil { // 可能会很慢,所以先解锁
        acquiretime := s.acquiretime
        if acquiretime != 0 {
            mutexevent(t0-acquiretime, 3)
        }
        if s.ticket != 0 {
            throw("corrupted semaphore ticket")
        }
        if handoff && cansemacquire(addr) {
            s.ticket = 1
        }
        readyWithTime(s, 5)
    }
}

func readyWithTime(s *sudog, traceskip int) {
    if s.releasetime != 0 {
        s.releasetime = cputicks()
    }
    goready(s.g, traceskip)
}

treap 结构

sudog 按照地址 hash 到 251 个 bucket 中的其中一个,每一个 bucket 都是一棵 treap。而相同 addr 上的 sudog 会形成一个链表。

为啥同一个地址的 sudog 不需要展开放在 treap 中呢?显然,sudog 唤醒的时候,block 在同一个 addr 上的 goroutine,说明都是加的同一把锁,这些 goroutine 被唤醒肯定是一起被唤醒的,相同地址的 g 并不需要查找才能找到,只要决定是先进队列的被唤醒(fifo)还是后进队列的被唤醒(lifo)就可以了。

// queue 函数会把 s 添加到 semaRoot 上阻塞的 goroutine 们中
// 实际上就是把 s 添加到其地址对应的 treap 上
func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
    s.g = getg()
    s.elem = unsafe.Pointer(addr)
    s.next = nil
    s.prev = nil

    var last *sudog
    pt := &root.treap
    for t := *pt; t != nil; t = *pt {
        if t.elem == unsafe.Pointer(addr) {
            // Already have addr in list.
            if lifo {
                // treap 中在 t 的位置用 s 覆盖掉 t
                *pt = s
                s.ticket = t.ticket
                s.acquiretime = t.acquiretime
                s.parent = t.parent
                s.prev = t.prev
                s.next = t.next
                if s.prev != nil {
                    s.prev.parent = s
                }
                if s.next != nil {
                    s.next.parent = s
                }
                // 把 t 放在 s 的 wait list 的第一个位置
                s.waitlink = t
                s.waittail = t.waittail
                if s.waittail == nil {
                    s.waittail = t
                }
                t.parent = nil
                t.prev = nil
                t.next = nil
                t.waittail = nil
            } else {
                // 把 s 添加到 t 的等待列表的末尾
                if t.waittail == nil {
                    t.waitlink = s
                } else {
                    t.waittail.waitlink = s
                }
                t.waittail = s
                s.waitlink = nil
            }
            return
        }
        last = t
        if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
            pt = &t.prev
        } else {
            pt = &t.next
        }
    }

    // 把 s 作为树的新的叶子插入进去
    // 平衡树使用 ticket 作为堆的权重值,这个 ticket 是随机生成的
    // 也就是说,这个结构以元素地址来看的话,是一个二叉搜索树
    // 同时用 ticket 值使其同时又是一个小顶堆,满足
    // s.ticket <= both s.prev.ticket and s.next.ticket.
    // https://en.wikipedia.org/wiki/Treap
    // http://faculty.washington.edu/aragon/pubs/rst89.pdf
    //
    // s.ticket 会在一些地方和 0 相比,因此只设置最低位的 bit
    // 这样不会明显地影响 treap 的质量?
    s.ticket = fastrand() | 1
    s.parent = last
    *pt = s

    // 按照 ticket 来进行旋转,以满足 treap 的性质
    for s.parent != nil && s.parent.ticket > s.ticket {
        if s.parent.prev == s {
            root.rotateRight(s.parent)
        } else {
            if s.parent.next != s {
                panic("semaRoot queue")
            }
            root.rotateLeft(s.parent)
        }
    }
}

// dequeue 会搜索到阻塞在 addr 地址的 semaRoot 中的第一个 goroutine
// 如果这个 sudog 需要进行 profile,dequeue 会返回它被唤醒的时间(now),否则的话 now 为 0
func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
    ps := &root.treap
    s := *ps
    for ; s != nil; s = *ps {
        if s.elem == unsafe.Pointer(addr) {
            goto Found
        }
        if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
            ps = &s.prev
        } else {
            ps = &s.next
        }
    }
    return nil, 0

Found:
    now = int64(0)
    if s.acquiretime != 0 {
        now = cputicks()
    }
    if t := s.waitlink; t != nil {
        // 替换掉同样在 addr 上等待的 t。
        *ps = t
        t.ticket = s.ticket
        t.parent = s.parent
        t.prev = s.prev
        if t.prev != nil {
            t.prev.parent = t
        }
        t.next = s.next
        if t.next != nil {
            t.next.parent = t
        }
        if t.waitlink != nil {
            t.waittail = s.waittail
        } else {
            t.waittail = nil
        }
        t.acquiretime = now
        s.waitlink = nil
        s.waittail = nil
    } else {
        // 向下旋转 s 到叶节点,以进行删除,同时要考虑优先级
        for s.next != nil || s.prev != nil {
            if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
                root.rotateRight(s)
            } else {
                root.rotateLeft(s)
            }
        }
        // Remove s, now a leaf.
        // 删除 s,现在是叶子节点了
        if s.parent != nil {
            if s.parent.prev == s {
                s.parent.prev = nil
            } else {
                s.parent.next = nil
            }
        } else {
            root.treap = nil
        }
    }
    s.parent = nil
    s.elem = nil
    s.next = nil
    s.prev = nil
    s.ticket = 0
    return s, now
}
文章分享自微信公众号:
码农桃花源

本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!

作者:曹春晖
原始发表时间:2021-05-14
如有侵权,请联系 cloudcommunity@tencent.com 删除。
登录 后参与评论
0 条评论

相关文章

  • 基于AQS实现的简单的Semaphore

    张申傲
  • 面试官:说说CountDownLatch,CyclicBarrier,Semaphore的原理?

    CountDownLatch适用于在多线程的场景需要等待所有子线程全部执行完毕之后再做操作的场景。

    艾小仙
  • pprof 的原理与实现

    go 内置的 pprof API 在 runtime/pprof 包内, 它提供给了用户与 runtime 交互的能力, 让我们能够在应用运行的过程中分析当前应...

    梦醒人间
  • YOLO算法的原理与实现

    作者:叶 虎 编辑:祝鑫泉 前言 1 当我们谈起计算机视觉时,首先想到的就是图像分类,没错,图像分类是计算机视觉最基本的任务之一,但是在图像分类的基础上,...

    机器学习算法工程师
  • zip 的压缩原理与实现

    http://www.blueidea.com/bbs/newsdetail.asp?id=1819267&page=2&posts=&Daysprune=5&...

    Java架构师必看
  • LightGBM原理与实现

    不久前微软DMTK(分布式机器学习工具包)团队在GitHub上开源了性能超越其他boosting工具的LightGBM,在三天之内GitHub上被star了10...

    大数据技术与机器学习
  • XGBoost原理与实现

    XGBoost是陈天奇等人开发的一个开源机器学习项目,高效地实现了GBDT算法并进行了算法和工程上的许多改进,被广泛应用在Kaggle竞赛及其他许多机器学习竞赛...

    大数据技术与机器学习
  • vmalloc原理与实现

    在 Linux 系统中的每个进程都有独立 4GB 内存空间,而 Linux 把这 4GB 内存空间划分为用户内存空间(0 ~ 3GB)和内核内存空间(3GB ~...

    用户7686797
  • Dropout原理与实现

      Dropout是深度学习中的一种防止过拟合手段,在面试中也经常会被问到,因此有必要搞懂其原理。

    用户1432189
  • Redux 原理与实现

    Redux 和 React 之间并没有什么关系,脱离了 React,Redux 也可以与其它的 js 库(甚至是原生 js)搭配使用,Redux 只是一个状态管...

    多云转晴
  • SVM原理与实现

    支持向量机(Support Vector Machine,SVM)是众多监督学习方法中十分出色的一种,几乎所有讲述经典机器学习方法的教材都会介绍。关于SVM,流...

    大数据技术与机器学习
  • Java多线程并发控制工具信号量Semaphore,实现原理及案例

    信号量(Semaphore)是Java多线程兵法中的一种JDK内置同步器,通过它可以实现多线程对公共资源的并发访问控制。一个线程在进入公共资源时需要先获取一个许...

    码农架构
  • Java多线程并发控制工具信号量Semaphore,实现原理及案例

    信号量(Semaphore)是Java多线程兵法中的一种JDK内置同步器,通过它可以实现多线程对公共资源的并发访问控制。一个线程在进入公共资源时需要先获取一个许...

    码农架构
  • vue跨域实现与原理vue跨域实现与原理

    假如上线后前端在A(192.168.0.1:8080)服务器,后端在B(192.168.0.2:80801)服务器,this.$http.get("/api/g...

    小胖
  • 单点登录原理与简单实现(单点登录原理与简单实现)

    单点登录(Single Sign On),简称为 SSO,是目前比较流行的企业业务整合的解决方案之一。SSO的定义是在多个应用系统中,用户只需要登录一次就可以访...

    全栈程序员站长
  • LVS原理与实现 - 实现篇

    在上一篇文章中,我们主要介绍了 LVS 的原理,接下来我们将会介绍 LVS 的代码实现。

    用户7686797
  • 【分布式锁】07-Zookeeper实现分布式锁:Semaphore、读写锁实现原理

    前面已经讲解了Zookeeper可重入锁的实现原理,自己对分布式锁也有了更深的认知。

    一枝花算不算浪漫

扫码关注腾讯云开发者

领取腾讯云代金券