专栏首页go语言核心编程技术Go语言调度器之盗取goroutine(17)

Go语言调度器之盗取goroutine(17)

本文是《Go语言调度器源代码情景分析》系列的第17篇,也是第三章《Goroutine调度策略》的第2小节。


上一小节我们分析了从全局运行队列与工作线程的本地运行队列获取goroutine的过程,这一小节我们继续分析因无法从上述两个队列中拿到需要运行的goroutine而导致的从其它工作线程的本地运行队列中盗取goroutine的过程。

findrunnable() 函数负责处理与盗取相关的逻辑,该函数代码很繁杂,因为它还做了与gc和netpoll等相关的事情,为了不影响我们的分析思路,这里我们仍然把不相关的代码删掉了,不过代码还是比较多,但总结起来就一句话:尽力去各个运行队列中寻找goroutine,如果实在找不到则进入睡眠状态。下面是代码细节:

runtime/proc.go : 2176

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.

top:
    _p_ := _g_.m.p.ptr()

     ......

    // local runq
     //再次看一下本地运行队列是否有需要运行的goroutine
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // global runq
     //再看看全局运行队列是否有需要运行的goroutine
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

     ......

    // Steal work from other P's.
     //如果除了当前工作线程还在运行外,其它工作线程已经处于休眠中,那么也就不用去偷了,肯定没有
    procs := uint32(gomaxprocs)
    if atomic.Load(&sched.npidle) == procs-1 {
        // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
        // New work can appear from returning syscall/cgocall, network or timers.
        // Neither of that submits to local run queues, so no point in stealing.
        goto stop
    }
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
     // 这个判断主要是为了防止因为寻找可运行的goroutine而消耗太多的CPU。
     // 因为已经有足够多的工作线程正在寻找可运行的goroutine,让他们去找就好了,自己偷个懒去睡觉
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
         //设置m的状态为spinning
        _g_.m.spinning = true
         //处于spinning状态的m数量加一
        atomic.Xadd(&sched.nmspinning, 1)
    }
    
     //从其它p的本地运行队列盗取goroutine
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }

stop:

    ......

    // Before we drop our P, make a snapshot of the allp slice,
    // which can change underfoot once we no longer block
    // safe-points. We don't need to snapshot the contents because
    // everything up to cap(allp) is immutable.
    allpSnapshot := allp

    // return P and block
    lock(&sched.lock)
  
    ......
  
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    
     // 当前工作线程解除与p之间的绑定,准备去休眠
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
     //把p放入空闲队列
    pidleput(_p_)
    unlock(&sched.lock)

// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
         //m即将睡眠,状态不再是spinning
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again
     // 休眠之前再看一下是否有工作要做
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }

    ......
     //休眠
    stopm()
    goto top
}

从上面的代码可以看到,工作线程在放弃寻找可运行的goroutine而进入睡眠之前,会反复尝试从各个运行队列寻找需要运行的goroutine,可谓是尽心尽力了。这个函数需要重点注意以下两点:

第一点,工作线程M的自旋状态(spinning)工作线程在从其它工作线程的本地运行队列中盗取goroutine时的状态称为自旋状态。从上面代码可以看到,当前M在去其它p的运行队列盗取goroutine之前把spinning标志设置成了true,同时增加处于自旋状态的M的数量,而盗取结束之后则把spinning标志还原为false,同时减少处于自旋状态的M的数量,从后面的分析我们可以看到,当有空闲P又有goroutine需要运行的时候,这个处于自旋状态的M的数量决定了是否需要唤醒或者创建新的工作线程。

第二点,盗取算法。盗取过程用了两个嵌套for循环。内层循环实现了盗取逻辑,从代码可以看出盗取的实质就是遍历allp中的所有p,查看其运行队列是否有goroutine,如果有,则取其一半到当前工作线程的运行队列,然后从findrunnable返回,如果没有则继续遍历下一个p。但这里为了保证公平性,遍历allp时并不是固定的从allp[0]即第一个p开始,而是从随机位置上的p开始,而且遍历的顺序也随机化了,并不是现在访问了第i个p下一次就访问第i+1个p,而是使用了一种伪随机的方式遍历allp中的每个p,防止每次遍历时使用同样的顺序访问allp中的元素。下面是这个算法的伪代码:

