专栏首页go语言核心编程技术抢占系统调用执行时间过长的goroutine(22)

抢占系统调用执行时间过长的goroutine(22)

本文是《Go语言调度器源代码情景分析》系列的第22篇,也是第六章《抢占调度》的第2小节。


上一节我们分析了因运行时间过长而导致的抢占调度,这一节我们来分析因进入系统调用时间过长而发生的抢占调度。

剥夺工作线程的p

现在重新回到sysmon监控线程定期调用的retake函数:

runtime/proc.go : 4380

func retake(now int64) uint32 {
    ......
    for i := 0; i < len(allp); i++ {  //遍历所有p,然后根据p的状态进行抢占
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        
         //_p_.sysmontick用于sysmon监控线程监控p时记录系统调用时间和运行时间,由sysmon监控线程记录
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall { //系统调用抢占处理
             // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
             //_p_.syscalltick用于记录系统调用的次数,主要由工作线程在完成系统调用之后++
            t := int64(_p_.syscalltick)
            if int64(pd.syscalltick) != t {
                 //pd.syscalltick != _p_.syscalltick,说明已经不是上次观察到的系统调用了,
                 //而是另外一次系统调用,所以需要重新记录tick和when值
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            
             //pd.syscalltick == _p_.syscalltick,说明还是之前观察到的那次系统调用,
             //计算这次系统调用至少过了多长时间了
            
            // On the one hand we don't want to retake Ps if there is no other work to do,
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.
             // 只要满足下面三个条件中的任意一个,则抢占该p,否则不抢占
             // 1. p的运行队列里面有等待运行的goroutine
             // 2. 没有无所事事的p
             // 3. 从上一次监控线程观察到p对应的m处于系统调用之中到现在已经超过10了毫秒
            if runqempty(_p_) &&  atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                ......
                _p_.syscalltick++
                handoffp(_p_)  //寻找一个新的m出来接管P
            }
            incidlelocked(1)
            lock(&allpLock)
        } else if s == _Prunning { //运行时间太长,抢占处理,前面已经分析
            ......
        }
    }
    ......
}

retake函数所做的主要事情就在遍历所有的p,并根据每个p的状态以及处于该状态的时长来决定是否需要发起抢占。从代码可以看出只有当p处于 _Prunning 或 _Psyscall 状态时才会进行抢占,而因p处于_Prunning状态的时间过长而发生的抢占调度我们在上一节已经分析过了,现在我们来看看如何对处于系统调用之中的p(对应的goroutine)进行抢占。

根据retake函数的代码,只要满足下面三个条件中的任意一个就需要对处于_Psyscall 状态的p进行抢占:

  1. p的运行队列里面有等待运行的goroutine。这用来保证当前p的本地运行队列中的goroutine得到及时的调度,因为该p对应的工作线程正处于系统调用之中,无法调度队列中goroutine,所以需要寻找另外一个工作线程来接管这个p从而达到调度这些goroutine的目的;
  2. 没有空闲的p。表示其它所有的p都已经与工作线程绑定且正忙于执行go代码,这说明系统比较繁忙,所以需要抢占当前正处于系统调用之中而实际上系统调用并不需要的这个p并把它分配给其它工作线程去调度其它goroutine。
  3. 从上一次监控线程观察到p对应的m处于系统调用之中到现在已经超过10了毫秒。这表示只要系统调用超时,就对其抢占,而不管是否真的有goroutine需要调度,这样保证sysmon线程不至于觉得无事可做(sysmon线程会判断retake函数的返回值,如果为0,表示retake并未做任何抢占,所以会觉得没啥事情做)而休眠太长时间最终会降低sysmon监控的实时性。至于如何计算某一次系统调用时长可以参考上面代码及注释。

