本文深入分析Go调度原理和实现,全文包含的主要内容有:Go程序是怎么运行起来的,经历了哪些流程,调度G的策略和时机,程序是如何在执行runtime代码与用户代码之间来回切换的。文章内容很长,感兴趣的同学可以收藏慢慢看。本文中分析的代码是Go1.14版本,涉及到的文件都在runtime包下。
对于编译性语言,需要将源代码文件编译成一个可执行程序,然后该执行程序被操作系统加载到内存开始运行。对于Go语言来说,就是我们编写的.go文件,通过编译、汇编和链接产生一个可执行的二进制文件。这个二进制文件也称为程序,当它被操作系统加载到内存后,会产生一个进程,进程就是可执行程序的一个实例。程序从加载到被执行这个过程概括起来有以下几个阶段
linux系统中典型的进程内存结构如下所示,当程序刚启动执行时,会从arg/environ下面的主线程栈开始执行。随着函数调用,栈向下生长,当函数调用完毕,又主动释放占用的内存空间。
我们先通过一个例子看如何找到一个Go可执行程序的入口,下面的代码保存在main.go文件中,运行 go build main.go默认生成可执行文件名称为main。然后使用gdb调试可以执行文件main.
package main
import "fmt"
func main() {
fmt.Println("hello world")
}
执行gdb main,进入程序调试交互窗口,输入info files可以看到程序的入口Entry point: 0x45d990,这里是程序的入口地址,然后在这个地址这里下一个断点,b *0x45d990, 最后执行r,定位到源代码的入口地方在_rt0_adm64_linux()。在go源码目录下grep -rn "_rt0_adm64_linux" 可以定位到该函数在runtime包中rt0_linux_amd64.s中,该文件中的代码是汇编语言。_rt0_amd64_linux执行的指令是跳转到_rt0_amd64符合的位置。
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
JMP _rt0_amd64(SB)
TEXT _rt0_amd64(SB),NOSPLIT,$-8
// 将程序的参数argv地址拷贝到DI寄存器中
MOVQ 0(SP), DI // argc
// 将程序参数argv的地址拷贝到SI寄存器中
LEAQ 8(SP), SI // argv
// 跳转到runtime·rt0_go符合的位置
JMP runtime·rt0_go(SB)
前面我们通过一个例子找到Go程序的入口,下面我们看Go调度器是如何初始化的。Go程序调度模型也称为GMP模型,调度器初始化也是就核心数据结构G、M、P是怎么初始化的,以及它们之间的关系。对应到代码中也就是G0、M0和P的初始化。G0是一种特殊的G,它运行的是runtime中的代码,M0是进程启动的第一个线程,也称为主线程。下面通过源码分析调度器初始化的细节。
_rt0_amd64将程序参数argc和argv参数地址拷贝到寄存器之后,跳转到runtime·rt0_go位置执行。下面是runtime·rt0_go函数的源码,它也是汇编语言写的,runtime.rt0_g0函数完成了Go程序启动时的所有初始化工作,函数很长,这里提取了与调度相关的代码,这部分内容是我们需要关心的。
rt0_go函数开始的工作是将寄存器DI和SI的值分别赋值给AX和BX,因为DI和SI中的值是分别是函数参数argc和argv的地址,经过赋值之后,参数的信息也就存储在了AX和BX中。然后栈顶寄存器SP向下移动39字节,并调整SP的位置使它按16字节对齐。最后将argc和argv分别放到SP+16和SP+24字节处的内存中。
TEXT runtime·rt0_go(SB),NOSPLIT,$0
// 将寄存器DI的值赋值给AX
MOVQ DI, AX // argc
// 将寄存器SI的值赋值给BX
MOVQ SI, BX // argv
SUBQ $(4*8+7), SP // 2args 2auto
// 调整栈顶寄存器SP使得它按16字节对齐
ANDQ $~15, SP
// 将AX中的内容拷贝都SP+16字节处的内存中,AX中存储的是argc的值
// 也就是将argc放到SP+16字节处的内存中
MOVQ AX, 16(SP)
// 同理,将argv放到SP+24字节处的内存中
MOVQ BX, 24(SP)
下面是G0的初始化,G0是一个全局的变量,它为runtime代码的运行提供一个栈环境。程序先将G0的地址保存在DI寄存器中,然后栈顶寄存器SP向下移动64*1024-104个字节,即向下大约移动64KB,构造G0的栈空间。最后给G0的stackguard0、stackguard1、stack.lo和stack.hi字段设置初始值,这几个字段与栈的位置,以及栈扩容相关。读到这里,有同学可能有疑问这里直接对G0的字段进行赋值,G0本身是怎么分配出来的?因为G0是一个全局变量,在执行rt0_go函数时已经分配过内存空间了。其他的G的是通过newg函数分配的,在后面的源码分析中我们会看到。这也是G0与其他G不同的一个地方,还有不同是这里G0字段初始化用的是汇编语言,其他的G中的字段的初始化是Go语言完成的。
TEXT runtime·rt0_go(SB),NOSPLIT,$0
...
// 将g0的地址放到DI寄存器中
MOVQ $runtime·g0(SB), DI
// 将SP指向的位置向下(低地址空间)移动64*1024-104个字节,然后将该位置的地址赋值给BX
// 这里是在为g0构造栈空间,g0栈的空间大小约为64KB
LEAQ (-64*1024+104)(SP), BX
// 将BX的值拷贝给g0.stackguard0,即 g0.stackguard0=*SP-64*1024+104
MOVQ BX, g_stackguard0(DI)
// g0.stackguard1=*SP-64*1024+104
MOVQ BX, g_stackguard1(DI)
// g0.stack.lo=*SP-64*1024+104
MOVQ BX, (g_stack+stack_lo)(DI)
// g0.stack.hi=SP
MOVQ SP, (g_stack+stack_hi)(DI)
...
G0是一个全局变量
var (
// 全局m0
m0 m
// 全局g0
g0 g
raceprocctx0 uintptr
)
执行完成上面的语句之后,G0和栈空间的关系如下图所示
完成了G0的初始化之后,rt0_go开始设置线程的本地存储。本地存储简称TLS(Thread Local Storage),它其实是线程的私有的全局变量。普通的全局变量,如果其中一个线程对其进行了修改,其他线程看到这个变量的值是修改后的值。而线程的私有全局变量则不是这样,某线程对私有全局变量的修改,不会影响到其他线程看到的值,可以理解成每个线程对私有全局变量都有自己的一个副本,某个线程修改的只是自己的副本,并不会对其他线程造成影响。
对应到这里线程本地存储中保存的是g0的值,为啥要保存g0的值呢?因为runtime会启动很多个工作线程执行我们的go协程。每个工作线程m都会绑定一个g0,g0是一种特殊的g,它的功能是对我们用户创建的g进行调度,也就是它运行的代码是runtime中的代码,并不是我们编写的用户逻辑。在m的结构体中有一个g0字段,保存的就是这里的g0.利用本地存储保存g0,可以做到各个m之间不会影响各自的g0,又方便代码编写。
TEXT runtime·rt0_go(SB),NOSPLIT,$0
...
// 下面初始化线程本地存储
// 将m0的tls字段的地址给到DI寄存器,即 DI=&m0.tls
LEAQ runtime·m0+m_tls(SB), DI
// 调用settls设置线程本地存储,settls函数的参数已在DI寄存器中
CALL runtime·settls(SB)
// 验证刚才设置的本地线程存储是否可以正常工作
// 获取段基地址(FS)放入BX寄存器,也就是把m0.tls[1]的地址放入到BX寄存器
get_tls(BX)
// 将常数0x123赋值给BX寄存器指向的内存地址
MOVQ $0x123, g(BX)
// 将AX设置为m0.tls[0]
MOVQ runtime·m0+m_tls(SB), AX
// 在将AX指向的内存中的值与0x123进行比较,通过set-get-compare的形式检查tls是否工作正常
CMPQ AX, $0x123
JEQ 2(PC)
CALL runtime·abort(SB)
ok:
get_tls(BX)
// 寄存器CX存储g0的地址
LEAQ runtime·g0(SB), CX
// 将g0的地址保存到本地线程存储 m0.tls[0]中
MOVQ CX, g(BX)
// 将m0的地址保存到寄存器AX中
LEAQ runtime·m0(SB), AX
// save m->g0 = g0
// 将m0和g0互相绑定
// m0.g0=g0
MOVQ CX, m_g0(AX)
// g0.m=m0
MOVQ AX, g_m(CX)
CLD
CALL runtime·check(SB)
TEXT runtime·settls(SB),NOSPLIT,$32
#ifdef GOOS_android
SUBQ runtime·tls_g(SB), DI
#else
// DI寄存器中保存的是m.tls[0]的地址,执行下面的ADDQ之后,将DI寄存器中的值+8
// 此时DI中的值保存的是m.tls[1]的地址
ADDQ $8, DI
// 将DI寄存器的值拷贝到SI寄存器中,这是arch_prctl系统调用的第二个参数
MOVQ DI, SI
// 将值0x1002拷贝到DI寄存器中,这时arch_prctl系统调用的第一个参数
MOVQ $0x1002, DI // ARCH_SET_FS
// 将SYS_arch_prctl系统调用编号的值拷贝到AX寄存器中
MOVQ $SYS_arch_prctl, AX
// 执行系统调用
SYSCALL
CMPQ AX, $0xfffffffffffff001
JLS 2(PC)
MOVL $0xf1, 0xf1 // crash
RET
通过arch_prctl系统调用将m0.tls[1]的地址设为了FS段的段基地址,CPU中有个FS寄存器与之对应,这样以后,工作线程代码可以通过FS寄存器找到m.tls.
前面执行已完成G0的初始化,M0与G0的相互绑定,主线程中可以通过get_tls获取到G0,通过G0的成员字段m可以找到M0,实现了M0和G0与主线程之间的关联。此时主线程、m0、g0以及栈之间的关系如下图所示。
下面看Go进程启动的核心处理流程。整个流程主要分为5个步骤:
TEXT runtime·rt0_go(SB),NOSPLIT,$0
...
// 下面语句处理操作系统传递过来的参数
// 将argc从内存搬到AX存储器中
MOVL 16(SP), AX // copy argc
// 将argc搬到SP+0的位置,即栈顶位置
MOVL AX, 0(SP)
// 将argv从内存搬到AX寄存器中
MOVQ 24(SP), AX // copy argv
// 将argv搬到SP+8的位置
MOVQ AX, 8(SP)
CALL runtime·args(SB)
// 调用osinit函数,获取CPU的核数,存放在全局变量ncpu中,供后面的调度时使用
CALL runtime·osinit(SB)
// 调用schedinit进行调度的初始化
CALL runtime·schedinit(SB)
// runtime.mainPC是runtime.main函数,将runtine.main的地址拷贝到AX寄存器
MOVQ $runtime·mainPC(SB), AX // entry
// 将AX入栈,AX中存储的是runtime.main函数的地址,也就是下面将要开启新goroutine需要执行的函数
// newproc的第二个参数
PUSHQ AX
// newproc的第一个参数入栈,该值表示runtime.main函数占用的大小,因为runtime.main函数没有入参
// 所以这里是0
PUSHQ $0 // arg size
// 创建main goroutine
CALL runtime·newproc(SB)
POPQ AX
POPQ AX
// 调用mstart函数,主线程进入调度循环,运行刚刚创建的main goroutine
CALL runtime·mstart(SB)
// 上面的mstart函数是不会返回的,如果返回了,说明代码有问题,直接abort
CALL runtime·abort(SB)
RET
MOVQ $runtime·debugCallV1(SB), AX
RET
下面开始分析上面每个函数具体做了哪些事情:
osinit主要是获取CPU的核数并保持在全局变量ncpu中,linux系统下该函数在 runtime/os_linux.go文件中,该函数的实现与具体的系统架构有关,mac系统该函数在runtime/os_darwin.go文件中
func osinit() {
// 获得CPU的数量
ncpu = getproccount()
physHugePageSize = getHugePageSize()
osArchInit()
}
schedinit是真正的调度函数,完成调度的初始化操作。主要完成以下功能:
func schedinit() {
// _g_为g0
// 当前的goroutine为g0,获得g0的地址
_g_ := getg()
if raceenabled {
_g_.racectx, raceprocctx0 = raceinit()
}
// 初始化最大默认线程为1万个
sched.maxmcount = 10000
tracebackinit()
moduledataverify()
// 初始化栈空间
stackinit()
mallocinit()
fastrandinit()
// 初始化m0,_g_为g0,所以_g_.m即为g0.m,g0.m也就是m0
mcommoninit(_g_.m)
...
gcinit()
sched.lastpoll = uint64(nanotime())
// 初始化p的数量,初始值为cpu的核数,如果程序设置了GOMAXPROCS,则取
// GOMAXPROCS环境变量的值
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
// 调整p的数量,创建并初始化所有的p,所有的p保存在全局变量allp中
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
...
}
getg函数功能是从本地存储中获取当前正在运行的g,在源码中没有找到它的实现,查阅资料说它是在编译器中实现的。前面已经g0保存到了本地线程存储中,所以这里getg获取到的g就是g0.
schedinit函数中调用了mcommoninit函数,实现对m0的初始化,mcommoninit函数实现如下,它会设置m0的id字段,每个m有唯一的id标识,并将当前的m插入到全局保存m的链表对象allm中。这个链表是一个单链表,allm始终指向链表头。
func mcommoninit(mp *m) {
// _g_还是g0
_g_ := getg()
if _g_ != _g_.m.g0 {
callers(1, mp.createstack[:])
}
lock(&sched.lock)
// sched.mnext溢出了
if sched.mnext+1 < sched.mnext {
throw("runtime: thread ID overflow")
}
// 设置m的id字段,每个m有唯一的id标识
mp.id = sched.mnext
// 全局计时器mnext自增1
sched.mnext++
// 检查当前存活的m数量是否在不超过10000
checkmcount()
...
// allm保存了所有的m对象,它是以链表的形式存储的,通过头插法的方式将当前的
// m(mp)插入到全局链表对象allm中
mp.alllink = allm
// 更新allm的头节点为当前的m对象mp
atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
unlock(&sched.lock)
...
}
schedinit会调用procresize函数完成所有p的创建和初始化,整个procresize处理比较复杂,因为程序启动后我们可用通过runtime.GOMAXPROCS()函数动态修改p的数量。该函数主要有以下步骤:
上述步骤描述的是最基本的情况,即没有通过runtime.GOMAXPROCS()函数对p的数量进行调整。如果对p的数量进行了调整,会做一些其他处理逻辑,例如将多余的p的进行销毁,如果p的本地队列中已有等待运行的g,从全局空闲m队列中获取一个m将它与p进行绑定等。将所有可运行的p通过p.link串联起来,构成了一个链表结构,函数返回可运行p链表头节点。
func procresize(nprocs int32) *p {
// 初始时gomaxprocs的值为0
old := gomaxprocs
if old < 0 || nprocs <= 0 {
throw("procresize: invalid arg")
}
if trace.enabled {
traceGomaxprocs(nprocs)
}
// 更新统计信息
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
sched.procresizetime = now
// 初始时allp中没有p,即len(allp)=0, nprocs显然大于len(allp)
// 会进入这里的if逻辑
if nprocs > int32(len(allp)) {
lock(&allpLock)
if nprocs <= int32(cap(allp)) {
// 调整allp的len为nprocs, allp的cap不变
allp = allp[:nprocs]
} else {
// 初始化时会创建新的切片保存p
nallp := make([]*p, nprocs)
// 将旧切片allp中的p拷贝到新的切片nallp中
copy(nallp, allp[:cap(allp)])
// 将allp替换为新创建的切片nallp
allp = nallp
}
unlock(&allpLock)
}
// 开始时,old为0,即创建初始化所有的p,保存到allp中
for i := old; i < nprocs; i++ {
pp := allp[i]
// 如果pp非空,复用已有的pp,否则通过new新创建一个p对象
if pp == nil {
pp = new(p)
}
// 对pp字段进行初始化设置
pp.init(i)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
// _g_是g0
_g_ := getg()
// 初始时m都还未绑定p,不会进入到这个分支中,程序启动之后,在设置GOMAXPROCS
// 有可能进入下面的分支
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
// 继续保持之前的p为_Prunning状态
_g_.m.p.ptr().status = _Prunning
_g_.m.p.ptr().mcache.prepareForSweep()
} else { // 初始化的时候会进入到这个分支
// 但初始化的时候_g_.m(g0.m)也就是m0还是未绑定p 所以不会走到这个if里面
if _g_.m.p != 0 {
if trace.enabled {
traceGoSched()
traceProcStop(_g_.m.p.ptr())
}
_g_.m.p.ptr().m = 0
}
// 清理m0.p和mcache
_g_.m.p = 0
_g_.m.mcache = nil
// 从全局p队列allp中取第一个p与m0进行绑定
p := allp[0]
p.m = 0
// 设置p的状态为_Pidle
p.status = _Pidle
// 将p就是allp[0]与m0互相绑定,并将p的状态修改为_Prunning
acquirep(p)
if trace.enabled {
traceGoStart()
}
}
// 将多余的p进行销毁
for i := nprocs; i < old; i++ {
p := allp[i]
p.destroy()
}
// Trim allp.
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
unlock(&allpLock)
}
var runnablePs *p
// 将除allp[0](与m0绑定的p)之外的所有的p放入到空闲链表中
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
// 如果当前的p是allp[0],跳过,因为它在前面已经绑定到m0上了
if _g_.m.p.ptr() == p {
continue
}
// 设置p的状态为_Pidle
p.status = _Pidle
// 判断当前的p的本地队列中是不是没有g,初始时p中是没有g的
if runqempty(p) {
// p的本地队列没有g,将其放入到全局空闲p队列(sched.pidle)中
pidleput(p)
} else {
// 从全局的空闲m队列(sched.midle)中拿出一个m,将这个m绑定到p上
p.m.set(mget())
// 将所有可运行的p通过p.link串联起来,构成了一个链表结构
// 例如 allp[1].link --> allp[2] allp[2].link-->allp[3]
p.link.set(runnablePs)
runnablePs = p
}
}
stealOrder.reset(uint32(nprocs))
var int32p *int32 = &gomaxprocs
// gomaxprocs设置为nprocs
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
// 返回可运行p链表头节点
return runnablePs
}
至此,已完成g0和m0的互相绑定,m0和allp[0]的互相绑定,nprocs个p的创建与初始化。得到调度器各个组件的关系如下图所示。
前面说了Go进程启动的核心处理流程有5个步骤,main goroutine的创建对应的是里面的第4个步骤,这里为了看起来方便,将rt0_go中main goroutine创建逻辑重新摘录出来。下面的汇编语句做的一件事就是调用newproc函数创建一个新的goroutine用来执行mainPC所对应的runtime.main函数。而runtime.main函数又会调用我们用户编写的package main包中main函数,这样我们的用户代码就可以被执行到了。
TEXT runtime·rt0_go(SB),NOSPLIT,$0
...
// runtime.mainPC是runtime.main函数,将runtine.main的地址拷贝到AX寄存器
MOVQ $runtime·mainPC(SB), AX // entry
// 将AX入栈,AX中存储的是runtime.main函数的地址,也就是下面将要开启新goroutine需要执行的函数
// newproc的第二个参数
PUSHQ AX
// newproc的第一个参数入栈,该值表示runtime.main函数占用的大小,因为runtime.main函数没有入参
// 所以这里是0
PUSHQ $0 // arg size
// 创建main goroutine
CALL runtime·newproc(SB)
...
下面是newproc函数的实现,可以看到它有两个入参,第二个参数fn表示新创建出来的goroutine将从fn这个函数开始执行,第一个参数siz表示函数fn的参数占用的内存字节数。这里举个例子,方便我们理解,下面go add(1,2)会调用newproc创建一个协程,参数fn=&funcval{fn:add},siz=16. fn是一个结构体,它里面的fn字段就是新建协程调用的函数名称,对应到这里就是add函数。siz为add函数的参数占用的内存,这里add函数的参数是两个int64,所以占用的内存大小为16个字节,即siz为16.
需要传递函数参数大小siz给newproc函数的原因是,newproc函数将创建一个新的goroutine来执行fn函数,这个新创建的goroutine与当前程序正在执行的goroutine会使用不同的栈,它在运行的时候,需要知道fn函数参数的大小,而newproc函数自己不知道需要拷贝多少个字节的参数数据到新创建的goroutine栈上,所以用参数siz指明要拷贝的字节数。
func add(a, b int64) int64 {
return a + b
}
func main() {
go add(1, 2)
}
newproc函数在获取到fn函数的第一个参数地址(argp)后,调用systemstack切换到g0栈执行newproc1函数,并将argp参数传递给newproc1。
func newproc(siz int32, fn *funcval) {
// argp指向第一个fn的第一个参数,siz说明fn参数的个数
// 函数调用参数入栈顺序是从右向左
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
// 获取正在运行的g,初始运行是main goroutine,此时的gp就是m0中的g0
gp := getg()
// 将调用newproc时由call指令压栈的函数返回地址,初始时,pc的值是
// CALL runtime.newproc(SB)指令后面的指令的地址
pc := getcallerpc()
// systemstack表示切换到g0栈运行,初始时执行到这里的时候已经在g0栈,
// 所以直接调用newproc1
systemstack(func() {
newproc1(fn, argp, siz, gp, pc)
})
}
newproc1函数的主要功能是从堆上分配一个g对象newg,并为该对象分配一个大小为2048字节的栈空间,其次初始化它的stack成员变量,newg.stack.hi和newg.stack.lo分别指向栈的栈底(高地址)和栈顶(低地址)位置, 然后将newg需要执行函数的参数从newproc函数栈拷贝到newg栈中,最后将newg的状态修改为_Grunnable,并把它加入到等待运行的g队列中。
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
// _g_为g0,初始时g0也是m0.go
_g_ := getg()
if fn == nil {
_g_.m.throwing = -1
throw("go of nil func value")
}
acquirem()
siz := narg
siz = (siz + 7) &^ 7
// _StackMin=2048, sys.RegSize=8
// siz>=2048-4*8-8=2048-40=2008
// 检查参数的大小不能超过2048-4*8-8=2048-40=2008 Bytes
if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
throw("newproc: function arguments too large for new goroutine")
}
// 获取与当前g0(_g_)关联的m的绑定的p,初始时,_g_.m也就是m0
// m0绑定的p为allp[0]
_p_ := _g_.m.p.ptr()
// 从本地队列_p_中获取一个g,如果本地队列中没有g,从全局队列中获取
newg := gfget(_p_)
// 如果从_p_的本地队列和全局队列中都没有获取到g,则新创建一个g
// 初始时,是没有g的,所以会进入下面的if逻辑,新建一个main goroutine对象
if newg == nil {
// 申请创建一个新的g,栈的大小为2KB
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
// 将当前新创建的newg加入到全局g列表allgs中
allgadd(newg)
}
if newg.stack.hi == 0 {
throw("newproc1: newg missing stack")
}
if readgstatus(newg) != _Gdead {
throw("newproc1: new g is not Gdead")
}
// 设置一定的保留空间
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize
// 保留空间大小按系统对齐
totalSize += -totalSize & (sys.SpAlign - 1)
// 确定sp的位置
sp := newg.stack.hi - totalSize
spArg := sp
if usesLR {
*(*uintptr)(unsafe.Pointer(sp)) = 0
prepGoExitFrame(sp)
spArg += sys.MinFrameSize
}
if narg > 0 {
// 将参数argp从运行newproc函数的栈拷贝到要运行g的栈中
// 初始时就是从m0.g0栈拷贝参数到新运行g的栈
memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
if writeBarrier.needed && !_g_.m.curg.gcscandone {
f := findfunc(fn.fn)
stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
if stkmap.nbit > 0 {
bv := stackmapdata(stkmap, 0)
bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
}
}
}
// 清理掉newg中sched中内容,下面会对sched重新赋值
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
// 设置newg的sched各字段的值,调度器根据这些字段把newg调度到CPU上运行
newg.sched.sp = sp
newg.stktopsp = sp
// 设置程序计数器pc,当newg被调度起来运行时,从这个地址开始执行指令
// pc的值为goexit函数偏移1(sys.PCQuantum)的位置
newg.sched.pc = funcPC(goexit) + sys.PCQuantum
// 设置sched g的地址为当前g的地址
newg.sched.g = guintptr(unsafe.Pointer(newg))
// 调整sched成员和newg的栈
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
// 设置newg的startpc为fn.fn,该字段用于函数调用的traceback和栈收缩
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels
}
if isSystemGoroutine(newg, false) {
atomic.Xadd(&sched.ngsys, +1)
}
// 设置g的状态为_Grunnable,意味着该g可以运行了
casgstatus(newg, _Gdead, _Grunnable)
if _p_.goidcache == _p_.goidcacheend {
_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
_p_.goidcache -= _GoidCacheBatch - 1
_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
}
// 设置g的唯一标识goid
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
if raceenabled {
newg.racectx = racegostart(callerpc)
}
if trace.enabled {
traceGoCreate(newg, newg.startpc)
}
// 将newg加入到队列中,具体尝试放入队列的顺序为_p_.runnext、本地队列、全局队列
// 如果_p_.runnext为空直接将newg放在_p_.runnext位置,如果_p_.runnext不为空
// 将newg与_p_.runnext交换,原来_p_.runnext的g尝试放入p的本地队列runq中
// 如果本地队列满了,会将runq中一半的元素转移到全局队列中
// 初始时,_p_.runnext是空的,所以直接放在这里
runqput(_p_, newg, true)
// 唤醒一个m来运行g,初始时不会执行,因为mainStarted为false,即runtime包中的main函数还未执行
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
wakep()
}
releasem(_g_.m)
}
newproc1函数中有两条非常关键的语句,这里拿出来进行单独说明. 我们先来看newg.sched字段的功能,查看结构体g的定义,g的sched字段是gobuf类型,它保存的是goroutine的调度信息,重点就是保存几个关键寄存器的值。Go程序goroutine调度的本质是一组CPU寄存器和执行流的切换,当前我们执行某个g的时候,将BP,SP等寄存器设置为合适的值,将程序计数器pc指向g中的函数地址,这样g就被调度运行起来了,当要切换出当前g换其他g运行的时候,需要将当前g的CPU寄存器等信息保存到内存中,以供下次运行该g的时候,直接将保存到内存中的信息恢复到寄存器中又可以运行了。
可以看到这里将newg.sched.pc的值设置为了goexit函数的第二条指令,sys.PCQuantum的值为1,funcPC(goexit) + sys.PCQuantum指向的是goexit的第二条指令。为什么要这样设置呢?理论上不应该是设置为fn.fn吗,下面通过分析gostartcallfn函数一探究竟。
...
newg.sched.pc = funcPC(goexit) + sys.PCQuantum
...
gostartcallfn(&newg.sched, fn)
gostartcallfn调整sched成员和newg的栈,核心处理调用的是gostartcall函数。gostartcall函数将栈顶寄存器SP向下移动一个指针的位置,然后将goexit+1即goexit的第二条指令。然后将buf.pc即newg.sched.pc重新设为fn(runtime.main函数)。相当于将goexit放到newg的栈顶,伪造成newg是被goeixt函数调用的,当newg中的fn函数执行完成之后,返回到goexit继续执行,做一些清理的操作。
func gostartcallfn(gobuf *gobuf, fv *funcval) {
var fn unsafe.Pointer
// 初始化时fn为runtime.main
if fv != nil {
fn = unsafe.Pointer(fv.fn)
} else {
fn = unsafe.Pointer(funcPC(nilfunc))
}
gostartcall(gobuf, fn, unsafe.Pointer(fv))
}
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
// newg的栈顶,当前newg栈上只有fn函数的参数,sp指向的是fn的第一个参数
sp := buf.sp
if sys.RegSize > sys.PtrSize {
sp -= sys.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = 0
}
// 为返回地址预留8字节空间
sp -= sys.PtrSize
// buf.pc保存的是goexit+1的地址,将buf.pc赋值给sp指向的内存,下面真正赋值给buf.pc的是fn函数,
// 初始时是runtime包中的main函数,这样使得fn执行完后返回到goexit继续执行,从而完成清理工作
// 将fn伪装成是被goexit函数调用的
*(*uintptr)(unsafe.Pointer(sp)) = buf.pc
// 重新设置newg的栈顶寄存器
buf.sp = sp
// 真正将newg的pc值设置为fn函数,等到newg被调度起来运行时,调度器会将buf.pc放入cpu的ip寄存器
// 即开始执行fn函数逻辑
buf.pc = uintptr(fn)
buf.ctxt = ctxt
}
结构体gobuf用于保存goroutine的调度信息,主要包含几个关键CPU寄存器的值
type gobuf struct {
// 栈指针,保存rsp寄存器的值
sp uintptr
// 程序计数器,保存rip寄存器的值
pc uintptr
// 与gobuf关联的goroutine地址
g guintptr
ctxt unsafe.Pointer
// 系统调用的返回值
ret sys.Uintreg
lr uintptr
bp uintptr
}
分析完了newproc1整理出来流程,现在再后头来看将newg加入到等待运行队列函数runqput实现细节。
runqput将g放入到p的本地队列中,传入参数有3个,_p_为g将放入哪个p的本地队列,gp为待放入的g, next为true表示将gp放入到_p_的runnext中,如果_p_的runnext中已经有g,会进行替换,之前的g会放入到_p_的本地队尾,如果next为false,直接将gp放入到本地队列_p_的队尾.如果本地队列满了,将当前P中前len(p)/2个G批量放入到全局队列中。
func runqput(_p_ *p, gp *g, next bool) {
if randomizeScheduler && next && fastrand()%2 == 0 {
next = false
}
// 如果next为true,将gp放入到_p_.runnext中,这个过程通过cas保证原子性
if next {
retryNext:
// 对_p_.runnext进行备份
oldnext := _p_.runnext
// 通过cas操作,将gp和oldnext进行交换
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
// 如果oldnext为0,说明_p_.runnext之前没有g,现在已放入完毕,直接返回
if oldnext == 0 {
return
}
// 将之前的g赋值给gp,下面会将gp放入_p_的本地队列
gp = oldnext.ptr()
}
retry:
// h为本地队列的队头
h := atomic.LoadAcq(&_p_.runqhead)
// t为本地队列的队尾
t := _p_.runqtail
// t-h小于队列的长度,即本地队列还没有满,放到本地队列的尾部
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1)
return
}
// 走到这里说明本地队列满了,放到全局队列, 放入到全局队列并不是一个,而是将当前P中前len(p)/2个G批量放入到global queue中
if runqputslow(_p_, gp, h, t) {
return
}
// 走到这里说明往全局队列中没有放成功,没有成功的原因是,本地队列没有满,所以进一步重试,
// 尝试放入本地队列
goto retry
}
runqputslow将_p_本地队列中一半数量的g(另外加上gp)移动到全局队列中,在上面的runqput函数处理逻辑中,如果本地队列满了,会执行runqputslow.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
// 存储要移动的g,移动的数量为本地队列的一半+1个,这里的1是为传入的gp分配一个位置
var batch [len(_p_.runq)/2 + 1]*g
n := t - h
n = n / 2
// 确保n为本地队列的长度的一半
if n != uint32(len(_p_.runq)/2) {
throw("runqputslow: queue is not full")
}
// 将队头的n个g存储到batch中
for i := uint32(0); i < n; i++ {
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
// 原子更新_p_队列队头的位置,队头向后移动n个位置
if !atomic.CasRel(&_p_.runqhead, h, h+n) {
return false
}
// 将传入的gp放入到batch的末尾
batch[n] = gp
// 如果要随机化调度,打乱batch中元素的顺序
if randomizeScheduler {
for i := uint32(1); i <= n; i++ {
j := fastrandn(i + 1)
batch[i], batch[j] = batch[j], batch[i]
}
}
// 将batch中的g串起来,构成一个链表,因为batch中有n+1元素
// 所以这里循环n次,就将n+1个组成了链表结构
for i := uint32(0); i < n; i++ {
batch[i].schedlink.set(batch[i+1])
}
var q gQueue
// 将链表的头节点和尾节点加入到q中,方便一次性将batch中的g加入到全局队列
q.head.set(batch[0])
q.tail.set(batch[n])
lock(&sched.lock)
// 将g一次性批量放入全局队列
globrunqputbatch(&q, int32(n+1))
unlock(&sched.lock)
return true
}
func globrunqputbatch(batch *gQueue, n int32) {
// 将batch一次性放入sched.runq中
sched.runq.pushBackAll(*batch)
// 更新sched.runq中g的数量
sched.runqsize += n
*batch = gQueue{}
}
// 将q2链表中的g加入到全局队列中
func (q *gQueue) pushBackAll(q2 gQueue) {
if q2.tail == 0 {
return
}
q2.tail.ptr().schedlink = 0
// 直接将全局队列的队尾连接到q2的头节点,这样q2就加入了全局g链表中
if q.tail != 0 {
q.tail.ptr().schedlink = q2.head
} else {
q.head = q2.head
}
// 更新全局链表尾节点的位置,指向q2的尾部
q.tail = q2.tail
}
通过流程图的形式将runqput处理逻辑描述出来,得到如下流程图。
经过上面的处理逻辑,main goroutine(newg)就被创建了出来,此时主线程、g0栈和main goroutine栈之间的关系如下图所示。
目前已分析完Go进程启动的核心处理流程的前四个步骤,现在来看最后一个步骤:调用runtime.mstart启动工作线程m,进入调度系统.
mstart函数设置了g的堆栈保护字段stackguard0和stackguard1之后,直接调用了函数mstart1。
func mstart() {
// _g_为g0
_g_ := getg()
// 初始时,g0的stack.lo已完成初始化,它不等于0
osStack := _g_.stack.lo == 0
if osStack {
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
// 初始化堆栈保护
_g_.stackguard0 = _g_.stack.lo + _StackGuard
_g_.stackguard1 = _g_.stackguard0
mstart1()
...
mexit(osStack)
}
mstart1会检测当前的g是否是g0,设置工作线程m的备用信号堆栈和信号掩码,如果当前的m是m0,做一些特殊处理,设置系统信号量的处理函数,检测m是否有起始任务函数,如果有就执行它,例如sysmon函数,获取一个p并将它与m进行绑定,最后执行调度程序函数schedule。可以看到mstart1还没有进行真正的调度,而是为调度做一些处理工作。
func mstart1() {
// 初始时,_g_为m0中的g0,其他情况下_g_也是各个m的g0
_g_ := getg()
// 确保g是系统栈上的g0,调度器只在g0上直执行
if _g_ != _g_.m.g0 {
throw("bad runtime·mstart")
}
// getcallerpc()获取mstart1执行完的返回地址
// getcallersp()获取调用mstart1时的栈顶地址
// 保存g0调度信息
save(getcallerpc(), getcallersp())
asminit()
// 初始化m,主要设置线程的备用信号堆栈和掩码
minit()
// 如果当前的m是m0,执行mstartm0操作
if _g_.m == &m0 {
// 对于初始m,需要设置系统信号量的处理函数
mstartm0()
}
// 对于m0是没有mstartfn函数,对其他m如果有起始任务函数,则需要执行,比如sysmon函数
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
// 如果当前g的m不是m0,它现在还没有p,需要获取一个p, m0已经绑定了allp[0],所以不用关心m0
if _g_.m != &m0 {
// 完成_g_.m和p的互相绑定
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
// 调用调度函数schedule,该函数不会返回
schedule()
}
schedule函数的核心功能就是竭尽所能找到一个可运行的g,然后调用execute执行g中的函数。查找g的算法流程如下:
func schedule() {
// _g_为每个工作线程m对应的g0,初始时_g_为m0的g0
_g_ := getg()
...
var gp *g
var inheritTime bool
...
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
tryWakeP = tryWakeP || gp != nil
}
if gp == nil {
// 每进行61次调度时优先从全局队列中获取待运行的g, 这样做是为了保证调度的公平性
// 防止全局队列中的g得不到调度饿死
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
// 先对全局队列加锁然后从中获取g
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
// 从与m绑定的p的本地队列中获取g
gp, inheritTime = runqget(_g_.m.p.ptr())
}
if gp == nil {
// 走到这里说明,从全局队列或p的本地队列中都没有获取到g,则调用findrunnable函数
// 从其他工作线程绑定的p的本地队列中偷取g,如果没有偷取到,也就是当前都没有待运行的g
// 了,当前的工作线程进入睡眠模式,直到获取到运行的g之后findrunnable函数才会返回
gp, inheritTime = findrunnable()
}
...
// 当前运行的是runtime的代码,函数调用栈使用的是g0的栈空间,调用execute会切换到
// 用户程序创建的g(gp)的代码和栈空间去运行
execute(gp, inheritTime)
}
上面介绍了schedule函数整体是怎么找g的流程算法,其实就是GMP调度模型中的调度策略(找G的策略),有3大策略,对应到上面的处理函数分别是globrunqget、runqget和findrunnable。下面分别详细介绍每个策略的实现细节。
globrunqget从全局队列最多获取max个g,并将获取的max-1个g放入_p_的本地队列,并将剩下的1个作为返回值给调用方进行调度。该函数是一个通用函数,也就是一次可以从全局队列中搬多个g到本地队列,进行批量搬运。考虑到公平性,搬运的数量有一定限制,将全局队列中g的数量根据p的数量均分,每个p能从全局队列中最多获取到sched.runqsize/gomaxprocs + 1,证公平性每个p都有机会从全局队列中拿到一部分g,同时搬运的数量不能超过本地队列长度的一半(128)。如果取回来的g数量超过本地队列_p_可以容纳的怎么解决呢?可以看到下的runqput函数中如果本地队列满了又放回全局队列,所以g不会丢失。
// globrunqget从全局队列最多获取max个g,并将获取的max-1个g放入_p_的本地队列
// 并将剩下的1个作为返回值给调用方进行调度
func globrunqget(_p_ *p, max int32) *g {
// 如果全局队列中没有g,直接返回nil
if sched.runqsize == 0 {
return nil
}
// 将全局队列中g的数量根据p的数量均分,每个p能从全局队列中最多获取到sched.runqsize/gomaxprocs + 1
// 个g,这样保证公平性,每个p都有机会从全局队列中拿到一部分g
n := sched.runqsize/gomaxprocs + 1
// 进一步判断n不能大于全局队列中g的数量,这种情况在gomaxprocs为1的时候会出现n>全局队列中g的数量
if n > sched.runqsize {
n = sched.runqsize
}
// 调整n的值不超过max,max作为入参的含义是一次性从全局队列取走的g的最大数量
if max > 0 && n > max {
n = max
}
// 继续调整n的值,n的值不能超过本地队列长度的一半,本地队列_p_.runq为一个256的数组
// 它的一半为128,即n不能超过128,将_p_.runq作为一个环形数组循环使用的,这里有一种担心
// 如果取回来的g数量超过本地队列_p_可以容纳的怎么解决呢?可以看到下的runqput函数中如果本地
// 队列满了又放回全局队列,所以g不会丢失
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
// 全局队列中的g的数量减少n个
sched.runqsize -= n
// 将全局队列的对头的g出队,直接通过函数返回gp,其他的g通过runqput放入_p_的本地队列
gp := sched.runq.pop()
n--
// 循环n次从全局队列中取出n个g并放入本地队列
for ; n > 0; n-- {
gp1 := sched.runq.pop()
// 将从全局队列获取的gp1放入到_p_的本地队列中
runqput(_p_, gp1, false)
}
return gp
}
runqget从_p_的本地队列中取走一个g返回。优先从p的runnext中获取,runnext只保存一个g,当runnext没有g时才会从runq中获取。当循环队列的队头与队尾相同时,表明队列中没有可运行的g。否则,从队头获取一个g返回。因为存在其他P偷取g造成同时访问的情况,所以写了两个for{}死循环,通过cas手段保证并发安全。
// runqget从_p_的本地队列中取走一个g返回,注意它有两个返回值,第一个返回值就是获取到的
// g,第二参数是bool类型,表示返回的gp的是否应该继承当前时间片中的剩余时间,如果为true
// 表示继承当前时间片的剩余时间,返回false表示开启一个新的时间片
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
// 上来就是两个for{}循环
for {
// 优先从p的runnext中获取,runnext只保存一个g,当runnext没有g时
// 才会从runq中获取
next := _p_.runnext
if next == 0 {
break
}
// 获取的过程是cas操作,防止并发问题
if _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
}
for {
// 从循环队列中获取一个g
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
// h为一个下标,表示小于h下标 runq中的g已取走
// t表示从[h,t)范围 runq中的g还未取走
// t与h相等表示本地队列已经空了
if t == h {
return nil, false
}
// 将runq中h对应下标的g取出
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
// 原子性自增_p_.runqhead
if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
findrunnable从其他p中偷取可以运行的g,在从其他p偷取g前会当前是否有已经准备好运行的网络协程,如果有返回一个可运行的协程。并通过injectglist函数将其余协程放入全局运行队列等待被调度。如果没有可以运行的网络协程,全局运行队列中也没有可运行的g,当前也没有垃圾回收中进行标记的g需要运行,这时会调用runqsteal从其他p中的本地队列中偷取它里面一半数量的g到当前的本地队列。
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
// 检查是否在处理gc
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _p_.runSafePointFn != 0 {
runSafePointFn()
}
...
// 对本地队列再进行一次检查,查看是否有待运行的g
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// 查看全局队列是否有待运行的g
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 从网络io轮询器中找到准备就绪的g,把这个g变为可运行的g
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(0); !list.empty() {
gp := list.pop()
// 把找到的可运行的网络io的g列表插入到全局队列
injectglist(&list)
// 将gp的状态从_Gwaiting修改为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
procs := uint32(gomaxprocs)
ranTimer := false
// 如果当前的m没有自旋,并且工作中的p的数量小于正在自旋的m数的2倍,就不让它也在进行自旋了,
// 主要是为了防止有过多的m在寻找可以运行的g而消耗太多的cpu
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
// 将当前的m自旋
if !_g_.m.spinning {
// 设置m的状态为自旋状态
_g_.m.spinning = true
// 处于自旋状态的m的数量+1
atomic.Xadd(&sched.nmspinning, 1)
}
// 从其他的p的本地运行队列中偷取g,p的选择是随机的,尝试4次
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2
p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
// 从p2偷取它本地队列一半数量的g到_p_中
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
return gp, false
}
...
}
}
...
// 垃圾回收中有进行标记工作的g需要运行,调度运行标记工作的g
if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
gp := _p_.gcBgMarkWorker.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
...
// 走到这里,说明前面都没有找到可运行的g,将当前的工作线程与p之间的绑定解除
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
// 把与当前m绑定的p放入空闲队列中,m接下来要准备休眠了
pidleput(_p_)
unlock(&sched.lock)
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
// 取消m的自旋状态标记
_g_.m.spinning = false
// 处于自旋状态的m数量减1
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// 再一次检查所有的p中是否有有可用运行的g
for _, _p_ := range allpSnapshot {
...
}
// 再次检查是有垃圾回收中进行标记的g需要运行
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
...
}
...
// 当前的m进入休眠
stopm()
goto top
}
// runqsteal从p2的本地队列中偷取它里面一半数量的g到_p_的本地队列
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
t := _p_.runqtail
// 从p2中偷取一批g放入到p中
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
if n == 0 {
// 没有偷取到,直接返回
return nil
}
// 这里减1个是因为函数会返回一个g
n--
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
// 只偷取到了1个g,直接返回它
if n == 0 {
return gp
}
// 检查_p_的本地队列数量是否越界
h := atomic.LoadAcq(&_p_.runqhead)
if t-h+n >= uint32(len(_p_.runq)) {
throw("runqsteal: runq overflow")
}
// 更新_p_本地队列队尾的位置
atomic.StoreRel(&_p_.runqtail, t+n)
return gp
}
// runqgrab从_p_的本地运行队列中选择一批g放入到batch中
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
// 获取_p_本地队列队头的位置,原子操作
h := atomic.LoadAcq(&_p_.runqhead)
// 获取_p_本地队列队尾的位置,原子操作
t := atomic.LoadAcq(&_p_.runqtail)
// n为_p_的本地队列中有多少个可待运行的g
n := t - h
// n为队列中g数量的一半
n = n - n/2
if n == 0 {
// 走进这里,说明n为0,什么时候n为0呢,就是h=t的时候,也就是_p_的本地队列中没有g的时候
// 如果设置了从_p_.runnext(stealRunNextG为true)获取,尝试从runnext获取g
if stealRunNextG {
// 将_p_.runnext中的g放入到batch中
if next := _p_.runnext; next != 0 {
if _p_.status == _Prunning {
if GOOS != "windows" {
usleep(3)
} else {
osyield()
}
}
if !_p_.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
// 说明_p_.runnext中也没有g,直接返回
return 0
}
// 进一步判断n不能超过_p_本地队列中g数量的一半,为什么要做这个判断呢?
// 因为在前面读取runqhead和runqtail的两个操作整体看不是原子性的,虽然它们单独是原子性的
// 也就是说在这两个操作的中间有可能有其他的线程也在偷取g或向里面放g,导致计算不一致,所以这里
// 进一步检查
if n > uint32(len(_p_.runq)/2) {
continue
}
// 从_p_本地队列搬n个g到batch中
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
// 更新_p_本地队列队头的位置,向后移动n个位置
if atomic.CasRel(&_p_.runqhead, h, h+n) {
return n
}
}
}
分析完了怎么选取运行g的问题,现在在回到schedule调度函数,分析它是如何执行要运行g的,也就是对应到schedule中的execute处理逻辑。
execute切换到选择的要运行的g的栈执行,设置g为运行状态和是否被抢占等信息之后,调用gogo函数从当前g0栈切换到要运行g的栈,真正开始执行用户程序。
func execute(gp *g, inheritTime bool) {
// _g_为g0
_g_ := getg()
// 设置当前m运行用户程序的g位gp,即将执行用户程序的g和m绑定
_g_.m.curg = gp
// 设置执行用户程序的g(gp)的m为_g_.m,这样gp和当前的m相互绑定
gp.m = _g_.m
// 将gp的状态从_Grunnable修改为_Grunning,意味着gp马上会得到执行
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
// 设置gp的抢占标志为false
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
hz := sched.profilehz
if _g_.m.profilehz != hz {
setThreadCPUProfiler(hz)
}
if trace.enabled {
if gp.syscallsp != 0 && gp.sysblocktraced {
traceGoSysExit(gp.sysexitticks)
}
traceGoStart()
}
// gogo将从g0栈切换到执行用户程序的gp,真正开始执行用户程序
gogo(&gp.sched)
}
gogo函数是用汇编语言实现的,具体源码解析如下,切换原理就是将要运行g的调度信息g.sched从内存中恢复到CPU寄存器,设置SP和IP等寄存器的值,跳转到要运行的位置开始执行指令。总之一句话,gogo函数完成了从g0到用户g的切换,即CPU执行权的转让以及栈的切换。
TEXT runtime·gogo(SB), NOSPLIT, $16-8
// 0(FP)表示第一个参数,即buf=&gp.sched
// BX=buf
MOVQ buf+0(FP), BX // gobuf
// DX=buf.g=&gp.sched.g
MOVQ gobuf_g(BX), DX
MOVQ 0(DX), CX // make sure g != nil
get_tls(CX)
// 把g放入到tls[0],即把要运行的g的指针放进线程本地存储,后面的代码可以通过本地线程存储
// 获取到当前正在执行的goroutine的地址,在这之前,本地线程存储中存放的是g0的地址
MOVQ DX, g(CX)
// SP=buf.sp=&gp.sched.sp,即把CPU的栈顶寄存器SP设置为gp.sched.sp,成功完成从
// g0栈切换到gp栈
MOVQ gobuf_sp(BX), SP // restore SP
// 恢复调度上下文到CPU对应的寄存器
// 将系统调用的返回值放入到AX寄存器中
MOVQ gobuf_ret(BX), AX
MOVQ gobuf_ctxt(BX), DX
MOVQ gobuf_bp(BX), BP
// 前面已经将调度相关的值都放入到CPU的寄存器中了,将gp.sched中的值清空,这样可以减轻gc的工作量
MOVQ $0, gobuf_sp(BX)
MOVQ $0, gobuf_ret(BX)
MOVQ $0, gobuf_ctxt(BX)
MOVQ $0, gobuf_bp(BX)
// 把gp.sched.pc的值放入到BX寄存器,对于main goroutine,sched.pc中存储的是runtime包
// 中main()函数的地址
MOVQ gobuf_pc(BX), BX
// JMP把BX寄存器中的地址值放入到CPU的IP寄存器中,然后CPU跳转到该地址的位置开始执行指令
// 即跳转到main()函数执行代码
JMP BX
execute调用gogo从g0栈切换到用户g栈,在刚开始时,队列中只有一个main goroutine,所以第一个被运行的用户g就是main goroutine.执行完gogo后会跳转到runtime.main(main goroutine的function)运行。
runtime.main函数会调用我们main包中的main函数,在调用main包中main函数前,会做一些其他工作,主要有:
执行完上述准备逻辑之后,才开始真正执行main.main函数,从main.main函数返回后调用exit(0)系统调用退出进程.如果exit(0)进程没退出,通过for循环一直访问非法地址,正常情况下,一但出现非法地址访问,系统就会把该进程杀死,用这样的方法来确保进程退出。
可以看到runtime.main执行完main.main函数之后,此时函数调用链是schedule()-->execute()-->gogo()-->runtime.main()-->main.main(),直接调用exit系统调用结束进程了。读到这里有同学可能疑问?前面不是说在创建goroutine的时候伪造成g是被goexit函数调用的,按理说执行完g逻辑之后,要返回goexit函数继续执行。这是对main goroutine以外的其他goroutine准备的,对于非main goroutine执行完之后会返回到goexit函数继续执行,而main goroutine执行完直接结束整个进程了,因为main goroutine执行完意味着程序结束了,所以不用再做其他处理了,也就无需再回到goexit函数继续执行。
func main() {
// 这里的g位于main goroutine
g := getg()
g.m.g0.racectx = 0
// 设置栈大小的最大值,对于32系统,栈的大小不能超过250M,对于64位系统,栈的大小不超过1G
if sys.PtrSize == 8 {
maxstacksize = 1000000000
} else {
maxstacksize = 250000000
}
// mainStarted为全局变量,表示main goroutine现在已经开始运行了
// 现在可以允许其他创建的goroutine启动新的m来运行创建的g了
mainStarted = true
// wasm架构不支持线程,所以也就没有监控线程sysmon
if GOARCH != "wasm" {
// 切换到g0栈,启动一个监控线程
systemstack(func() {
// 传递给newm的第二参数p为nil,说明该线程不需要与p绑定,即sysmon工作线程
// m不需要调度器的调度,独立于调度器,因为该m只干一件事就是执行sysmon函数,不会
// 执行用户程序的g
newm(sysmon, nil)
})
}
...
runtimeInitTime = nanotime()
//启动垃圾清理的goroutine
gcenable()
...
// 执行package main 中的init函数, main包中引用的依赖包中的init函数也会执行
doInit(&main_inittask)
close(main_init_done)
needUnlock = false
unlockOSThread()
//使用 -buildmode=c-archive 或 c-shared 编译的程序有一个 main,但它不会被执行。
if isarchive || islibrary {
return
}
// 进行间接调用,因为链接器在放置运行时不知道主包的地址
fn := main_main
// 执行main包中的main函数
fn()
...
// 通过exit系统调用退出进程,可以看到main goroutine并没有返回,直接进入系统调用退出程序
exit(0)
for {
var x *int32
// x是一个空指针,是一个非法的地址,给他赋值会导致程序崩溃,操作系统就会把该进程杀死,确保程序一定会退出
*x = 0
}
}
前面分析了main goroutine的退出流程,它执行完main.main()函数后,直接调用exit系统调用结束进程了。而对于非main goroutine来说,执行完用户逻辑之后会回到goexit函数,因为在创建非main goroutine的时候伪造成g是被goexit函数调用的.下面开始分析回到goexit函数做了哪些处理。
执行完用户逻辑回到goexit+PCQuantum,也就是goexit的第二条指令,就是CALL runtime·goexit1(SB). 它直接调用goexit1函数。
TEXT runtime·goexit(SB),NOSPLIT,$0-0
BYTE $0x90 // NOP
// 用户程序执行完返回goexit的第二条指令就是这里
CALL runtime·goexit1(SB)
BYTE $0x90 // NOP
goexit1函数处理一些data race等检查逻辑之后,调用了mcall函数,mcall是汇编实现的,具体实现细节见下面的注解。mcall函数的功能是从当前用户程序g切换到g0上运行,然后在g0栈上执行goexit0函数。概括起来,mcall完成两个主要逻辑:
可以看到mcall完成的功能与前面介绍的gogo函数功能完全相反。gogo函数实现从g0栈到用户程序g的切换,而这里的mcall恰好实现从用户程序g到g0的切换,所以通过gogo和mcall函数,我们可以在runtime代码和用户程序代码之间来回切换。
func goexit1() {
// 这里是检查data race逻辑
...
mcall(goexit0)
}
// 传给mcall函数的参数是一个指向funcval对象的指针
TEXT runtime·mcall(SB), NOSPLIT, $0-8
// 取出参数的值也就是goexit0的地址,放入到DI寄存器中,
MOVQ fn+0(FP), DI
// 获取当前的g,这里的g还是用户程序的g
get_tls(CX)
// 设置AX寄存器的值为g
MOVQ g(CX), AX
// mcall返回地址放入到BX寄存器中
MOVQ 0(SP), BX
// 将当前的调度信息也就是CPU寄存器的值保存到g的sched字段中
// 因为接下来将发送goroutine的切换,从用户程序g切换到g0
// 保存PC
MOVQ BX, (g_sched+gobuf_pc)(AX)
LEAQ fn+0(FP), BX
// 保存SP
MOVQ BX, (g_sched+gobuf_sp)(AX)
// 保存g
MOVQ AX, (g_sched+gobuf_g)(AX)
// 保存BP
MOVQ BP, (g_sched+gobuf_bp)(AX)
// 将g保存到BX寄存器中
MOVQ g(CX), BX
// 将g.m保存到BX寄存器中
MOVQ g_m(BX), BX
// 将g.m.g0保存到寄存器SI中
MOVQ m_g0(BX), SI
// 经过上面3步操作,成功拿到了g0的地址,非常完美
// 比较g0是否与g相等,理论上不应该相等,如果出现相等,一定是有问题
CMPQ SI, AX
JNE 3(PC)
MOVQ $runtime·badmcall(SB), AX
JMP AX
// SI中保存的是g0的地址,经过这步操作将g0的地址放到了线程本地存储中
MOVQ SI, g(CX) // g = m->g0
// 恢复g0的栈顶指针到CPU的SP寄存器中,成功将g0栈放入到CPU上,完成了从
// 用户程序g栈到g0的栈切换
MOVQ (g_sched+gobuf_sp)(SI), SP
// 将goexit0的参数g入栈
PUSHQ AX
// 将funcval对象的指针保存到DX
MOVQ DI, DX
// 获取funcval对象的第一个成员也就是goexit0的地址
MOVQ 0(DI), DI
// 调用goexit0(g)函数
CALL DI
POPQ AX
MOVQ $runtime·badmcall2(SB), AX
JMP AX
RET
goexit0函数把gp(用户程序g)状态从_Grunning修改为_Gdead,然后清理gp对象中保存内容,其次通过函数dropg解除gp和m之间的绑定关系,然后将gp放入到P的freeg队列中缓存起来,以便后续复用,最后调用schedule,进行新一轮调度.
// goexit0函数是在g0上执行的,入参gp是用户程序g
func goexit0(gp *g) {
// _g_为g0
_g_ := getg()
// 将gp的状态从_Grunning修改为_Gdead
casgstatus(gp, _Grunning, _Gdead)
if isSystemGoroutine(gp, false) {
atomic.Xadd(&sched.ngsys, -1)
}
// 清理gp对象中保存内容
gp.m = nil
...
// 如果当前在进行垃圾回收,将gp赋值标记的信息刷新到全局信用池中
if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
gp.gcAssistBytes = 0
}
// 解除gp和m之间的绑定关系
dropg()
...
// 将gp放入到p的freeg队列中,以便下次可以服用,不用new一个g对象,避免重新申请内存
gfput(_g_.m.p.ptr(), gp)
if locked {
if GOOS != "plan9" {
gogo(&_g_.m.g0.sched)
} else {
_g_.m.lockedExt = 0
}
}
// 再次调用schedule,进行新一轮调度
schedule()
}
至此,我们已经分析完了main goroutine和非main goroutine的整个调度流程,下图概括了调度流中函数调度栈。对于非main goroutine来说,从shedule开始,经过一些列函数调用执行用户的代码,最后又会回到shedule进行新一轮调度,这个过程会进行无数轮这样的循环,而函数的每一次调用会占用一定的内存空间,这样一直循环调度下去那不是会耗尽g0栈的所有空间,太危险了?这里是不会耗尽的,因为每次执行完mcall切换到g0栈时都是从一个固定的g0.sched.sp即固定的g0栈顶位置开始的,所以会重复使用上一轮调度时使用过的栈内存空间,这时没有问题的,因为前一轮调度已经结束了,覆盖掉之前空间的内容是没有问题的。
前面分析了调度器的初始化,main goroutine的创建与运行,调度选g的策略等,下面看在什么时候什么情况下会发生调度,也就是调度的时机。根据调度方式的不同,调度时机分为3种,分别是主动调度、被动调度和抢占调度。
在用户代码中通过调用runtime.Gosched()函数发生主动调度。Gosched函数会调用mcall从当前g的栈切换到m的g0栈执行gosched_m函数,mcall细节前面已经分析过了。下面直接看gosched_m函数的实现,gosched_m调用goschedImpl挂起用户协程g. goschedImpl主要完成以下4个功能:
主动调度的逻辑还是很好理解的,就是当前的g放弃CPU执行权,将其放入到全局运行队列中,因为它还是可以继续运行的,只是我们主动放弃了,等待下次被调度程序执行。
// Gosched提供给用户程序主动让出调度的接口
func Gosched() {
// checkTimeouts为空实现
checkTimeouts()
// 切换到当前m的g0栈执行gosched_m函数
mcall(gosched_m)
}
func gosched_m(gp *g) {
if trace.enabled {
traceGoSched()
}
// gp为准备挂起的用户协程g,即调用runtime.Gosched()函数所在的协程g
goschedImpl(gp)
}
func goschedImpl(gp *g) {
// 获取准备挂起g(gp)的状态,它的状态为目前处于_Grunning,因为它正在运行
status := readgstatus(gp)
// 检查gp的状态是否合法,如果不为_Grunning状态说明状态出现了异常
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
// 将gp的状态从_Grunning修改为_Grunnable
casgstatus(gp, _Grunning, _Grunnable)
// 解除m和gp的互相绑定,分别将m.curg和gp.m设置为nil
dropg()
lock(&sched.lock)
// 把gp放入全局的运行队列中
globrunqput(gp)
unlock(&sched.lock)
// 进行新一轮调度
schedule()
}
被动调度就是一个g被迫让出CPU执行权,有哪些情况会让出CPU使用权,我们常见的有channel发生阻塞、网络IO阻塞、执行垃圾回收而暂停用户程序执行等。那为什么要让出CPU的使用权,为了提供CPU的使用率,与其发生阻塞导致CPU干等不如让出CPU给其他可以执行的程序使用。下面以channel阻塞为例分析被动调度的处理流程。通道的接收操作<-c底层调用的是chanrecv1函数,本文主要分析channnel操作时是如果让出调度又是如何恢复调度的,下面只截取了与之相关的关键的代码,详细channel源码分析见说说channel哪些事-上篇说说channel哪些事-下篇. chanrecv1只是一个包装函数,真正调用的是chanrecv函数。chanrecv会先判断channel上是否有数据可读,如果有数据直接读取并返回,如果没有数据,则把当前的g放到当前channel对象的读取队列(recvq)上,然后调用goparkunlock函数阻塞当前g的运行。
// 接收操作入口<-c, 有2个入参,c表示通道结构体指针
// elem是接收通道元素的变量地址,即<-c左边的接收者
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 把协程g挂在channel c的读取队列上,调用goparkunlock函数阻塞当前的g
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
}
gopark函数最后会调用mcall(park_m)函数,切换到g0栈上执行park_m函数。park_m函数先将用户程序协程gp的状态从_Grunning修改为_Gwaiting,然后调用dropg函数解除gp和m的互相绑定关系,最后调用schedule()进入新一轮调度循环。
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
...
// 切换到g0栈上执行park_m函数
mcall(park_m)
}
func park_m(gp *g) {
// _g_为g0
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
// 将用户程序协程gp的状态从_Grunning修改为_Gwaiting
casgstatus(gp, _Grunning, _Gwaiting)
// 解除gp与m的绑定
dropg()
...
// 进行新一轮调度
schedule()
}
分析完了channel读取操作中调度相关的操作,下面接着看channel发送操作中调度相关的代码。发送操作c<-x底层调用的是chansend1函数,chansend1直接调用了chansend函数,chansend函数会先检查是否有因channel没有数据可读而挂起的g,如果有直接发数据给sg。如果发送阻塞,出现阻塞原因是要么是非缓冲channel的还没有读取的g,要么是缓存channel buffer满了,这时与接收操作类似调用gopark挂起当前的发送协程g.
// 发送操作c<-x调用入口
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 查看是否有因channel没有数据可读而挂起的g,如果有直接发数据给sg
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
c.sendq.enqueue(mysg)
// 走到这里,说明要么是非缓冲channel的还没有读取的g,要么是缓存channel buffer满了
// 都是直接发送不了数据,需要挂起当前的g
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
...
}
如果不阻塞,调用send函数直接发送数据,send函数最后会调用goready函数,goready会切换到g0栈上执行ready函数。ready函数把要唤醒的g的状态设置为_Grunnable并将它加入运行队列,如果有空闲的p并且没有工作线程m处于自旋状态,也就是没有别的m正在进行偷取g的工作,则需要唤醒或新建一个m来运行可运行的g.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
goready(gp, skip+1)
}
// goready完成的功能是将一个g状态修改为_Grunnable,并将它加入待运行队列,
// 等待被调度程序运行
func goready(gp *g, traceskip int) {
// 切换到g0栈执行ready
systemstack(func() {
ready(gp, traceskip, true)
})
}
// ready函数把要唤醒的g的状态设置为_Grunnable并将它加入运行队列
func ready(gp *g, traceskip int, next bool) {
if trace.enabled {
traceGoUnpark(gp, traceskip)
}
// status为即将唤醒的g(gp)的状态
status := readgstatus(gp)
// Mark runnable.
_g_ := getg()
mp := acquirem()
// 检查gp的状态是不是_Gwaiting,如果不是说明gp的状态出现了异常
if status&^_Gscan != _Gwaiting {
dumpgstatus(gp)
throw("bad g->status in ready")
}
// 设置gp的状态为可以运行状态(_Grunnable)
casgstatus(gp, _Gwaiting, _Grunnable)
// 将gp放入运行队列
runqput(_g_.m.p.ptr(), gp, next)
// 有空闲的p并且没有工作线程m处于自旋状态,也就是没有别的m正在进行偷取g的工作,则需要唤醒或新建一个m来运行可
// 运行的g
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
releasem(mp)
}
// 尝试获取一个m来运行可以运行的g,在新创建一个g(newproc函数)和唤醒一个g(ready)函数中
// 会调用wakep
func wakep() {
// 原子性检查当前是否已经有线程m在进行自旋了,因为很可能也有别的线程执行了goready,已唤醒了m
// 有一个自旋的m会努力找g运行,就不用再找一个m来工作了
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
// startm函数功能是获取一个m,在获取m之前先获取一个空闲的p,如果获取不到p则返回
// 获取p后,在从全局空闲m队列中获取一个m,如果没有获取到则新创建一个m
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil {
// 如果p为nil,则从全局空闲p队列中获取一个空闲的p
_p_ = pidleget()
if _p_ == nil {
unlock(&sched.lock)
if spinning {
// 没有获取到p,也不会获取m了,因为在调用函数中spinning为true,已对sched.nmspinning
// 进行了+1操作,这里需要减一进行还原
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
// 没有拿到空闲的p,不会获取m,直接返回
return
}
}
// 从空闲的m队列中获取一个空闲(休眠)的工作线程
mp := mget()
unlock(&sched.lock)
if mp == nil {
// 空闲队列中没有休眠的m
var fn func()
if spinning {
fn = mspinning
}
// 创建一个新的工作线程m
newm(fn, _p_)
return
}
// 走到这里,表明mp是从空闲m队列获取的,这里的m都是处于休眠状态,它的状态不可能是spinning
if mp.spinning {
throw("startm: m is spinning")
}
// 空闲m队列中获取的m的nextp不可能为非0,mp.nextp将会和前面获取的p关联绑定
if mp.nextp != 0 {
throw("startm: m has p")
}
// spinning为true,需要进行自旋,就是找不到g了,但是本地队列中还存在等待运行的g,
// 是相互矛盾的
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
// 设置mp的自旋状态,是自旋还是非自旋
mp.spinning = spinning
// 将p保存到m的nextp中
mp.nextp.set(_p_)
// 唤醒处于休眠状态的工作线程
notewakeup(&mp.park)
}
为了确保每个g都有机会被调度执行,保证调度的公平性,在初始化的时候会启动一个特殊的线程来执行监控任务(sysmon函数),sysmon在runtime.main函数被启动,详情看前面runtime.main代码分析。下面看sysmon具体处理逻辑。sysmon函数是一个死循环函数,在第一轮循环的时候休眠20微妙,之后每轮循环中休眠时间加倍,直到最大休眠时间达到10毫秒。在循环中会检查是否有准备就绪的网络,并将其放入到全局队列中,也会进行抢占处理,按时间强制执行gc等操作。这里主要关注抢占处理retake函数,具体怎么抢占都是由该函数实现的。
// 监控线程工作函数
func sysmon() {
lock(&sched.lock)
sched.nmsys++
// 检查程序是否已经死锁
checkdead()
unlock(&sched.lock)
lasttrace := int64(0)
idle := 0
delay := uint32(0)
...
lastpoll := int64(atomic.Load64(&sched.lastpoll))
// 如果超过10ms没有进行netpoll,强制进行一次netpoll,如果获取到了可运行的g,插入g的全局列表
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0)
if !list.empty() {
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
if next < now {
startm(nil, false)
}
// 重新获取在系统调用中阻塞的p并抢占长时间运行的g
if retake(now) != 0 {
idle = 0
} else {
idle++
}
// 检查是否需要启动垃圾回收程序
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
lock(&forcegc.lock)
forcegc.idle = 0
var list gList
list.push(forcegc.g)
injectglist(&list)
unlock(&forcegc.lock)
}
...
}
}
retake函数重新获取在系统调用中阻塞的p并抢占长时间运行的g。retake是一个大循环,检查所有的p,所有的p保存在全局变量allp中。对于p只对它的两种情况进行处理,分别是_Prunning和_Psyscall,因为只有这两种状态的p与之关联的g正在执行,需要判断是否进行抢占。下面对两种情况分别进行细致的介绍。
func retake(now int64) uint32 {
n := 0
lock(&allpLock)
// 我们不能在 allp 上使用范围循环,因为我们可能会暂时删除 allpLock。因此,
// 我们每次都需要在循环中重新获取allp
// 遍历所有的p,因为m运行g需要绑定一个p,检查每个与p绑定的m中运行的g是否需要被抢占
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
// 如果procresize已经增长了allp但还没有创建新的Ps,就会发生这种情况。
continue
}
// _p_.sysmontick用于监控线程(sysmon)记录被监控的p的运行时间和系统调用时间
// 以及运行g时的计数器值和进行系统调用时计数器值
pd := &_p_.sysmontick
// 获取p的状态保存到s中
s := _p_.status
sysretake := false
// 如果当前的p正在运行或者处于系统调用中,需要检查是否需要进行抢占
if s == _Prunning || s == _Psyscall {
// 每进行一次调度,_p_.schedtick会+1,也就是每切换一个g运行,schedtick会+1
t := int64(_p_.schedtick)
// 如果pd.schedtick和t不等,也就是pd.schedtick和_p_.schedtick不等
// 说明p上有进行过调度操作,即切换过g运行,重新更新pd的值schedtick和schedwhen
// 为下一次判断做准备
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
// 走到这里说明pd.schedtick与t相等,说明从pd.schedwhen到now这段时间
// 没有发生过调度,也就是在这段时间,同一个g一直在运行,检查这个g运行的时间
// 是否超过了10毫秒,就是schedwhen+forcePreemptNS比当前时间小,说明已经超过了
// 对与_p_绑定的m中运行的g进行抢占
preemptone(_p_)
// 在系统调用的情况下,preemptone()不起作用, 因为没有m绑定到p
sysretake = true
}
}
// p处在系统调用中,需要检查是否进行抢占
if s == _Psyscall {
// _p_.syscalltick 用于记录系统调用的次数,在完成系统调用之后加 1
t := int64(_p_.syscalltick)
// 如果sysretake为false并且p的pd中记录的系统调用tick和当前p的系统调用tick不等
// 说明进行过调度,不用进行抢占,直接更新pd的值
if !sysretake && int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// 同时满足下面3个条件,并进行抢占:
// 1. _p_的本地运行队列和runnext都没有g
// 2. 当前进行自旋的m数量+当前空闲的p的数量之和大于0
// 3. 进入系统调用的时间到现在还不够10毫秒
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
unlock(&allpLock)
incidlelocked(-1)
// 将_p_的状态从_Psyscall修改为_Pidle
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
// n记录处于在系统调用中的p并且需要被抢占的总数
n++
// 系统调用tick+1
_p_.syscalltick++
// 将与进入系统调用的m绑定的p分离
handoffp(_p_)
}
incidlelocked(1)
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n)
}
p在_Prunning状态,说明与p关联的m上的g正在被运行,判断g运行时间是否超过10毫秒,如果超过了调用preemptone进行抢占。哪是怎么判断当前的g运行时间是否超过了10毫秒呢?p中有一个schedtick字段,每进行一次调度,schedtick的值会+1,也就是每切换一个g运行,schedtick的值加1. p中还有一个sysmontick字段,该字段是一个复合结构,它有四个成员,见下面的定义。这里我们先只关心前两个,schedtick和schedwhen分别保存上一次p调度的时候计数器值和调度的时间。p的schedtick保存的是当前的计数器值,如果两者相等,说明这段时间没有进行g的切换,那么就比较当前的时间与sysmontick中schedwhen时间差是否大于10毫秒,如果大于10毫秒,需要进行抢占。如果p的schedtick值与sysmontick与schedtick值不等,说明这段时间进行了g的切换,直接更新sysmontick计数器的值为当前p的schedtick值,更新sysmontick调度时间为当前时间。
type sysmontick struct {
// 调度的计数器
schedtick uint32
// 保存调度的时间
schedwhen int64
// 进行系统调用的计数器
syscalltick uint32
// 进入系统调用时的时间
syscallwhen int64
}
下面看进行真正抢占的处理逻辑函数preemptone,此函数处理比较简单,设置当前被抢占的g的preempt字段为true,并将stackguard0字段设置为0xfffffade之后很快就返回了,并没有看到将当前g暂停执行的逻辑。猜测很可能这是一个异步处理,即被抢占的g的执行函数时可能会检查这里设置的标志位,判断如果是设置了抢占,执行某些处理。接下来进行对猜测进行验证。我们看哪些处理逻辑涉及到了这里的stackguard0和preempt关键词,在runtime文件夹中执行grep查找,嗯,很快找到了newstack()函数用到了这里的关键词。继续追溯newstack函数在哪里被使用了,找到了一条这样的函数调用关系。morestack_noctxt()调用了morestack(),morestack中调用了newstack()。
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
// gp为当前被抢占的g
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
// 设置gp的是否可被抢占标志为true,表示可以被抢占
gp.preempt = true
// 设置gp.stackguard0值为0xfffffade,这是一个很大的数,用于区分判断
gp.stackguard0 = stackPreempt
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}
return true
}
为了验证上面的程序是否真的会执行上面的函数调用关系,这里通过举个例子进行验证。下面是一个简单的输出hello world简单程序(main.go),然后查看它的汇编代码。
package main
import "fmt"
func main() {
fmt.Println("hello world")
}
对上面的程序执行go tool compile -S main.go得到汇编代码。
Go在函数调用的时候会设置安全点,这个是编译器为我们插入的代码。在函数调用的时候会检查stackguard0的大小,而决定是否调用runtime.morestack_noctxt函数。morestack_noctxt是汇编编写的,实现如下,它直接调用morestack函数,morestack会将当前调度信息保存起来,然后切换到g0栈执行newstack函数。
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
MOVL $0, DX
JMP runtime·morestack(SB)
TEXT runtime·morestack(SB),NOSPLIT,$0-0
...
// 将当前的调度信息保存到g.sched字段中,具体是将PC、SP、
// BP寄存器的值和当前g的地址值保存到内存中
MOVQ 0(SP), AX // f's PC
MOVQ AX, (g_sched+gobuf_pc)(SI)
MOVQ SI, (g_sched+gobuf_g)(SI)
LEAQ 8(SP), AX // f's SP
MOVQ AX, (g_sched+gobuf_sp)(SI)
MOVQ BP, (g_sched+gobuf_bp)(SI)
MOVQ DX, (g_sched+gobuf_ctxt)(SI)
// 下面开始从当前的g切换到g0
// 将m.g0的地址拷贝到BX寄存器中
MOVQ m_g0(BX), BX
// 将寄存器BX中的值拷贝到tls中,即设置tls中的g为g0
MOVQ BX, g(CX)
// 把g0栈的栈顶寄存器的值恢复到CPU的寄存器SP中,顺利的切换到g0栈
MOVQ (g_sched+gobuf_sp)(BX), SP
// 调用newstack函数
CALL runtime·newstack(SB)
CALL runtime·abort(SB) // crash if newstack returns
RET
newstack函数主要完成两项工作:一是检查是否响应sysmon发起的抢占请求,二是检查栈是否需要进行扩容,下面只抽取了与响应抢占相关的代码。newstack检查被抢占g的状态,如果处于抢占状态,调用gopreempt_m将被抢占的gp切换出去放入到全局g运行队列。gopreempt_m是goschedImpl的简单包装,真正处理逻辑在goschedImpl函数,此函数在前面的主动调度中已分析过了,详情见前面的分析。
func newstack() {
// thisg为g0
thisg := getg()
...
// gp为用户程序g,就是需要进行抢占或者需要扩容的g
gp := thisg.m.curg
...
// 设置了抢占
if preempt {
// 检查被抢占g的状态,如果不是真正处于抢占状态
if !canPreemptM(thisg.m) {
// 将stackguard0的值恢复为原来的正常值,表示已经处理过该抢占请求了
gp.stackguard0 = gp.stack.lo + _StackGuard
// 实际并不需要进行抢占,调用gogo函数继续运行当前的用户程序g
gogo(&gp.sched)
}
}
...
if preempt {
...
// 调用gopreempt_m将被抢占的gp切换出去放入到全局g运行队列
gopreempt_m(gp) // never return
}
// 栈扩容的逻辑
...
}
func gopreempt_m(gp *g) {
if trace.enabled {
traceGoPreempt()
}
// 调用goschedImpl将gp的状态从_Grunning修改为_Grunnable,并把gp放入到全局g运行队列
goschedImpl(gp)
}
p在_Psyscall状态,说明与p关联的m上的g正在进行系统调用。只要下面的三个条件有任何一个不满足,就会对处于_Psyscall状态的p进行抢占。
当上述三个条件有任何一个不满足时,会将p的状态从_Psyscall修改为_Pidle,然后调用handoffp函数,将与进入系统调用的m绑定的p分离。handoff会对当前的条件进行检查,如果满足下面的条件则会调用startm函数启动新的工作线程来与当前的p进行关联,执行可运行的g.具体条件有以下几种:
最后,如果上述三个条件都满足,说明当前比较空闲,将p放入到P的全局空闲队列中即可。
func handoffp(_p_ *p) {
// 如果_p_的本地队列或全局队列中有运行的g,立即启动工作线程对g进行调度
if !runqempty(_p_) || sched.runqsize != 0 {
startm(_p_, false)
return
}
// 如果处于在垃圾回收标记阶段,启动线程运行gc work goroutine
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
startm(_p_, false)
return
}
// 走到这里,说明_p_的本地队列和全局队列都没有可运行的g,也没有赋值垃圾回收标记的g需要运行
// 如果当前没有进行自旋的m也没有空闲的p,启动一个线程m进行自旋
if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) {
startm(_p_, true)
return
}
lock(&sched.lock)
// 当前在gc阶段,需要STW,将_p_的状态切换到__Pgcstop
// 暂停_p_提供g资源
if sched.gcwaiting != 0 {
_p_.status = _Pgcstop
sched.stopwait--
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
return
}
// _p_.runSafePointFn被设置为1,需要调用安全点函数safePointFn处理一些gc相关的内容
if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
sched.safePointFn(_p_)
sched.safePointWait--
if sched.safePointWait == 0 {
notewakeup(&sched.safePointNote)
}
}
// 如果全局队列中有g,启动一个线程执行g
if sched.runqsize != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
// 如果这是最后一个运行的p并且没有人在轮询网络,则需要唤醒另一个m来轮询网络。
if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
if when := nobarrierWakeTime(_p_); when != 0 {
wakeNetPoller(when)
}
// 将p放入全局空闲队列中
pidleput(_p_)
unlock(&sched.lock)
}
协程是Go语言中非常精华的部分,我们知道Go语言可以轻松在单机上运行成千上万个协程,得益于调度器很好的设计.例如通过GMP模型实现了M的复用,引入P让各个处理器可以并行,充分发挥多核的优势,通过优先级队列保证调度的公平性等。小编在这次深入细致的学习调度器原理和实现中收获多多。
golang-scheduler-1-history[1]runtime-bootstrap[2]Go语言goroutine调度器初始化[3]Go语言调度器之调度main goroutine[4]深入golang runtime的调度[5]锲而不舍 —— M 是怎样找工作的[6]
[1]
golang-scheduler-1-history: http://lessisbetter.site/2019/03/10/golang-scheduler-1-history/
[2]
runtime-bootstrap: https://blog.altoros.com/golang-internals-part-5-runtime-bootstrap-process.html
[3]
Go语言goroutine调度器初始化: https://mp.weixin.qq.com/s?__biz=MzU1OTg5NDkzOA==&mid=2247483769&idx=1&sn=3d77609a293d87e64639afc8d2219e1c&scene=19#wechat_redirect
[4]
Go语言调度器之调度main goroutine: https://mp.weixin.qq.com/s?__biz=MzU1OTg5NDkzOA==&mid=2247483783&idx=1&sn=1128dbd7794d7d53c37abab94771a7d7&scene=19#wechat_redirect
[5]
深入golang runtime的调度: https://zboya.github.io/post/go_scheduler/
[6]
锲而不舍 —— M 是怎样找工作的: https://mp.weixin.qq.com/s/6sNtrdlKtwfJIvBA8UPnKg