offset := uint32(random()) % nprocs
coprime := 随机选取一个小于nprocs且与nprocs互质的数
for i := 0; i < nprocs; i++ {
     p := allp[offset]
     从p的运行队列偷取goroutine
     if 偷取成功 {
         break
     }
     offset += coprime
     offset = offset % nprocs
}

下面举例说明一下上述算法过程,现假设nprocs为8,也就是一共有8个p。

如果第一次随机选择的offset = 6,coprime = 3(3与8互质,满足算法要求)的话,则从allp切片中偷取的下标顺序为6, 1, 4, 7, 2, 5, 0, 3,计算过程:

6,(6+3)%8=1,(1+3)%8=4, (4+3)%8=7, (7+3)%8=2, (2+3)%8=5, (5+3)%8=0, (0+3)%8=3

如果第二次随机选择的offset = 4,coprime = 5的话,则从allp切片中偷取的下标顺序为1, 6, 3, 0, 5, 2, 7, 4,计算过程:

1,(1+5)%8=6,(6+5)%8=3, (3+5)%8=0, (0+5)%8=5, (5+5)%8=2, (2+5)%8=7, (7+5)%8=4

可以看到只要随机数不一样,偷取p的顺序也不一样,但可以保证经过8次循环,每个p都会被访问到。可以用数论知识证明,不管nprocs是多少,这个算法都可以保证经过nprocs次循环,每个p都可以得到访问。

挑选出盗取的对象p之后,则调用runqsteal盗取p的运行队列中的goroutine,runqsteal函数再调用runqgrap从p的队列中批量拿出多个goroutine,这两个函数本身比较简单,但runqgrab有一个小细节需要注意一下,见下面代码:

runtime/proc.go : 4854

// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
    for {
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
        n := t - h        //计算队列中有多少个goroutine
        n = n - n/2     //取队列中goroutine个数的一半
        if n == 0 {
            ......
            return ......
        }
         //小细节:按理说队列中的goroutine个数最多就是len(_p_.runq),
         //所以n的最大值也就是len(_p_.runq)/2,那为什么需要这个判断呢?
        if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
            continue
        }
        ......
    }
}

代码中n的计算很简单,从计算过程来看n应该是runq队列中goroutine数量的一半,它的最大值不会超过队列容量的一半,但为什么这里的代码却偏偏要去判断n是否大于队列容量的一半呢?这里关键点在于读取runqhead和runqtail是两个操作而非一个原子操作,当我们读取runqhead之后但还未读取runqtail之前,如果有其它线程快速的在增加(这是完全有可能的,其它偷取者从队列中偷取goroutine会增加runqhead,而队列的所有者往队列中添加goroutine会增加runqtail)这两个值,则会导致我们读取出来的runqtail已经远远大于我们之前读取出来放在局部变量h里面的runqhead了,也就是代码注释中所说的h和t已经不一致了,所以这里需要这个if判断来检测异常情况。

工作线程进入睡眠

分析完盗取过程,我们继续回到findrunnable函数。

如果工作线程经过多次努力一直找不到需要运行的goroutine则调用stopm进入睡眠状态,等待被其它工作线程唤醒。

runtime/proc.go : 1918

// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
     _g_ := getg()

     if _g_.m.locks != 0 {
         throw("stopm holding locks")
     }
     if _g_.m.p != 0 {
         throw("stopm holding p")
     }
     if _g_.m.spinning {
         throw("stopm spinning")
     }

     lock(&sched.lock)
     mput(_g_.m)   //把m结构体对象放入sched.midle空闲队列
     unlock(&sched.lock)
     notesleep(&_g_.m.park)  //进入睡眠状态
  
     //被其它工作线程唤醒
     noteclear(&_g_.m.park)
     acquirep(_g_.m.nextp.ptr())
     _g_.m.nextp = 0
}