retake函数发现如果需要抢占,则通过使用cas修改p的状态来获取p的使用权(为什么需要使用cas呢?从后面的分析我们将知道,工作线程此时此刻可能正好从系统调用返回了,也正在获取p的使用权),如果使用权获取成功则调用handoffp寻找新的工作线程来接管这个p。

runtime/proc.go : 1995

// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
     //运行队列不为空,需要启动m来接管
    if !runqempty(_p_) || sched.runqsize != 0 {
        startm(_p_, false)
        return
    }
    // if it has GC work, start it straight away
     //有垃圾回收工作需要做,也需要启动m来接管
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
        startm(_p_, false)
        return
    }
    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
     //所有其它p都在运行goroutine,说明系统比较忙,需要启动m
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 &&  atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    lock(&sched.lock)
    if sched.gcwaiting != 0 { //如果gc正在等待Stop The World
        _p_.status = _Pgcstop
        sched.stopwait--
        if sched.stopwait == 0 {
            notewakeup(&sched.stopnote)
        }
        unlock(&sched.lock)
        return
    }
    ......
    if sched.runqsize != 0 { //全局运行队列有工作要做
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
     //不能让所有的p都空闲下来,因为需要监控网络连接读写事件
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    pidleput(_p_)  //无事可做,把p放入全局空闲队列
    unlock(&sched.lock)
}

handoffp函数流程比较简单,它的主要任务是通过各种条件判断是否需要启动工作线程来接管_p_,如果不需要则把_p_放入P的全局空闲队列。

从handoffp的代码可以看出,在如下几种情况下则需要调用我们已经分析过的startm函数启动新的工作线程出来接管_p_:

  1. _p_的本地运行队列或全局运行队列里面有待运行的goroutine;
  2. 需要帮助gc完成标记工作;
  3. 系统比较忙,所有其它_p_都在运行goroutine,需要帮忙;
  4. 所有其它P都已经处于空闲状态,如果需要监控网络连接读写事件,则需要启动新的m来poll网络连接。

到此,sysmon监控线程对处于系统调用之中的p的抢占就已经完成。

系统调用

从上面的分析可以看出,这里对正在进行系统调用的goroutine的抢占实质上是剥夺与其对应的工作线程所绑定的p,虽然说处于系统调用之中的工作线程并不需要p,但一旦从操作系统内核返回到用户空间之后就必须绑定一个p才能运行go代码,那么,工作线程从系统调用返回之后如果发现它进入系统调用之前所使用的p被监控线程拿走了,该怎么办呢?接下来我们就来分析这个问题。

为了搞清楚工作线程从系统调用返回之后需要做哪些事情,我们需要找到相关的代码,怎么找代码呢?这里我们通过对一个使用了系统调用的程序的调试来寻找。

package main

import (
    "fmt"
    "os"
)

func main() {
    fd, err := os.Open("./syscall.go")  //一定会执行系统调用
    if err != nil {
        fmt.Println(err)
    }

    fd.Close()
}

使用gdb跟踪调试上面这个程序可以发现,main函数调用的os.Open函数最终会调用到Syscall6函数,因为中间调用过程与我们分析目标没关系,所以我们直接从Syscall6函数开始分析。

syscall/asm_linux_amd64.s : 42

// func Syscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2, err uintptr)
TEXT ·Syscall6(SB), NOSPLIT, $0-80
    CALL  runtime·entersyscall(SB)

    #按照linux系统约定复制参数到寄存器并调用syscall指令进入内核
    MOVQ  a1+8(FP), DI
    MOVQ  a2+16(FP), SI
    MOVQ  a3+24(FP), DX
    MOVQ  a4+32(FP), R10
    MOVQ  a5+40(FP), R8
    MOVQ  a6+48(FP), R9
    MOVQ  trap+0(FP), AX#syscall entry,系统调用编号放入AX
    SYSCALL  #进入内核

    #从内核返回,判断返回值,linux使用 -1 ~ -4095 作为错误码
    CMPQ  AX, $0xfffffffffffff001
    JLS  ok6

    #系统调用返回错误,为Syscall6函数准备返回值
    MOVQ  $-1, r1+56(FP)
    MOVQ  $0, r2+64(FP)
    NEGQ  AX
    MOVQ  AX, err+72(FP)
    CALL  runtime·exitsyscall(SB)
    RET