stopm的核心是调用mput把m结构体对象放入sched的midle空闲队列,然后通过notesleep(&m.park)函数让自己进入睡眠状态

note是go runtime实现的一次性睡眠和唤醒机制,一个线程可以通过调用notesleep(*note)进入睡眠状态,而另外一个线程则可以通过notewakeup(*note)把其唤醒。note的底层实现机制跟操作系统相关,不同系统使用不同的机制,比如linux下使用的futex系统调用,而mac下则是使用的pthread_cond_t条件变量,note对这些底层机制做了一个抽象和封装,这种封装给扩展性带来了很大的好处,比如当睡眠和唤醒功能需要支持新平台时,只需要在note层增加对特定平台的支持即可,不需要修改上层的任何代码。

回到stopm,当从notesleep函数返回后,需要再次绑定一个p,然后返回到findrunnable函数继续重新寻找可运行的goroutine,一旦找到可运行的goroutine就会返回到schedule函数,并把找到的goroutine调度起来运行,如何把goroutine调度起来运行的代码我们已经分析过了。现在继续看notesleep函数。

runtime/lock_futex.go : 139

func notesleep(n *note) {
    gp := getg()
    if gp != gp.m.g0 {
        throw("notesleep not on g0")
    }
    ns := int64(-1)  //超时时间设置为-1,表示无限期等待
    if *cgo_yield != nil {
        // Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
        ns = 10e6
    }
  
     //使用循环,保证不是意外被唤醒
    for atomic.Load(key32(&n.key)) == 0 {
        gp.m.blocked = true
        futexsleep(key32(&n.key), 0, ns)
        if *cgo_yield != nil {
            asmcgocall(*cgo_yield, nil)
        }
        gp.m.blocked = false
    }
}

notesleep函数调用futexsleep进入睡眠,这里之所以需要用一个循环,是因为futexsleep有可能意外从睡眠中返回,所以从futexsleep函数返回后还需要检查note.key是否还是0,如果是0则表示并不是其它工作线程唤醒了我们,只是futexsleep意外返回了,需要再次调用futexsleep进入睡眠。

futexsleep调用futex函数进入睡眠。

runtime/os_linux.go : 32

// Atomically,
//if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.
//go:nosplit
func futexsleep(addr *uint32, val uint32, ns int64) {
    var ts timespec

    // Some Linux kernels have a bug where futex of
    // FUTEX_WAIT returns an internal error code
    // as an errno. Libpthread ignores the return value
    // here, and so can we: as it says a few lines up,
    // spurious wakeups are allowed.
    if ns < 0 {
         //调用futex进入睡眠
        futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
        return
    }

    // It's difficult to live within the no-split stack limits here.
    // On ARM and 386, a 64-bit divide invokes a general software routine
    // that needs more stack than we can afford. So we use timediv instead.
    // But on real 64-bit systems, where words are larger but the stack limit
    // is not, even timediv is too heavy, and we really need to use just an
    // ordinary machine instruction.
    if sys.PtrSize == 8 {
        ts.set_sec(ns / 1000000000)
        ts.set_nsec(int32(ns % 1000000000))
    } else {
        ts.tv_nsec = 0
        ts.set_sec(int64(timediv(ns, 1000000000, (*int32)(unsafe.Pointer(&ts.tv_nsec)))))
    }
    futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}

futex是go汇编实现的函数,主要功能就是执行futex系统调用进入操作系统内核进行睡眠。

runtime/sys_linux_amd64.s : 525

// int64 futex(int32 *uaddr, int32 op, int32 val,
//struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
    #下面的6条指令在为futex系统调用准备参数
MOVQ  addr+0(FP), DI
MOVL   op+8(FP), SI
MOVL   val+12(FP), DX
MOVQ  ts+16(FP), R10
MOVQ  addr2+24(FP), R8
MOVL   val3+32(FP), R9

MOVL   $SYS_futex, AX   #系统调用编号放入AX寄存器
SYSCALL  #执行futex系统调用进入睡眠,从睡眠中被唤醒后接着执行下一条MOVL指令
MOVL   AX, ret+40(FP)    #保存系统调用的返回值
RET

futex系统的参数比较多,其函数原型为

int64 futex(int32 *uaddr, int32 op, int32 val, struct timespec *timeout, int32 *uaddr2, int32 val2);

这里,futex系统调用为我们提供的功能为如果 *uaddr == val 则进入睡眠,否则直接返回。顺便说一下,为什么futex系统调用需要第三个参数val,需要在内核判断*uaddr与val是否相等,而不能在用户态先判断它们是否相等,如果相等才进入内核睡眠岂不是更高效?原因在于判断*uaddr与val是否相等和进入睡眠这两个操作必须是一个原子操作,否则会存在一个竞态条件:如果不是原子操作,则当前线程在第一步判断完*uaddr与val相等之后进入睡眠之前的这一小段时间内,有另外一个线程通过唤醒操作把*uaddr的值修改了,这就会导致当前工作线程永远处于睡眠状态而无人唤醒它。而在用户态无法实现判断与进入睡眠这两步为一个原子操作,所以需要内核来为其实现原子操作。

我们知道线程一旦进入睡眠状态就停止了运行,那么如果后来又有可运行的goroutine需要工作线程去运行,正在睡眠的线程怎么知道有工作可做了呢?

从前面的代码我们已经看到,stopm调用notesleep时给它传递的参数是m结构体的park成员,而m又早已通过mput放入了全局的milde空闲队列,这样其它运行着的线程一旦发现有更多的goroutine需要运行时就可以通过全局的m空闲队列找到处于睡眠状态的m,然后调用notewakeup(&m.park)将其唤醒,至于怎么唤醒,我们在其它章节继续讨论。

到此,我们已经完整分析了调度器的调度策略,从下一章起我们将开始讨论有关调度的另外一个话题:调度时机,即什么时候会发生调度。


最后,如果你觉得本文对你有帮助的话,麻烦帮忙点一下文末右下角的 在看 或转发到朋友圈,非常感谢!

本文分享自微信公众号 - go语言核心编程技术(gh_8b5b60477260),作者:爱写程序的阿波张

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-05-17

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 第三章 Goroutine调度策略(16)

    本文是《Go语言调度器源代码情景分析》系列的第16篇,也是第三章《Goroutine调度策略》的第1小节。

    阿波张
  • Goroutine被动调度之一(18)

    本文是《Go语言调度器源代码情景分析》系列的第18篇,也是第四章《Goroutine被动调度》的第1小节。

    阿波张
  • 工作线程的唤醒及创建(19)

    本文是《Go语言调度器源代码情景分析》系列的第19篇,也是第四章《Goroutine被动调度》的第2小节。

    阿波张
  • 第三章 Goroutine调度策略(16)

    本文是《Go语言调度器源代码情景分析》系列的第16篇,也是第三章《Goroutine调度策略》的第1小节。

    阿波张
  • Python进阶之强大的装饰器 Decorators (三)

    接下来要来谈谈 functools.wraps 的功用,虽然使用装饰器可以大大的减少重复的 code,但是他有一个缺点,就是你会发现 f1 function 中...

    用户4945346
  • Python-EEG工具库MNE中文教程(12)-注释连续数据

    本案例主要介绍如何向原始(Raw)对象添加注释,以及在数据处理的后期阶段如何使用注释。

    脑机接口社区
  • 利用ZABBIX进行服务器自动巡检并导出报表

    没有故事的陈师傅
  • Python date,datetime,time等相关操作总结

    __author__ = '授客' import time from datetime import date from datetim...

    授客
  • 币聪科技:Stratis行情分析,START跌破1.5美元,熊市中能否守住1美元?

    Stratis在过去24小时的交易中价格上涨了6.46%。在过去7个交易日内价格小幅下跌3.84%后,加密货币目前正以1.42美元交易。

    币聪财经
  • ThinkPHP连续签到小案例

    小伙伴们平时做网站开发的时候,是不是也遇到过会员连续签到送积分,比如我有一个加积分的规则是针对连续签到的,那么我们在实现这个功能的时候,我们面对...

    思梦php

扫码关注云+社区

领取腾讯云代金券