ok6:      #系统调用返回错误
    MOVQ  AX, r1+56(FP)
    MOVQ  DX, r2+64(FP)
    MOVQ  $0, err+72(FP)
    CALL  runtime·exitsyscall(SB)
    RET

Syscall6函数主要依次干了如下3件事:

  1. 调用runtime.entersyscall函数;
  2. 使用SYSCALL指令进入系统调用;
  3. 调用runtime.exitsyscall函数。

根据前面的分析和这段代码我们可以猜测,exitsyscall函数将会处理当前工作线程进入系统调用之前所拥有的p被监控线程抢占剥夺的情况。但这里怎么会有个entersyscall呢,它是干啥的?我们先来看看。

entersyscall函数

runtime/proc.go : 2847

// Standard syscall entry used by the go syscall library and normal cgo calls.
//go:nosplit
func entersyscall() {
    reentersyscall(getcallerpc(), getcallersp())
}

func reentersyscall(pc, sp uintptr) {
    _g_ := getg()  //执行系统调用的goroutine

    // Disable preemption because during this function g is in Gsyscall status,
    // but can have inconsistent g->sched, do not let GC observe it.
    _g_.m.locks++

    // Entersyscall must not call any function that might split/grow the stack.
    // (See details in comment above.)
    // Catch calls that might, by replacing the stack guard with something that
    // will trip any stack check and leaving a flag to tell newstack to die.
    _g_.stackguard0 = stackPreempt
    _g_.throwsplit = true

    // Leave SP around for GC and traceback.
    save(pc, sp)  //save函数分析过,用来保存g的现场信息,rsp, rbp, rip等等
    _g_.syscallsp = sp
    _g_.syscallpc = pc
    casgstatus(_g_, _Grunning, _Gsyscall)  
     ......
    _g_.m.syscalltick = _g_.m.p.ptr().syscalltick
    _g_.sysblocktraced = true
    _g_.m.mcache = nil
    pp := _g_.m.p.ptr()
    pp.m = 0  //p解除与m之间的绑定
    _g_.m.oldp.set(pp)   //把p记录在oldp中,等从系统调用返回时,优先绑定这个p
    _g_.m.p = 0  //m解除与p之间的绑定
    atomic.Store(&pp.status, _Psyscall)  //修改当前p的状态,sysmon线程依赖状态实施抢占
    .....
    _g_.m.locks--
}

entersyscall函数直接调用了reentersyscall函数,reentersyscall首先把现场信息保存在当前g的sched成员中,然后解除m和p的绑定关系并设置p的状态为_Psyscall,前面我们已经看到sysmon监控线程需要依赖该状态实施抢占。

这里有几个问题需要澄清一下:

  1. 有sysmon监控线程来抢占剥夺,为什么这里还需要主动解除m和p之间的绑定关系呢?原因主要在于这里主动解除m和p的绑定关系之后,sysmon线程就不需要通过加锁或cas操作来修改m.p成员从而解除m和p之间的关系;
  2. 为什么要记录工作线程进入系统调用之前所绑定的p呢?因为记录下来可以让工作线程从系统调用返回之后快速找到一个可能可用的p,而不需要加锁从sched的pidle全局队列中去寻找空闲的p。
  3. 为什么要把进入系统调用之前所绑定的p搬到m的oldp中,而不是直接使用m的p成员?笔者第一次看到这里也有疑惑,于是翻看了github上的提交记录,从代码作者的提交注释来看,这里主要是从保持m的p成员清晰的语义方面考虑的,因为处于系统调用的m事实上并没有绑定p,所以如果记录在p成员中,p的语义并不够清晰明了。

看完进入系统调用之前调用的entersyscall函数后,我们再来看系统调用返回之后需要调用的exitsyscall函数。

exitsyscall函数

runtime/proc.go : 2931

// The goroutine g exited its system call.
// Arrange for it to run on a cpu again.
// This is called only from the go syscall library, not
// from the low-level system calls used by the runtime.
//
// Write barriers are not allowed because our P may have been stolen.
//
//go:nosplit
//go:nowritebarrierrec
func exitsyscall() {
    _g_ := getg()
    ......
    oldp := _g_.m.oldp.ptr()  //进入系统调用之前所绑定的p
    _g_.m.oldp = 0
    if exitsyscallfast(oldp) {//因为在进入系统调用之前已经解除了m和p之间的绑定,所以现在需要绑定p
         //绑定成功,设置一些状态
        ......
        
        // There's a cpu for us, so we can run.
        _g_.m.p.ptr().syscalltick++  //系统调用完成,增加syscalltick计数,sysmon线程依靠它判断是否是同一次系统调用
        // We need to cas the status and scan before resuming...
         //casgstatus函数会处理一些垃圾回收相关的事情,我们只需知道该函数重新把g设置成_Grunning状态即可
        casgstatus(_g_, _Gsyscall, _Grunning)
        ......
        return
    }
    ......
    _g_.m.locks--

    // Call the scheduler.
     //没有绑定到p,调用mcall切换到g0栈执行exitsyscall0函数
    mcall(exitsyscall0)
    ......
}

因为在进入系统调用之前,工作线程调用entersyscall解除了m和p之间的绑定,现在已经从系统调用返回需要重新绑定一个p才能继续运行go代码,所以exitsyscall函数首先就调用exitsyscallfast去尝试绑定一个空闲的p,如果绑定成功则结束exitsyscall函数按函数调用链原路返回去执行其它用户代码,否则则调用mcall函数切换到g0栈执行exitsyscall0函数。下面先来看exitsyscallfast如何尝试绑定一个p,然后再去分析exitsyscall0函数。

exitsyscallfast首先尝试绑定进入系统调用之前所使用的p,如果绑定失败就需要调用exitsyscallfast_pidle去获取空闲的p来绑定。

runtime/proc.go : 3020

//go:nosplit
func exitsyscallfast(oldp *p) bool {
    _g_ := getg()
    ......
    // Try to re-acquire the last P.
     //尝试快速绑定进入系统调用之前所使用的p
    if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
         //使用cas操作获取到p的使用权,所以之后的代码不需要使用锁就可以直接操作p
        // There's a cpu for us, so we can run.
        wirep(oldp) //绑定p
        exitsyscallfast_reacquired()
        return true
    }

    // Try to get any other idle P.
    if sched.pidle != 0 {
        var ok bool
        systemstack(func() {
            ok = exitsyscallfast_pidle()  //从全局队列中寻找空闲的p,需要加锁,比较慢
            ......
        })
        if ok {
            return true
        }
    }
    return false
}

exitsyscallfast首先尝试快速绑定进入系统调用之前所使用的p,因为该p的状态目前还是_Psyscall,监控线程此时可能也正好准备操作这个p的状态,所以这里需要使用cas原子操作来修改状态,保证只有一个线程的cas能够成功,一旦cas操作成功,就表示当前线程获取到了p的使用权,这样当前线程的后续代码就可以直接操作该p了。具体到exitsyscallfast函数,一旦我们拿到p的使用权,就调用wirep把工作线程m和p关联起来,完成绑定工作。所谓的绑定其实就是设置m的p成员指向p和p的m成员指向m。

runtime/proc.go : 4099

// wirep is the first step of acquirep, which actually associates the
// current M to _p_. This is broken out so we can disallow write
// barriers for this part, since we don't yet have a P.
//
//go:nowritebarrierrec
//go:nosplit
func wirep(_p_ *p) {
     _g_ := getg()
     ......
     //相互赋值,绑定m和p
     _g_.m.mcache = _p_.mcache
     _g_.m.p.set(_p_)
     _p_.m.set(_g_.m)
     _p_.status = _Prunning
}

exitsyscallfast函数如果绑定进入系统调用之前所使用的p失败,则调用exitsyscallfast_pidle从p的全局空闲队列中获取一个p出来绑定,注意这里使用了systemstack(func())函数来调用exitsyscallfast_pidle,systemstack(func())函数有一个func()类型的参数,该函数首先会把栈切换到g0栈,然后调用通过参数传递进来的函数(这里是一个闭包,包含了对exitsyscallfast_pidle函数的调用),最后再切换回原来的栈并返回,为什么这些代码需要在系统栈也就是g0的栈上执行呢?原则上来说,只要调用链上某个函数有nosplit这个编译器指示就需要在g0栈上去执行,因为有nosplit指示的话编译器就不会插入检查溢出的代码,这样在非g0栈上执行这些nosplit函数就有可能导致栈溢出,g0栈其实就是操作系统线程所使用的栈,它的空间比较大,不需要对runtime代码中的每个函数都做栈溢出检查,否则会严重影响效率。

为什么绑定进入系统调用之前所使用的p会失败呢?原因就在于这个p可能被sysmon监控线程拿走并绑定到其它工作线程,这部分内容我们已经在前面分析过了。

现在继续看exitsyscallfast_pidle函数,从代码可以看到从全局空闲队列获取p需要加锁,如果锁冲突比较严重的话,这个过程就很慢了,这也是为什么exitsyscallfast函数首先会去尝试绑定之前使用的p的原因。

runtime/proc.go : 3083

func exitsyscallfast_pidle() bool {
    lock(&sched.lock)
    _p_ := pidleget()//从全局空闲队列中获取p
    if _p_ != nil && atomic.Load(&sched.sysmonwait) != 0 {
        atomic.Store(&sched.sysmonwait, 0)
        notewakeup(&sched.sysmonnote)
    }
    unlock(&sched.lock)
    if _p_ != nil {
        acquirep(_p_)
        return true
    }
    return false
}

回到exitsyscall函数,如果exitsyscallfast绑定p失败,则调用mcall执行exitsyscall0函数,mcall我们已经见到过多次,所以这里只分析exitsyscall0函数。

runtime/proc.go : 3098

// exitsyscall slow path on g0.
// Failed to acquire P, enqueue gp as runnable.
//
//go:nowritebarrierrec
func exitsyscall0(gp *g) {
    _g_ := getg()

    casgstatus(gp, _Gsyscall, _Grunnable)
    
     //当前工作线程没有绑定到p,所以需要解除m和g的关系
    dropg()
    lock(&sched.lock)
    var _p_ *p
    if schedEnabled(_g_) {
        _p_ = pidleget() //再次尝试获取空闲的p
    }
    if _p_ == nil { //还是没有空闲的p
        globrunqput(gp)  //把g放入全局运行队列
    } else if atomic.Load(&sched.sysmonwait) != 0 {
        atomic.Store(&sched.sysmonwait, 0)
        notewakeup(&sched.sysmonnote)
    }
    unlock(&sched.lock)
    if _p_ != nil {//获取到了p
        acquirep(_p_) //绑定p
         //继续运行g
        execute(gp, false) // Never returns.
    }
    if _g_.m.lockedg != 0 {
         // Wait until another thread schedules gp and so m again.
        stoplockedm()
        execute(gp, false) // Never returns.
    }
    stopm()  //当前工作线程进入睡眠,等待被其它线程唤醒
    
     //从睡眠中被其它线程唤醒,执行schedule调度循环重新开始工作
    schedule() // Never returns.
}

因为工作线程没有绑定p是不能运行goroutine的,所以这里会再次尝试从全局空闲队列找一个p出来绑定,找到了就通过execute函数继续执行当前这个goroutine,如果找不到则把当前goroutine放入全局运行队列,由其它工作线程负责把它调度起来运行,自己则调用stopm函数进入睡眠状态。execute和stopm函数我们已经分析过,所以这里就不再重复。

至此,我们已经分析完工作线程从系统调用返回需要做到,

小结

从上一节和本小节的分析我们可以看出,因运行时间过长与因系统调用时间过长而导致的抢占是有差别的:

  • 对于运行时间过长的goroutine,系统监控线程首先会提出抢占请求,然后工作线程在适当的时候会去响应这个请求并暂停被抢占goroutine的运行,最后工作线程再调用schedule函数继续去调度其它goroutine;
  • 而对于系统调用执行时间过长的goroutine,调度器并没有暂停其执行,只是剥夺了正在执行系统调用的工作线程所绑定的p,要等到工作线程从系统调用返回之后绑定p失败的情况下该goroutine才会真正被暂停运行。

思考

最后,我们用一个思考题来结束本专题,读者朋友可以思考一下当GOMAXPROCS等于1时,下面这个程序会输出什么?

package main

import (
    "fmt"
    "runtime"
)

func g2() {
    sum := 0
    for  {
        sum++
    }
}

func main() {
    go g2()

    for {
         runtime.Gosched()
        fmt.Println("main is scheduled!")
    }
}

本文分享自微信公众号 - go语言核心编程技术(gh_8b5b60477260),作者:爱写程序的阿波张

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-06-01

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 工作线程的唤醒及创建(19)

    本文是《Go语言调度器源代码情景分析》系列的第19篇,也是第四章《Goroutine被动调度》的第2小节。

    阿波张
  • Go语言调度器之盗取goroutine(17)

    本文是《Go语言调度器源代码情景分析》系列的第17篇,也是第三章《Goroutine调度策略》的第2小节。

    阿波张
  • go调度器源代码情景分析之九:操作系统线程及线程调度

    要深入理解goroutine的调度器,就需要对操作系统线程有个大致的了解,因为go的调度系统是建立在操作系统线程之上的,所以接下来我们对其做一个简单的介绍。

    阿波张
  • 使用Python处理NetCDF格式文件

    NetCDF(Network Common Data Form)是一种科学二进制数据格式,由UCAR负责开发和维护netCDF软件,主要用于存储多维科学数据。在...

    zhangqibot
  • 2019 AI 安防:机会、战争、活下去

    行业剧变、机会遍地、欲望蔓延,同时不确定性骤增,形态各异的欢喜、悲伤、期待、遗憾,构成了一幅独属于2019年的浮世绘。

    AI掘金志
  • 从一个简单的汇编程序学习linux下的系统调用机制

    上节 从一个简单的汇编程序学习汇编程序的结构以及编译链接的过程中,打印hello world的汇编程序的详细解释为:

    Elapse
  • 基于宝塔面板安装Aria2+Aria2NG工具

    刺_猬
  • 腾讯云认证考试常见问题答疑

    认证报名官网:https://cloud.tencent.com/edu/training,您需要先注册腾讯云帐号,并通过个人身份认证,完成个人认证后在该页面选...

    KLKL
  • 信安之路推出 SRC 同盟计划并招募同盟

    信安之路的使命是帮助安全从业人员成长,在安全从业人员中有很大一部分人员通过挖掘漏洞提交给 SRC 平台来获得一些收益或者提升技能,而 SRC 部门的安全从业人员...

    信安之路
  • 如何优雅的搞垮服务器,再优雅的救活

    新开发的jar包部署在老服务器上,版本是Red Hat Enterprise Linux AS release 4 (Nahant Update 5),提示需要...

    震八方紫面昆仑侠

扫码关注云+社区

领取腾讯云代金券