前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解Go调度原理和实现

深入理解Go调度原理和实现

作者头像
数据小冰
发布2022-08-15 14:50:41
1.1K0
发布2022-08-15 14:50:41
举报
文章被收录于专栏:数据小冰

本文深入分析Go调度原理和实现,全文包含的主要内容有:Go程序是怎么运行起来的,经历了哪些流程,调度G的策略和时机,程序是如何在执行runtime代码与用户代码之间来回切换的。文章内容很长,感兴趣的同学可以收藏慢慢看。本文中分析的代码是Go1.14版本,涉及到的文件都在runtime包下。

对于编译性语言,需要将源代码文件编译成一个可执行程序,然后该执行程序被操作系统加载到内存开始运行。对于Go语言来说,就是我们编写的.go文件,通过编译、汇编和链接产生一个可执行的二进制文件。这个二进制文件也称为程序,当它被操作系统加载到内存后,会产生一个进程,进程就是可执行程序的一个实例。程序从加载到被执行这个过程概括起来有以下几个阶段

  1. 操作系统把磁盘上的可执行程序读入到内存中
  2. 创建可执行程序的实例进程,并为进程创建一个主线程,同时为主线程分配栈空间
  3. 将用户的设置的环境列表(environment list)和输入的命令行参数拷贝到主线程的栈中
  4. 把进程(或线程)加入到操作系统的运行队列等待被调度执行

linux系统中典型的进程内存结构如下所示,当程序刚启动执行时,会从arg/environ下面的主线程栈开始执行。随着函数调用,栈向下生长,当函数调用完毕,又主动释放占用的内存空间。

Go程序入口

我们先通过一个例子看如何找到一个Go可执行程序的入口,下面的代码保存在main.go文件中,运行 go build main.go默认生成可执行文件名称为main。然后使用gdb调试可以执行文件main.

代码语言:javascript
复制
package main

import "fmt"

func main() {
 fmt.Println("hello world")
}

执行gdb main,进入程序调试交互窗口,输入info files可以看到程序的入口Entry point: 0x45d990,这里是程序的入口地址,然后在这个地址这里下一个断点,b *0x45d990, 最后执行r,定位到源代码的入口地方在_rt0_adm64_linux()。在go源码目录下grep -rn "_rt0_adm64_linux" 可以定位到该函数在runtime包中rt0_linux_amd64.s中,该文件中的代码是汇编语言。_rt0_amd64_linux执行的指令是跳转到_rt0_amd64符合的位置。

代码语言:javascript
复制
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
 JMP _rt0_amd64(SB)
    
TEXT _rt0_amd64(SB),NOSPLIT,$-8
    // 将程序的参数argv地址拷贝到DI寄存器中
 MOVQ 0(SP), DI // argc
 // 将程序参数argv的地址拷贝到SI寄存器中
 LEAQ 8(SP), SI // argv
 // 跳转到runtime·rt0_go符合的位置
 JMP runtime·rt0_go(SB)

调度器的初始化

前面我们通过一个例子找到Go程序的入口,下面我们看Go调度器是如何初始化的。Go程序调度模型也称为GMP模型,调度器初始化也是就核心数据结构G、M、P是怎么初始化的,以及它们之间的关系。对应到代码中也就是G0、M0和P的初始化。G0是一种特殊的G,它运行的是runtime中的代码,M0是进程启动的第一个线程,也称为主线程。下面通过源码分析调度器初始化的细节。

_rt0_amd64将程序参数argc和argv参数地址拷贝到寄存器之后,跳转到runtime·rt0_go位置执行。下面是runtime·rt0_go函数的源码,它也是汇编语言写的,runtime.rt0_g0函数完成了Go程序启动时的所有初始化工作,函数很长,这里提取了与调度相关的代码,这部分内容是我们需要关心的。

rt0_go函数开始的工作是将寄存器DI和SI的值分别赋值给AX和BX,因为DI和SI中的值是分别是函数参数argc和argv的地址,经过赋值之后,参数的信息也就存储在了AX和BX中。然后栈顶寄存器SP向下移动39字节,并调整SP的位置使它按16字节对齐。最后将argc和argv分别放到SP+16和SP+24字节处的内存中。

代码语言:javascript
复制
TEXT runtime·rt0_go(SB),NOSPLIT,$0
 // 将寄存器DI的值赋值给AX
 MOVQ DI, AX  // argc
 // 将寄存器SI的值赋值给BX
 MOVQ SI, BX  // argv
 SUBQ $(4*8+7), SP  // 2args 2auto
 // 调整栈顶寄存器SP使得它按16字节对齐
 ANDQ $~15, SP
 // 将AX中的内容拷贝都SP+16字节处的内存中,AX中存储的是argc的值
 // 也就是将argc放到SP+16字节处的内存中
 MOVQ AX, 16(SP)
 // 同理,将argv放到SP+24字节处的内存中
 MOVQ BX, 24(SP)

下面是G0的初始化,G0是一个全局的变量,它为runtime代码的运行提供一个栈环境。程序先将G0的地址保存在DI寄存器中,然后栈顶寄存器SP向下移动64*1024-104个字节,即向下大约移动64KB,构造G0的栈空间。最后给G0的stackguard0、stackguard1、stack.lo和stack.hi字段设置初始值,这几个字段与栈的位置,以及栈扩容相关。读到这里,有同学可能有疑问这里直接对G0的字段进行赋值,G0本身是怎么分配出来的?因为G0是一个全局变量,在执行rt0_go函数时已经分配过内存空间了。其他的G的是通过newg函数分配的,在后面的源码分析中我们会看到。这也是G0与其他G不同的一个地方,还有不同是这里G0字段初始化用的是汇编语言,其他的G中的字段的初始化是Go语言完成的。

代码语言:javascript
复制
TEXT runtime·rt0_go(SB),NOSPLIT,$0
    ...
    // 将g0的地址放到DI寄存器中
 MOVQ $runtime·g0(SB), DI
 // 将SP指向的位置向下(低地址空间)移动64*1024-104个字节,然后将该位置的地址赋值给BX
 // 这里是在为g0构造栈空间,g0栈的空间大小约为64KB
 LEAQ (-64*1024+104)(SP), BX
 // 将BX的值拷贝给g0.stackguard0,即 g0.stackguard0=*SP-64*1024+104
 MOVQ BX, g_stackguard0(DI)
 // g0.stackguard1=*SP-64*1024+104
 MOVQ BX, g_stackguard1(DI)
 // g0.stack.lo=*SP-64*1024+104
 MOVQ BX, (g_stack+stack_lo)(DI)
 // g0.stack.hi=SP
 MOVQ SP, (g_stack+stack_hi)(DI)
    ...

G0是一个全局变量

代码语言:javascript
复制
var (
    // 全局m0
 m0           m
    // 全局g0
 g0           g
 raceprocctx0 uintptr
)

执行完成上面的语句之后,G0和栈空间的关系如下图所示

完成了G0的初始化之后,rt0_go开始设置线程的本地存储。本地存储简称TLS(Thread Local Storage),它其实是线程的私有的全局变量。普通的全局变量,如果其中一个线程对其进行了修改,其他线程看到这个变量的值是修改后的值。而线程的私有全局变量则不是这样,某线程对私有全局变量的修改,不会影响到其他线程看到的值,可以理解成每个线程对私有全局变量都有自己的一个副本,某个线程修改的只是自己的副本,并不会对其他线程造成影响。

对应到这里线程本地存储中保存的是g0的值,为啥要保存g0的值呢?因为runtime会启动很多个工作线程执行我们的go协程。每个工作线程m都会绑定一个g0,g0是一种特殊的g,它的功能是对我们用户创建的g进行调度,也就是它运行的代码是runtime中的代码,并不是我们编写的用户逻辑。在m的结构体中有一个g0字段,保存的就是这里的g0.利用本地存储保存g0,可以做到各个m之间不会影响各自的g0,又方便代码编写。

代码语言:javascript
复制
TEXT runtime·rt0_go(SB),NOSPLIT,$0
    ...
    // 下面初始化线程本地存储
    // 将m0的tls字段的地址给到DI寄存器,即 DI=&m0.tls
 LEAQ runtime·m0+m_tls(SB), DI
 // 调用settls设置线程本地存储,settls函数的参数已在DI寄存器中
 CALL runtime·settls(SB)

 // 验证刚才设置的本地线程存储是否可以正常工作
 // 获取段基地址(FS)放入BX寄存器,也就是把m0.tls[1]的地址放入到BX寄存器
 get_tls(BX)
 // 将常数0x123赋值给BX寄存器指向的内存地址
 MOVQ $0x123, g(BX)
 // 将AX设置为m0.tls[0]
 MOVQ runtime·m0+m_tls(SB), AX
 // 在将AX指向的内存中的值与0x123进行比较,通过set-get-compare的形式检查tls是否工作正常
 CMPQ AX, $0x123
 JEQ 2(PC)
 CALL runtime·abort(SB)
ok:
 get_tls(BX)
 // 寄存器CX存储g0的地址
 LEAQ runtime·g0(SB), CX
 // 将g0的地址保存到本地线程存储 m0.tls[0]中
 MOVQ CX, g(BX)
 // 将m0的地址保存到寄存器AX中
 LEAQ runtime·m0(SB), AX

 // save m->g0 = g0
 // 将m0和g0互相绑定
 // m0.g0=g0
 MOVQ CX, m_g0(AX)
 // g0.m=m0
 MOVQ AX, g_m(CX)

 CLD    
 CALL runtime·check(SB)

 
TEXT runtime·settls(SB),NOSPLIT,$32
#ifdef GOOS_android
 SUBQ runtime·tls_g(SB), DI
#else
    // DI寄存器中保存的是m.tls[0]的地址,执行下面的ADDQ之后,将DI寄存器中的值+8
    // 此时DI中的值保存的是m.tls[1]的地址
 ADDQ $8, DI 
    // 将DI寄存器的值拷贝到SI寄存器中,这是arch_prctl系统调用的第二个参数
 MOVQ DI, SI
 // 将值0x1002拷贝到DI寄存器中,这时arch_prctl系统调用的第一个参数
 MOVQ $0x1002, DI // ARCH_SET_FS
 // 将SYS_arch_prctl系统调用编号的值拷贝到AX寄存器中
 MOVQ $SYS_arch_prctl, AX
 // 执行系统调用
 SYSCALL
 CMPQ AX, $0xfffffffffffff001
 JLS 2(PC)
 MOVL $0xf1, 0xf1  // crash
 RET

通过arch_prctl系统调用将m0.tls[1]的地址设为了FS段的段基地址,CPU中有个FS寄存器与之对应,这样以后,工作线程代码可以通过FS寄存器找到m.tls.

前面执行已完成G0的初始化,M0与G0的相互绑定,主线程中可以通过get_tls获取到G0,通过G0的成员字段m可以找到M0,实现了M0和G0与主线程之间的关联。此时主线程、m0、g0以及栈之间的关系如下图所示。

下面看Go进程启动的核心处理流程。整个流程主要分为5个步骤:

  1. 处理操作系统传递过来的参数,即把argc和argv放在当前栈顶SP的位置
  2. 调用runtime.osinit函数,获取CPU的核数,存放在全局变量ncpu中,供后面的调度时使用
  3. 调用runtime.schedinit进行调度的初始化,会对p进行初始化,把m0和allp[0]绑定
  4. 调用runtime.newproc函数,创建出main goroutine,也就是主协程,该g将运行的任务函数是runtime.main函数,并把main goroutine放入与m0绑定的p的本地队列
  5. 调用runtime.mstart启动工作线程m,进入调度系统
代码语言:javascript
复制
TEXT runtime·rt0_go(SB),NOSPLIT,$0
    ...
    // 下面语句处理操作系统传递过来的参数
    // 将argc从内存搬到AX存储器中
 MOVL 16(SP), AX  // copy argc
 // 将argc搬到SP+0的位置,即栈顶位置
 MOVL AX, 0(SP)
 // 将argv从内存搬到AX寄存器中
 MOVQ 24(SP), AX  // copy argv
 // 将argv搬到SP+8的位置
 MOVQ AX, 8(SP)
 CALL runtime·args(SB)
 // 调用osinit函数,获取CPU的核数,存放在全局变量ncpu中,供后面的调度时使用
 CALL runtime·osinit(SB)
 // 调用schedinit进行调度的初始化
 CALL runtime·schedinit(SB)

 // runtime.mainPC是runtime.main函数,将runtine.main的地址拷贝到AX寄存器
 MOVQ $runtime·mainPC(SB), AX  // entry
 // 将AX入栈,AX中存储的是runtime.main函数的地址,也就是下面将要开启新goroutine需要执行的函数
 // newproc的第二个参数
 PUSHQ AX
 // newproc的第一个参数入栈,该值表示runtime.main函数占用的大小,因为runtime.main函数没有入参
 // 所以这里是0
 PUSHQ $0   // arg size
 // 创建main goroutine
 CALL runtime·newproc(SB)
 POPQ AX
 POPQ AX

 // 调用mstart函数,主线程进入调度循环,运行刚刚创建的main goroutine
 CALL runtime·mstart(SB)

    // 上面的mstart函数是不会返回的,如果返回了,说明代码有问题,直接abort
 CALL runtime·abort(SB)
 RET
    
 MOVQ $runtime·debugCallV1(SB), AX
 RET

下面开始分析上面每个函数具体做了哪些事情:

osinit

osinit主要是获取CPU的核数并保持在全局变量ncpu中,linux系统下该函数在 runtime/os_linux.go文件中,该函数的实现与具体的系统架构有关,mac系统该函数在runtime/os_darwin.go文件中

代码语言:javascript
复制
func osinit() {
 // 获得CPU的数量
 ncpu = getproccount()
 physHugePageSize = getHugePageSize()
 osArchInit()
}
schedinit

schedinit是真正的调度函数,完成调度的初始化操作。主要完成以下功能:

  1. 设置最大线程数m的数量为10000,m0结构的初始化
  2. 根据cpu的核数或者设置的GOMAXPROCS完成对应数量p对象的创建和初始化
  3. 完成m0和p(allp[0])的互相绑定,至此,g0与m0, m0与p的互相绑定
代码语言:javascript
复制
func schedinit() {
 // _g_为g0
 // 当前的goroutine为g0,获得g0的地址
 _g_ := getg()
 if raceenabled {
  _g_.racectx, raceprocctx0 = raceinit()
 }
 // 初始化最大默认线程为1万个
 sched.maxmcount = 10000

 tracebackinit()
 moduledataverify()
 // 初始化栈空间
 stackinit()
 mallocinit()
 fastrandinit() 
 // 初始化m0,_g_为g0,所以_g_.m即为g0.m,g0.m也就是m0
 mcommoninit(_g_.m)
 ...
 gcinit()

 sched.lastpoll = uint64(nanotime())
 // 初始化p的数量,初始值为cpu的核数,如果程序设置了GOMAXPROCS,则取
 // GOMAXPROCS环境变量的值
 procs := ncpu
 if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
  procs = n
 }
 // 调整p的数量,创建并初始化所有的p,所有的p保存在全局变量allp中
 if procresize(procs) != nil {
  throw("unknown runnable goroutine during bootstrap")
 }
    ...
}

getg函数功能是从本地存储中获取当前正在运行的g,在源码中没有找到它的实现,查阅资料说它是在编译器中实现的。前面已经g0保存到了本地线程存储中,所以这里getg获取到的g就是g0.

schedinit函数中调用了mcommoninit函数,实现对m0的初始化,mcommoninit函数实现如下,它会设置m0的id字段,每个m有唯一的id标识,并将当前的m插入到全局保存m的链表对象allm中。这个链表是一个单链表,allm始终指向链表头。

代码语言:javascript
复制
func mcommoninit(mp *m) {
 // _g_还是g0
 _g_ := getg()

 if _g_ != _g_.m.g0 {
  callers(1, mp.createstack[:])
 }

 lock(&sched.lock)
 // sched.mnext溢出了
 if sched.mnext+1 < sched.mnext {
  throw("runtime: thread ID overflow")
 }
 // 设置m的id字段,每个m有唯一的id标识
 mp.id = sched.mnext
 // 全局计时器mnext自增1
 sched.mnext++
 // 检查当前存活的m数量是否在不超过10000
 checkmcount()
    
    ...
    
 // allm保存了所有的m对象,它是以链表的形式存储的,通过头插法的方式将当前的
 // m(mp)插入到全局链表对象allm中
 mp.alllink = allm

 // 更新allm的头节点为当前的m对象mp
 atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
 unlock(&sched.lock)
    ...
}

schedinit会调用procresize函数完成所有p的创建和初始化,整个procresize处理比较复杂,因为程序启动后我们可用通过runtime.GOMAXPROCS()函数动态修改p的数量。该函数主要有以下步骤:

  1. 计算当前真正p的数量nprocs,初始化保存所有p的全局变量allp,allp为一个切片,它里面保存的对象为*p类型,利用make初始化allp.
  2. 循环创建nprocs个p对象并初始化它,并把*p对象保存到全局变量allp切片中.
  3. 将m0和allp[0]互相绑定,并将allp[0]状态设置为_Prunning
  4. 将allp[1:nprocs]中的p放入到全局变量sched的pidle空闲队列中

上述步骤描述的是最基本的情况,即没有通过runtime.GOMAXPROCS()函数对p的数量进行调整。如果对p的数量进行了调整,会做一些其他处理逻辑,例如将多余的p的进行销毁,如果p的本地队列中已有等待运行的g,从全局空闲m队列中获取一个m将它与p进行绑定等。将所有可运行的p通过p.link串联起来,构成了一个链表结构,函数返回可运行p链表头节点。

代码语言:javascript
复制
func procresize(nprocs int32) *p {
 // 初始时gomaxprocs的值为0
 old := gomaxprocs
 if old < 0 || nprocs <= 0 {
  throw("procresize: invalid arg")
 }
 if trace.enabled {
  traceGomaxprocs(nprocs)
 }

 // 更新统计信息
 now := nanotime()
 if sched.procresizetime != 0 {
  sched.totaltime += int64(old) * (now - sched.procresizetime)
 }
 sched.procresizetime = now

 // 初始时allp中没有p,即len(allp)=0, nprocs显然大于len(allp)
 // 会进入这里的if逻辑
 if nprocs > int32(len(allp)) {
  lock(&allpLock)
  if nprocs <= int32(cap(allp)) {
   // 调整allp的len为nprocs, allp的cap不变
   allp = allp[:nprocs]
  } else {
   // 初始化时会创建新的切片保存p
   nallp := make([]*p, nprocs)
            
   // 将旧切片allp中的p拷贝到新的切片nallp中
   copy(nallp, allp[:cap(allp)])
   // 将allp替换为新创建的切片nallp
   allp = nallp
  }
  unlock(&allpLock)
 }

 // 开始时,old为0,即创建初始化所有的p,保存到allp中
 for i := old; i < nprocs; i++ {
  pp := allp[i]
  // 如果pp非空,复用已有的pp,否则通过new新创建一个p对象
  if pp == nil {
   pp = new(p)
  }
  // 对pp字段进行初始化设置
  pp.init(i)
  atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
 }
 // _g_是g0
 _g_ := getg()
 // 初始时m都还未绑定p,不会进入到这个分支中,程序启动之后,在设置GOMAXPROCS
 // 有可能进入下面的分支
 if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
  // 继续保持之前的p为_Prunning状态
  _g_.m.p.ptr().status = _Prunning
  _g_.m.p.ptr().mcache.prepareForSweep()
 } else { // 初始化的时候会进入到这个分支
  // 但初始化的时候_g_.m(g0.m)也就是m0还是未绑定p 所以不会走到这个if里面
  if _g_.m.p != 0 {
   if trace.enabled {
    traceGoSched()
    traceProcStop(_g_.m.p.ptr())
   }
   _g_.m.p.ptr().m = 0
  }
  // 清理m0.p和mcache
  _g_.m.p = 0
  _g_.m.mcache = nil
  // 从全局p队列allp中取第一个p与m0进行绑定
  p := allp[0]
  p.m = 0
  // 设置p的状态为_Pidle
  p.status = _Pidle
  // 将p就是allp[0]与m0互相绑定,并将p的状态修改为_Prunning
  acquirep(p)
  if trace.enabled {
   traceGoStart()
  }
 }

 // 将多余的p进行销毁
 for i := nprocs; i < old; i++ {
  p := allp[i]
  p.destroy()
 }

 // Trim allp.
 if int32(len(allp)) != nprocs {
  lock(&allpLock)
  allp = allp[:nprocs]
  unlock(&allpLock)
 }

 var runnablePs *p
 // 将除allp[0](与m0绑定的p)之外的所有的p放入到空闲链表中
 for i := nprocs - 1; i >= 0; i-- {
  p := allp[i]
  // 如果当前的p是allp[0],跳过,因为它在前面已经绑定到m0上了
  if _g_.m.p.ptr() == p {
   continue
  }
  // 设置p的状态为_Pidle
  p.status = _Pidle
  // 判断当前的p的本地队列中是不是没有g,初始时p中是没有g的
  if runqempty(p) {
   // p的本地队列没有g,将其放入到全局空闲p队列(sched.pidle)中
   pidleput(p)
  } else {
   // 从全局的空闲m队列(sched.midle)中拿出一个m,将这个m绑定到p上
   p.m.set(mget())
   // 将所有可运行的p通过p.link串联起来,构成了一个链表结构
   // 例如 allp[1].link --> allp[2]  allp[2].link-->allp[3]
   p.link.set(runnablePs)
   runnablePs = p
  }
 }
 stealOrder.reset(uint32(nprocs))
 var int32p *int32 = &gomaxprocs 
 // gomaxprocs设置为nprocs
 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
 // 返回可运行p链表头节点
 return runnablePs
}

至此,已完成g0和m0的互相绑定,m0和allp[0]的互相绑定,nprocs个p的创建与初始化。得到调度器各个组件的关系如下图所示。

main goroutine的创建与运行

前面说了Go进程启动的核心处理流程有5个步骤,main goroutine的创建对应的是里面的第4个步骤,这里为了看起来方便,将rt0_go中main goroutine创建逻辑重新摘录出来。下面的汇编语句做的一件事就是调用newproc函数创建一个新的goroutine用来执行mainPC所对应的runtime.main函数。而runtime.main函数又会调用我们用户编写的package main包中main函数,这样我们的用户代码就可以被执行到了。

代码语言:javascript
复制
TEXT runtime·rt0_go(SB),NOSPLIT,$0
    ...
 // runtime.mainPC是runtime.main函数,将runtine.main的地址拷贝到AX寄存器
 MOVQ $runtime·mainPC(SB), AX  // entry
 // 将AX入栈,AX中存储的是runtime.main函数的地址,也就是下面将要开启新goroutine需要执行的函数
 // newproc的第二个参数
 PUSHQ AX
 // newproc的第一个参数入栈,该值表示runtime.main函数占用的大小,因为runtime.main函数没有入参
 // 所以这里是0
 PUSHQ $0   // arg size
 // 创建main goroutine
 CALL runtime·newproc(SB)
    ...

下面是newproc函数的实现,可以看到它有两个入参,第二个参数fn表示新创建出来的goroutine将从fn这个函数开始执行,第一个参数siz表示函数fn的参数占用的内存字节数。这里举个例子,方便我们理解,下面go add(1,2)会调用newproc创建一个协程,参数fn=&funcval{fn:add},siz=16. fn是一个结构体,它里面的fn字段就是新建协程调用的函数名称,对应到这里就是add函数。siz为add函数的参数占用的内存,这里add函数的参数是两个int64,所以占用的内存大小为16个字节,即siz为16.

需要传递函数参数大小siz给newproc函数的原因是,newproc函数将创建一个新的goroutine来执行fn函数,这个新创建的goroutine与当前程序正在执行的goroutine会使用不同的栈,它在运行的时候,需要知道fn函数参数的大小,而newproc函数自己不知道需要拷贝多少个字节的参数数据到新创建的goroutine栈上,所以用参数siz指明要拷贝的字节数。

代码语言:javascript
复制
func add(a, b int64) int64 {
 return a + b
}

func main() {
 go add(1, 2)
}

newproc函数在获取到fn函数的第一个参数地址(argp)后,调用systemstack切换到g0栈执行newproc1函数,并将argp参数传递给newproc1。

代码语言:javascript
复制
func newproc(siz int32, fn *funcval) {
 // argp指向第一个fn的第一个参数,siz说明fn参数的个数
 // 函数调用参数入栈顺序是从右向左
 argp := add(unsafe.Pointer(&fn), sys.PtrSize)
 // 获取正在运行的g,初始运行是main goroutine,此时的gp就是m0中的g0
 gp := getg()
 // 将调用newproc时由call指令压栈的函数返回地址,初始时,pc的值是
 // CALL runtime.newproc(SB)指令后面的指令的地址
 pc := getcallerpc()
 // systemstack表示切换到g0栈运行,初始时执行到这里的时候已经在g0栈,
 // 所以直接调用newproc1
 systemstack(func() {
  newproc1(fn, argp, siz, gp, pc)
 })
}

newproc1函数的主要功能是从堆上分配一个g对象newg,并为该对象分配一个大小为2048字节的栈空间,其次初始化它的stack成员变量,newg.stack.hi和newg.stack.lo分别指向栈的栈底(高地址)和栈顶(低地址)位置, 然后将newg需要执行函数的参数从newproc函数栈拷贝到newg栈中,最后将newg的状态修改为_Grunnable,并把它加入到等待运行的g队列中。

代码语言:javascript
复制
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
 // _g_为g0,初始时g0也是m0.go
 _g_ := getg()

 if fn == nil {
  _g_.m.throwing = -1 
  throw("go of nil func value")
 }
 acquirem() 
 siz := narg
 siz = (siz + 7) &^ 7

 // _StackMin=2048, sys.RegSize=8
 // siz>=2048-4*8-8=2048-40=2008

 // 检查参数的大小不能超过2048-4*8-8=2048-40=2008 Bytes
 if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
  throw("newproc: function arguments too large for new goroutine")
 }
 // 获取与当前g0(_g_)关联的m的绑定的p,初始时,_g_.m也就是m0
 // m0绑定的p为allp[0]
 _p_ := _g_.m.p.ptr()
 // 从本地队列_p_中获取一个g,如果本地队列中没有g,从全局队列中获取
 newg := gfget(_p_)
 // 如果从_p_的本地队列和全局队列中都没有获取到g,则新创建一个g
 // 初始时,是没有g的,所以会进入下面的if逻辑,新建一个main goroutine对象
 if newg == nil {
  // 申请创建一个新的g,栈的大小为2KB
  newg = malg(_StackMin)
  casgstatus(newg, _Gidle, _Gdead)
  // 将当前新创建的newg加入到全局g列表allgs中
  allgadd(newg) 
 }
 if newg.stack.hi == 0 {
  throw("newproc1: newg missing stack")
 }

 if readgstatus(newg) != _Gdead {
  throw("newproc1: new g is not Gdead")
 }

 // 设置一定的保留空间
 totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize 
 // 保留空间大小按系统对齐
 totalSize += -totalSize & (sys.SpAlign - 1)
 // 确定sp的位置
 sp := newg.stack.hi - totalSize
 spArg := sp
 if usesLR {
  *(*uintptr)(unsafe.Pointer(sp)) = 0
  prepGoExitFrame(sp)
  spArg += sys.MinFrameSize
 }
 if narg > 0 {
  // 将参数argp从运行newproc函数的栈拷贝到要运行g的栈中
  // 初始时就是从m0.g0栈拷贝参数到新运行g的栈
  memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
  if writeBarrier.needed && !_g_.m.curg.gcscandone {
   f := findfunc(fn.fn)
   stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
   if stkmap.nbit > 0 {
    bv := stackmapdata(stkmap, 0)
    bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
   }
  }
 }
 // 清理掉newg中sched中内容,下面会对sched重新赋值
 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
 // 设置newg的sched各字段的值,调度器根据这些字段把newg调度到CPU上运行
 newg.sched.sp = sp
 newg.stktopsp = sp
 // 设置程序计数器pc,当newg被调度起来运行时,从这个地址开始执行指令
 // pc的值为goexit函数偏移1(sys.PCQuantum)的位置
 newg.sched.pc = funcPC(goexit) + sys.PCQuantum 
 // 设置sched g的地址为当前g的地址
 newg.sched.g = guintptr(unsafe.Pointer(newg))
 // 调整sched成员和newg的栈
 gostartcallfn(&newg.sched, fn)
 newg.gopc = callerpc
 newg.ancestors = saveAncestors(callergp)
 // 设置newg的startpc为fn.fn,该字段用于函数调用的traceback和栈收缩
 newg.startpc = fn.fn
 if _g_.m.curg != nil {
  newg.labels = _g_.m.curg.labels
 }
 if isSystemGoroutine(newg, false) {
  atomic.Xadd(&sched.ngsys, +1)
 }
 // 设置g的状态为_Grunnable,意味着该g可以运行了
 casgstatus(newg, _Gdead, _Grunnable)

 if _p_.goidcache == _p_.goidcacheend {
  _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
  _p_.goidcache -= _GoidCacheBatch - 1
  _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
 }
 // 设置g的唯一标识goid
 newg.goid = int64(_p_.goidcache)
 _p_.goidcache++
 if raceenabled {
  newg.racectx = racegostart(callerpc)
 }
 if trace.enabled {
  traceGoCreate(newg, newg.startpc)
 }
 // 将newg加入到队列中,具体尝试放入队列的顺序为_p_.runnext、本地队列、全局队列
 // 如果_p_.runnext为空直接将newg放在_p_.runnext位置,如果_p_.runnext不为空
 // 将newg与_p_.runnext交换,原来_p_.runnext的g尝试放入p的本地队列runq中
 // 如果本地队列满了,会将runq中一半的元素转移到全局队列中
 // 初始时,_p_.runnext是空的,所以直接放在这里
 runqput(_p_, newg, true)

 // 唤醒一个m来运行g,初始时不会执行,因为mainStarted为false,即runtime包中的main函数还未执行
 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
  wakep()
 }
 releasem(_g_.m)
}

newproc1函数中有两条非常关键的语句,这里拿出来进行单独说明. 我们先来看newg.sched字段的功能,查看结构体g的定义,g的sched字段是gobuf类型,它保存的是goroutine的调度信息,重点就是保存几个关键寄存器的值。Go程序goroutine调度的本质是一组CPU寄存器和执行流的切换,当前我们执行某个g的时候,将BP,SP等寄存器设置为合适的值,将程序计数器pc指向g中的函数地址,这样g就被调度运行起来了,当要切换出当前g换其他g运行的时候,需要将当前g的CPU寄存器等信息保存到内存中,以供下次运行该g的时候,直接将保存到内存中的信息恢复到寄存器中又可以运行了。

可以看到这里将newg.sched.pc的值设置为了goexit函数的第二条指令,sys.PCQuantum的值为1,funcPC(goexit) + sys.PCQuantum指向的是goexit的第二条指令。为什么要这样设置呢?理论上不应该是设置为fn.fn吗,下面通过分析gostartcallfn函数一探究竟。

代码语言:javascript
复制
    ...
 newg.sched.pc = funcPC(goexit) + sys.PCQuantum 
    ...
 gostartcallfn(&newg.sched, fn)

gostartcallfn调整sched成员和newg的栈,核心处理调用的是gostartcall函数。gostartcall函数将栈顶寄存器SP向下移动一个指针的位置,然后将goexit+1即goexit的第二条指令。然后将buf.pc即newg.sched.pc重新设为fn(runtime.main函数)。相当于将goexit放到newg的栈顶,伪造成newg是被goeixt函数调用的,当newg中的fn函数执行完成之后,返回到goexit继续执行,做一些清理的操作。

代码语言:javascript
复制
func gostartcallfn(gobuf *gobuf, fv *funcval) {
 var fn unsafe.Pointer
 // 初始化时fn为runtime.main
 if fv != nil {
  fn = unsafe.Pointer(fv.fn)
 } else {
  fn = unsafe.Pointer(funcPC(nilfunc))
 }
 gostartcall(gobuf, fn, unsafe.Pointer(fv))
}

func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
 // newg的栈顶,当前newg栈上只有fn函数的参数,sp指向的是fn的第一个参数
 sp := buf.sp
 if sys.RegSize > sys.PtrSize {
  sp -= sys.PtrSize
  *(*uintptr)(unsafe.Pointer(sp)) = 0
 }
 // 为返回地址预留8字节空间
 sp -= sys.PtrSize
 // buf.pc保存的是goexit+1的地址,将buf.pc赋值给sp指向的内存,下面真正赋值给buf.pc的是fn函数,
 // 初始时是runtime包中的main函数,这样使得fn执行完后返回到goexit继续执行,从而完成清理工作
 // 将fn伪装成是被goexit函数调用的
 *(*uintptr)(unsafe.Pointer(sp)) = buf.pc
 // 重新设置newg的栈顶寄存器
 buf.sp = sp
 // 真正将newg的pc值设置为fn函数,等到newg被调度起来运行时,调度器会将buf.pc放入cpu的ip寄存器
 // 即开始执行fn函数逻辑
 buf.pc = uintptr(fn)
 buf.ctxt = ctxt
}

结构体gobuf用于保存goroutine的调度信息,主要包含几个关键CPU寄存器的值

代码语言:javascript
复制
type gobuf struct {
 // 栈指针,保存rsp寄存器的值
 sp uintptr
 // 程序计数器,保存rip寄存器的值
 pc uintptr
 // 与gobuf关联的goroutine地址
 g    guintptr
 ctxt unsafe.Pointer
 // 系统调用的返回值
 ret sys.Uintreg
 lr  uintptr
 bp  uintptr 
}

分析完了newproc1整理出来流程,现在再后头来看将newg加入到等待运行队列函数runqput实现细节。

runqput将g放入到p的本地队列中,传入参数有3个,_p_为g将放入哪个p的本地队列,gp为待放入的g, next为true表示将gp放入到_p_的runnext中,如果_p_的runnext中已经有g,会进行替换,之前的g会放入到_p_的本地队尾,如果next为false,直接将gp放入到本地队列_p_的队尾.如果本地队列满了,将当前P中前len(p)/2个G批量放入到全局队列中。

代码语言:javascript
复制
func runqput(_p_ *p, gp *g, next bool) {
 if randomizeScheduler && next && fastrand()%2 == 0 {
  next = false
 }

 // 如果next为true,将gp放入到_p_.runnext中,这个过程通过cas保证原子性
 if next {
 retryNext:
  // 对_p_.runnext进行备份
  oldnext := _p_.runnext
  // 通过cas操作,将gp和oldnext进行交换
  if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
   goto retryNext
  }
  // 如果oldnext为0,说明_p_.runnext之前没有g,现在已放入完毕,直接返回
  if oldnext == 0 {
   return
  }
 
  // 将之前的g赋值给gp,下面会将gp放入_p_的本地队列
  gp = oldnext.ptr()
 }

retry:
 // h为本地队列的队头
 h := atomic.LoadAcq(&_p_.runqhead)
 // t为本地队列的队尾
 t := _p_.runqtail
 // t-h小于队列的长度,即本地队列还没有满,放到本地队列的尾部
 if t-h < uint32(len(_p_.runq)) {
  _p_.runq[t%uint32(len(_p_.runq))].set(gp)
  atomic.StoreRel(&_p_.runqtail, t+1) 
  return
 }
 // 走到这里说明本地队列满了,放到全局队列, 放入到全局队列并不是一个,而是将当前P中前len(p)/2个G批量放入到global queue中
 if runqputslow(_p_, gp, h, t) {
  return
 }
 
 // 走到这里说明往全局队列中没有放成功,没有成功的原因是,本地队列没有满,所以进一步重试,
 // 尝试放入本地队列
 goto retry
}

runqputslow将_p_本地队列中一半数量的g(另外加上gp)移动到全局队列中,在上面的runqput函数处理逻辑中,如果本地队列满了,会执行runqputslow.

代码语言:javascript
复制
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
 // 存储要移动的g,移动的数量为本地队列的一半+1个,这里的1是为传入的gp分配一个位置
 var batch [len(_p_.runq)/2 + 1]*g

 n := t - h
 n = n / 2
 // 确保n为本地队列的长度的一半
 if n != uint32(len(_p_.runq)/2) {
  throw("runqputslow: queue is not full")
 }

 // 将队头的n个g存储到batch中
 for i := uint32(0); i < n; i++ {
  batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
 }
 // 原子更新_p_队列队头的位置,队头向后移动n个位置
 if !atomic.CasRel(&_p_.runqhead, h, h+n) { 
  return false
 }
 // 将传入的gp放入到batch的末尾
 batch[n] = gp

 // 如果要随机化调度,打乱batch中元素的顺序
 if randomizeScheduler {
  for i := uint32(1); i <= n; i++ {
   j := fastrandn(i + 1)
   batch[i], batch[j] = batch[j], batch[i]
  }
 }

 // 将batch中的g串起来,构成一个链表,因为batch中有n+1元素
 // 所以这里循环n次,就将n+1个组成了链表结构
 for i := uint32(0); i < n; i++ {
  batch[i].schedlink.set(batch[i+1])
 }
 var q gQueue
 // 将链表的头节点和尾节点加入到q中,方便一次性将batch中的g加入到全局队列
 q.head.set(batch[0])
 q.tail.set(batch[n])

 lock(&sched.lock)
 // 将g一次性批量放入全局队列
 globrunqputbatch(&q, int32(n+1))
 unlock(&sched.lock)
 return true
}

func globrunqputbatch(batch *gQueue, n int32) {
 // 将batch一次性放入sched.runq中
 sched.runq.pushBackAll(*batch)
 // 更新sched.runq中g的数量
 sched.runqsize += n
 *batch = gQueue{}
}

// 将q2链表中的g加入到全局队列中
func (q *gQueue) pushBackAll(q2 gQueue) {
 if q2.tail == 0 {
  return
 }
 q2.tail.ptr().schedlink = 0
 // 直接将全局队列的队尾连接到q2的头节点,这样q2就加入了全局g链表中
 if q.tail != 0 {
  q.tail.ptr().schedlink = q2.head
 } else {
  q.head = q2.head
 }
 // 更新全局链表尾节点的位置,指向q2的尾部
 q.tail = q2.tail
}

通过流程图的形式将runqput处理逻辑描述出来,得到如下流程图。

经过上面的处理逻辑,main goroutine(newg)就被创建了出来,此时主线程、g0栈和main goroutine栈之间的关系如下图所示。

mstart

目前已分析完Go进程启动的核心处理流程的前四个步骤,现在来看最后一个步骤:调用runtime.mstart启动工作线程m,进入调度系统.

mstart函数设置了g的堆栈保护字段stackguard0和stackguard1之后,直接调用了函数mstart1。

代码语言:javascript
复制
func mstart() {
 // _g_为g0
 _g_ := getg()

 // 初始时,g0的stack.lo已完成初始化,它不等于0
 osStack := _g_.stack.lo == 0
 if osStack {
  size := _g_.stack.hi
  if size == 0 {
   size = 8192 * sys.StackGuardMultiplier
  }
  _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
  _g_.stack.lo = _g_.stack.hi - size + 1024
 }

 // 初始化堆栈保护
 _g_.stackguard0 = _g_.stack.lo + _StackGuard
 _g_.stackguard1 = _g_.stackguard0
 mstart1()
    ...
 mexit(osStack)
}

mstart1会检测当前的g是否是g0,设置工作线程m的备用信号堆栈和信号掩码,如果当前的m是m0,做一些特殊处理,设置系统信号量的处理函数,检测m是否有起始任务函数,如果有就执行它,例如sysmon函数,获取一个p并将它与m进行绑定,最后执行调度程序函数schedule。可以看到mstart1还没有进行真正的调度,而是为调度做一些处理工作。

代码语言:javascript
复制
func mstart1() {
 // 初始时,_g_为m0中的g0,其他情况下_g_也是各个m的g0
 _g_ := getg()

 // 确保g是系统栈上的g0,调度器只在g0上直执行
 if _g_ != _g_.m.g0 {
  throw("bad runtime·mstart")
 }

 // getcallerpc()获取mstart1执行完的返回地址
 // getcallersp()获取调用mstart1时的栈顶地址
 // 保存g0调度信息
 save(getcallerpc(), getcallersp())
 asminit()
 // 初始化m,主要设置线程的备用信号堆栈和掩码
 minit()

 // 如果当前的m是m0,执行mstartm0操作
 if _g_.m == &m0 {
  // 对于初始m,需要设置系统信号量的处理函数
  mstartm0()
 }
 // 对于m0是没有mstartfn函数,对其他m如果有起始任务函数,则需要执行,比如sysmon函数
 if fn := _g_.m.mstartfn; fn != nil {
  fn()
 }

 // 如果当前g的m不是m0,它现在还没有p,需要获取一个p, m0已经绑定了allp[0],所以不用关心m0
 if _g_.m != &m0 {
  // 完成_g_.m和p的互相绑定
  acquirep(_g_.m.nextp.ptr())
  _g_.m.nextp = 0
 }
 // 调用调度函数schedule,该函数不会返回
 schedule()
}
调度策略

schedule函数的核心功能就是竭尽所能找到一个可运行的g,然后调用execute执行g中的函数。查找g的算法流程如下:

  1. 如果当前真正进行垃圾回收(GC)操作,需要暂停所有程序的运行(STW),调用gcstopm休眠当前的工作线程m
  2. 保证公平性,每进行61次调度时优先从全局队列中获取待运行的g
  3. 从p的本地队列中获取,优先从p.runnext中获取,没有再从p.runq中获取
  4. 调用findrunnable从其他p中偷取g,如果也没有偷取到的话,将当前的m休眠,等待被唤醒
代码语言:javascript
复制
func schedule() {
 // _g_为每个工作线程m对应的g0,初始时_g_为m0的g0
 _g_ := getg()
    ...
 var gp *g
 var inheritTime bool
    ...
 if gp == nil && gcBlackenEnabled != 0 {
  gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
  tryWakeP = tryWakeP || gp != nil
 }
 if gp == nil {
  // 每进行61次调度时优先从全局队列中获取待运行的g, 这样做是为了保证调度的公平性
  // 防止全局队列中的g得不到调度饿死
  if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
   // 先对全局队列加锁然后从中获取g
   lock(&sched.lock)
   gp = globrunqget(_g_.m.p.ptr(), 1)
   unlock(&sched.lock)
  }
 }
 if gp == nil {
  // 从与m绑定的p的本地队列中获取g
  gp, inheritTime = runqget(_g_.m.p.ptr())
 }
 if gp == nil {
  // 走到这里说明,从全局队列或p的本地队列中都没有获取到g,则调用findrunnable函数
  // 从其他工作线程绑定的p的本地队列中偷取g,如果没有偷取到,也就是当前都没有待运行的g
  // 了,当前的工作线程进入睡眠模式,直到获取到运行的g之后findrunnable函数才会返回
  gp, inheritTime = findrunnable() 
 }
    ...
 // 当前运行的是runtime的代码,函数调用栈使用的是g0的栈空间,调用execute会切换到
 // 用户程序创建的g(gp)的代码和栈空间去运行
 execute(gp, inheritTime)
}

上面介绍了schedule函数整体是怎么找g的流程算法,其实就是GMP调度模型中的调度策略(找G的策略),有3大策略,对应到上面的处理函数分别是globrunqget、runqget和findrunnable。下面分别详细介绍每个策略的实现细节。

从全局队列中获取G

globrunqget从全局队列最多获取max个g,并将获取的max-1个g放入_p_的本地队列,并将剩下的1个作为返回值给调用方进行调度。该函数是一个通用函数,也就是一次可以从全局队列中搬多个g到本地队列,进行批量搬运。考虑到公平性,搬运的数量有一定限制,将全局队列中g的数量根据p的数量均分,每个p能从全局队列中最多获取到sched.runqsize/gomaxprocs + 1,证公平性每个p都有机会从全局队列中拿到一部分g,同时搬运的数量不能超过本地队列长度的一半(128)。如果取回来的g数量超过本地队列_p_可以容纳的怎么解决呢?可以看到下的runqput函数中如果本地队列满了又放回全局队列,所以g不会丢失。

代码语言:javascript
复制
// globrunqget从全局队列最多获取max个g,并将获取的max-1个g放入_p_的本地队列
// 并将剩下的1个作为返回值给调用方进行调度
func globrunqget(_p_ *p, max int32) *g {
 // 如果全局队列中没有g,直接返回nil
 if sched.runqsize == 0 {
  return nil
 }

 // 将全局队列中g的数量根据p的数量均分,每个p能从全局队列中最多获取到sched.runqsize/gomaxprocs + 1
 // 个g,这样保证公平性,每个p都有机会从全局队列中拿到一部分g
 n := sched.runqsize/gomaxprocs + 1
 // 进一步判断n不能大于全局队列中g的数量,这种情况在gomaxprocs为1的时候会出现n>全局队列中g的数量
 if n > sched.runqsize {
  n = sched.runqsize
 }

 // 调整n的值不超过max,max作为入参的含义是一次性从全局队列取走的g的最大数量
 if max > 0 && n > max {
  n = max
 }
 // 继续调整n的值,n的值不能超过本地队列长度的一半,本地队列_p_.runq为一个256的数组
 // 它的一半为128,即n不能超过128,将_p_.runq作为一个环形数组循环使用的,这里有一种担心
 // 如果取回来的g数量超过本地队列_p_可以容纳的怎么解决呢?可以看到下的runqput函数中如果本地
 // 队列满了又放回全局队列,所以g不会丢失
 if n > int32(len(_p_.runq))/2 {
  n = int32(len(_p_.runq)) / 2
 }
 // 全局队列中的g的数量减少n个
 sched.runqsize -= n

 // 将全局队列的对头的g出队,直接通过函数返回gp,其他的g通过runqput放入_p_的本地队列
 gp := sched.runq.pop()
 n--
 // 循环n次从全局队列中取出n个g并放入本地队列
 for ; n > 0; n-- {
  gp1 := sched.runq.pop()
  // 将从全局队列获取的gp1放入到_p_的本地队列中
  runqput(_p_, gp1, false)
 }
 return gp
}
从P的本地队列中获取G

runqget从_p_的本地队列中取走一个g返回。优先从p的runnext中获取,runnext只保存一个g,当runnext没有g时才会从runq中获取。当循环队列的队头与队尾相同时,表明队列中没有可运行的g。否则,从队头获取一个g返回。因为存在其他P偷取g造成同时访问的情况,所以写了两个for{}死循环,通过cas手段保证并发安全。

代码语言:javascript
复制
// runqget从_p_的本地队列中取走一个g返回,注意它有两个返回值,第一个返回值就是获取到的
// g,第二参数是bool类型,表示返回的gp的是否应该继承当前时间片中的剩余时间,如果为true
// 表示继承当前时间片的剩余时间,返回false表示开启一个新的时间片
func runqget(_p_ *p) (gp *g, inheritTime bool) {
 // If there's a runnext, it's the next G to run.
 // 上来就是两个for{}循环
 for {
  // 优先从p的runnext中获取,runnext只保存一个g,当runnext没有g时
  // 才会从runq中获取
  next := _p_.runnext
  if next == 0 {
   break
  }
  // 获取的过程是cas操作,防止并发问题
  if _p_.runnext.cas(next, 0) {
   return next.ptr(), true
  }
 }

 for {
  // 从循环队列中获取一个g
  h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
  t := _p_.runqtail
  // h为一个下标,表示小于h下标 runq中的g已取走
  // t表示从[h,t)范围 runq中的g还未取走
  // t与h相等表示本地队列已经空了
  if t == h {
   return nil, false
  }
  // 将runq中h对应下标的g取出
  gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
  // 原子性自增_p_.runqhead
  if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
   return gp, false
  }
 }
}
从其他P的本地队列中偷取G

findrunnable从其他p中偷取可以运行的g,在从其他p偷取g前会当前是否有已经准备好运行的网络协程,如果有返回一个可运行的协程。并通过injectglist函数将其余协程放入全局运行队列等待被调度。如果没有可以运行的网络协程,全局运行队列中也没有可运行的g,当前也没有垃圾回收中进行标记的g需要运行,这时会调用runqsteal从其他p中的本地队列中偷取它里面一半数量的g到当前的本地队列。

代码语言:javascript
复制
func findrunnable() (gp *g, inheritTime bool) {
 _g_ := getg()
top:
 _p_ := _g_.m.p.ptr()
 // 检查是否在处理gc
 if sched.gcwaiting != 0 {
  gcstopm()
  goto top
 }
 if _p_.runSafePointFn != 0 {
  runSafePointFn()
 }

 ...
 // 对本地队列再进行一次检查,查看是否有待运行的g
 if gp, inheritTime := runqget(_p_); gp != nil {
  return gp, inheritTime
 }

 // 查看全局队列是否有待运行的g
 if sched.runqsize != 0 {
  lock(&sched.lock)
  gp := globrunqget(_p_, 0)
  unlock(&sched.lock)
  if gp != nil {
   return gp, false
  }
 }

 // 从网络io轮询器中找到准备就绪的g,把这个g变为可运行的g
 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
  if list := netpoll(0); !list.empty() { 
   gp := list.pop()
   // 把找到的可运行的网络io的g列表插入到全局队列
   injectglist(&list)
   // 将gp的状态从_Gwaiting修改为_Grunnable
   casgstatus(gp, _Gwaiting, _Grunnable)
   if trace.enabled {
    traceGoUnpark(gp, 0)
   }
   return gp, false
  }
 }

 procs := uint32(gomaxprocs)
 ranTimer := false
 // 如果当前的m没有自旋,并且工作中的p的数量小于正在自旋的m数的2倍,就不让它也在进行自旋了,
 // 主要是为了防止有过多的m在寻找可以运行的g而消耗太多的cpu
 if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
  goto stop
 }
 // 将当前的m自旋
 if !_g_.m.spinning {
  // 设置m的状态为自旋状态
  _g_.m.spinning = true
  // 处于自旋状态的m的数量+1
  atomic.Xadd(&sched.nmspinning, 1)
 }

 // 从其他的p的本地运行队列中偷取g,p的选择是随机的,尝试4次
 for i := 0; i < 4; i++ {
  for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
   if sched.gcwaiting != 0 {
    goto top
   }
   stealRunNextG := i > 2 
   p2 := allp[enum.position()]
   if _p_ == p2 {
    continue
   }
   // 从p2偷取它本地队列一半数量的g到_p_中
   if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
    return gp, false
   }
           ...
  }
 }
    ...
 // 垃圾回收中有进行标记工作的g需要运行,调度运行标记工作的g
 if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
  _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
  gp := _p_.gcBgMarkWorker.ptr()
  casgstatus(gp, _Gwaiting, _Grunnable)
  if trace.enabled {
   traceGoUnpark(gp, 0)
  }
  return gp, false
 }
 ...
 // 走到这里,说明前面都没有找到可运行的g,将当前的工作线程与p之间的绑定解除
 if releasep() != _p_ {
  throw("findrunnable: wrong p")
 }
 // 把与当前m绑定的p放入空闲队列中,m接下来要准备休眠了
 pidleput(_p_)
 unlock(&sched.lock)
 wasSpinning := _g_.m.spinning
 if _g_.m.spinning {
  // 取消m的自旋状态标记
  _g_.m.spinning = false
  // 处于自旋状态的m数量减1
  if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
   throw("findrunnable: negative nmspinning")
  }
 }
 // 再一次检查所有的p中是否有有可用运行的g
 for _, _p_ := range allpSnapshot {
  ...
 }

 // 再次检查是有垃圾回收中进行标记的g需要运行
 if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
    ...
    }
    ...
 // 当前的m进入休眠
 stopm()
 goto top
}

// runqsteal从p2的本地队列中偷取它里面一半数量的g到_p_的本地队列
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
 t := _p_.runqtail
 // 从p2中偷取一批g放入到p中
 n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
 if n == 0 {
  // 没有偷取到,直接返回
  return nil
 }
 // 这里减1个是因为函数会返回一个g
 n--
 gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
 // 只偷取到了1个g,直接返回它
 if n == 0 {
  return gp
 }
 // 检查_p_的本地队列数量是否越界
 h := atomic.LoadAcq(&_p_.runqhead) 
 if t-h+n >= uint32(len(_p_.runq)) {
  throw("runqsteal: runq overflow")
 }
 // 更新_p_本地队列队尾的位置
 atomic.StoreRel(&_p_.runqtail, t+n) 
 return gp
}

// runqgrab从_p_的本地运行队列中选择一批g放入到batch中
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
 for {
  // 获取_p_本地队列队头的位置,原子操作
  h := atomic.LoadAcq(&_p_.runqhead) 
  // 获取_p_本地队列队尾的位置,原子操作
  t := atomic.LoadAcq(&_p_.runqtail) 
  // n为_p_的本地队列中有多少个可待运行的g
  n := t - h
  // n为队列中g数量的一半
  n = n - n/2
  if n == 0 {
   // 走进这里,说明n为0,什么时候n为0呢,就是h=t的时候,也就是_p_的本地队列中没有g的时候
   // 如果设置了从_p_.runnext(stealRunNextG为true)获取,尝试从runnext获取g
   if stealRunNextG {
    // 将_p_.runnext中的g放入到batch中
    if next := _p_.runnext; next != 0 {
     if _p_.status == _Prunning {
      if GOOS != "windows" {
       usleep(3)
      } else {
       osyield()
      }
     }
     if !_p_.runnext.cas(next, 0) {
      continue
     }
     batch[batchHead%uint32(len(batch))] = next
     return 1
    }
   }
   // 说明_p_.runnext中也没有g,直接返回
   return 0
  }
  // 进一步判断n不能超过_p_本地队列中g数量的一半,为什么要做这个判断呢?
  // 因为在前面读取runqhead和runqtail的两个操作整体看不是原子性的,虽然它们单独是原子性的
  // 也就是说在这两个操作的中间有可能有其他的线程也在偷取g或向里面放g,导致计算不一致,所以这里
  // 进一步检查
  if n > uint32(len(_p_.runq)/2) { 
   continue
  }
  // 从_p_本地队列搬n个g到batch中
  for i := uint32(0); i < n; i++ {
   g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
   batch[(batchHead+i)%uint32(len(batch))] = g
  }
  // 更新_p_本地队列队头的位置,向后移动n个位置
  if atomic.CasRel(&_p_.runqhead, h, h+n) { 
   return n
  }
 }
}

分析完了怎么选取运行g的问题,现在在回到schedule调度函数,分析它是如何执行要运行g的,也就是对应到schedule中的execute处理逻辑。

execute

execute切换到选择的要运行的g的栈执行,设置g为运行状态和是否被抢占等信息之后,调用gogo函数从当前g0栈切换到要运行g的栈,真正开始执行用户程序。

代码语言:javascript
复制
func execute(gp *g, inheritTime bool) {
 // _g_为g0
 _g_ := getg()

 // 设置当前m运行用户程序的g位gp,即将执行用户程序的g和m绑定
 _g_.m.curg = gp
 // 设置执行用户程序的g(gp)的m为_g_.m,这样gp和当前的m相互绑定
 gp.m = _g_.m
 // 将gp的状态从_Grunnable修改为_Grunning,意味着gp马上会得到执行
 casgstatus(gp, _Grunnable, _Grunning)
 gp.waitsince = 0
 // 设置gp的抢占标志为false
 gp.preempt = false
 gp.stackguard0 = gp.stack.lo + _StackGuard
 if !inheritTime {
  _g_.m.p.ptr().schedtick++
 }

 hz := sched.profilehz
 if _g_.m.profilehz != hz {
  setThreadCPUProfiler(hz)
 }

 if trace.enabled {
  if gp.syscallsp != 0 && gp.sysblocktraced {
   traceGoSysExit(gp.sysexitticks)
  }
  traceGoStart()
 }
 // gogo将从g0栈切换到执行用户程序的gp,真正开始执行用户程序
 gogo(&gp.sched)
}

gogo函数是用汇编语言实现的,具体源码解析如下,切换原理就是将要运行g的调度信息g.sched从内存中恢复到CPU寄存器,设置SP和IP等寄存器的值,跳转到要运行的位置开始执行指令。总之一句话,gogo函数完成了从g0到用户g的切换,即CPU执行权的转让以及栈的切换。

代码语言:javascript
复制
TEXT runtime·gogo(SB), NOSPLIT, $16-8
    // 0(FP)表示第一个参数,即buf=&gp.sched
    // BX=buf
 MOVQ buf+0(FP), BX  // gobuf
 // DX=buf.g=&gp.sched.g
 MOVQ gobuf_g(BX), DX
 MOVQ 0(DX), CX  // make sure g != nil
 get_tls(CX)
 // 把g放入到tls[0],即把要运行的g的指针放进线程本地存储,后面的代码可以通过本地线程存储
 // 获取到当前正在执行的goroutine的地址,在这之前,本地线程存储中存放的是g0的地址
 MOVQ DX, g(CX)
 // SP=buf.sp=&gp.sched.sp,即把CPU的栈顶寄存器SP设置为gp.sched.sp,成功完成从
 // g0栈切换到gp栈
 MOVQ gobuf_sp(BX), SP // restore SP
 // 恢复调度上下文到CPU对应的寄存器
 // 将系统调用的返回值放入到AX寄存器中
 MOVQ gobuf_ret(BX), AX
 MOVQ gobuf_ctxt(BX), DX
 MOVQ gobuf_bp(BX), BP
 // 前面已经将调度相关的值都放入到CPU的寄存器中了,将gp.sched中的值清空,这样可以减轻gc的工作量
 MOVQ $0, gobuf_sp(BX) 
 MOVQ $0, gobuf_ret(BX)
 MOVQ $0, gobuf_ctxt(BX)
 MOVQ $0, gobuf_bp(BX)
 // 把gp.sched.pc的值放入到BX寄存器,对于main goroutine,sched.pc中存储的是runtime包
 // 中main()函数的地址
 MOVQ gobuf_pc(BX), BX
 // JMP把BX寄存器中的地址值放入到CPU的IP寄存器中,然后CPU跳转到该地址的位置开始执行指令
 // 即跳转到main()函数执行代码
 JMP BX

execute调用gogo从g0栈切换到用户g栈,在刚开始时,队列中只有一个main goroutine,所以第一个被运行的用户g就是main goroutine.执行完gogo后会跳转到runtime.main(main goroutine的function)运行。

runtime.main函数会调用我们main包中的main函数,在调用main包中main函数前,会做一些其他工作,主要有:

  1. 新建一个线程执行sysmon函数,sysmon的工作是系统后台监控,通过轮询的方式判断是否需要进行垃圾回收和调度抢占
  2. 启动gc清扫的goroutine
  3. 执行runtime包的初始化,main包以及main包import的所有包的初始化

执行完上述准备逻辑之后,才开始真正执行main.main函数,从main.main函数返回后调用exit(0)系统调用退出进程.如果exit(0)进程没退出,通过for循环一直访问非法地址,正常情况下,一但出现非法地址访问,系统就会把该进程杀死,用这样的方法来确保进程退出。

可以看到runtime.main执行完main.main函数之后,此时函数调用链是schedule()-->execute()-->gogo()-->runtime.main()-->main.main(),直接调用exit系统调用结束进程了。读到这里有同学可能疑问?前面不是说在创建goroutine的时候伪造成g是被goexit函数调用的,按理说执行完g逻辑之后,要返回goexit函数继续执行。这是对main goroutine以外的其他goroutine准备的,对于非main goroutine执行完之后会返回到goexit函数继续执行,而main goroutine执行完直接结束整个进程了,因为main goroutine执行完意味着程序结束了,所以不用再做其他处理了,也就无需再回到goexit函数继续执行。

代码语言:javascript
复制
func main() {
 // 这里的g位于main goroutine
 g := getg()
 g.m.g0.racectx = 0
 // 设置栈大小的最大值,对于32系统,栈的大小不能超过250M,对于64位系统,栈的大小不超过1G
 if sys.PtrSize == 8 {
  maxstacksize = 1000000000
 } else {
  maxstacksize = 250000000
 }
 // mainStarted为全局变量,表示main goroutine现在已经开始运行了
 // 现在可以允许其他创建的goroutine启动新的m来运行创建的g了
 mainStarted = true

 // wasm架构不支持线程,所以也就没有监控线程sysmon
 if GOARCH != "wasm" { 
  // 切换到g0栈,启动一个监控线程
  systemstack(func() {
   // 传递给newm的第二参数p为nil,说明该线程不需要与p绑定,即sysmon工作线程
   // m不需要调度器的调度,独立于调度器,因为该m只干一件事就是执行sysmon函数,不会
   // 执行用户程序的g
   newm(sysmon, nil)
  })
 }
 ...
 runtimeInitTime = nanotime()
 //启动垃圾清理的goroutine
 gcenable()
    ...
 // 执行package main 中的init函数, main包中引用的依赖包中的init函数也会执行
 doInit(&main_inittask)

 close(main_init_done)

 needUnlock = false
 unlockOSThread()

 //使用 -buildmode=c-archive 或 c-shared 编译的程序有一个 main,但它不会被执行。
 if isarchive || islibrary {
  return
 }
 // 进行间接调用,因为链接器在放置运行时不知道主包的地址
 fn := main_main 
 // 执行main包中的main函数
 fn()
    ...
 // 通过exit系统调用退出进程,可以看到main goroutine并没有返回,直接进入系统调用退出程序
 exit(0)

 for {
  var x *int32
  // x是一个空指针,是一个非法的地址,给他赋值会导致程序崩溃,操作系统就会把该进程杀死,确保程序一定会退出
  *x = 0
 }
}

前面分析了main goroutine的退出流程,它执行完main.main()函数后,直接调用exit系统调用结束进程了。而对于非main goroutine来说,执行完用户逻辑之后会回到goexit函数,因为在创建非main goroutine的时候伪造成g是被goexit函数调用的.下面开始分析回到goexit函数做了哪些处理。

执行完用户逻辑回到goexit+PCQuantum,也就是goexit的第二条指令,就是CALL runtime·goexit1(SB). 它直接调用goexit1函数。

代码语言:javascript
复制
TEXT runtime·goexit(SB),NOSPLIT,$0-0
 BYTE $0x90 // NOP
 // 用户程序执行完返回goexit的第二条指令就是这里
 CALL runtime·goexit1(SB) 
 BYTE $0x90 // NOP

goexit1函数处理一些data race等检查逻辑之后,调用了mcall函数,mcall是汇编实现的,具体实现细节见下面的注解。mcall函数的功能是从当前用户程序g切换到g0上运行,然后在g0栈上执行goexit0函数。概括起来,mcall完成两个主要逻辑:

  1. 保存当前的g的调度信息到内存中,通过当前的g,找到与它绑定的m,在通过m找到m中保存的g0,然后将g0保存到tls中,修改CPU寄存器的值为g0栈的内容
  2. 切换到g0栈,执行goexit0函数

可以看到mcall完成的功能与前面介绍的gogo函数功能完全相反。gogo函数实现从g0栈到用户程序g的切换,而这里的mcall恰好实现从用户程序g到g0的切换,所以通过gogo和mcall函数,我们可以在runtime代码和用户程序代码之间来回切换。

代码语言:javascript
复制
func goexit1() {
    // 这里是检查data race逻辑
    ...
 mcall(goexit0)
}

// 传给mcall函数的参数是一个指向funcval对象的指针
TEXT runtime·mcall(SB), NOSPLIT, $0-8
    // 取出参数的值也就是goexit0的地址,放入到DI寄存器中,
 MOVQ fn+0(FP), DI

    // 获取当前的g,这里的g还是用户程序的g
 get_tls(CX)
 // 设置AX寄存器的值为g
 MOVQ g(CX), AX 
 // mcall返回地址放入到BX寄存器中
 MOVQ 0(SP), BX 
 // 将当前的调度信息也就是CPU寄存器的值保存到g的sched字段中
 // 因为接下来将发送goroutine的切换,从用户程序g切换到g0

 // 保存PC
 MOVQ BX, (g_sched+gobuf_pc)(AX)
 LEAQ fn+0(FP), BX 
 // 保存SP
 MOVQ BX, (g_sched+gobuf_sp)(AX)
 // 保存g
 MOVQ AX, (g_sched+gobuf_g)(AX)
 // 保存BP
 MOVQ BP, (g_sched+gobuf_bp)(AX)

 // 将g保存到BX寄存器中
 MOVQ g(CX), BX
 // 将g.m保存到BX寄存器中
 MOVQ g_m(BX), BX
 // 将g.m.g0保存到寄存器SI中
 MOVQ m_g0(BX), SI
 // 经过上面3步操作,成功拿到了g0的地址,非常完美
 // 比较g0是否与g相等,理论上不应该相等,如果出现相等,一定是有问题
 CMPQ SI, AX 
 JNE 3(PC)
 MOVQ $runtime·badmcall(SB), AX
 JMP AX
 // SI中保存的是g0的地址,经过这步操作将g0的地址放到了线程本地存储中
 MOVQ SI, g(CX) // g = m->g0
 // 恢复g0的栈顶指针到CPU的SP寄存器中,成功将g0栈放入到CPU上,完成了从
 // 用户程序g栈到g0的栈切换
 MOVQ (g_sched+gobuf_sp)(SI), SP 
 // 将goexit0的参数g入栈
 PUSHQ AX
 // 将funcval对象的指针保存到DX
 MOVQ DI, DX
 // 获取funcval对象的第一个成员也就是goexit0的地址
 MOVQ 0(DI), DI
 // 调用goexit0(g)函数
 CALL DI
 POPQ AX
 MOVQ $runtime·badmcall2(SB), AX
 JMP AX
 RET

goexit0函数把gp(用户程序g)状态从_Grunning修改为_Gdead,然后清理gp对象中保存内容,其次通过函数dropg解除gp和m之间的绑定关系,然后将gp放入到P的freeg队列中缓存起来,以便后续复用,最后调用schedule,进行新一轮调度.

代码语言:javascript
复制
// goexit0函数是在g0上执行的,入参gp是用户程序g
func goexit0(gp *g) {
 // _g_为g0
 _g_ := getg()
 // 将gp的状态从_Grunning修改为_Gdead
 casgstatus(gp, _Grunning, _Gdead)
 if isSystemGoroutine(gp, false) {
  atomic.Xadd(&sched.ngsys, -1)
 }
 // 清理gp对象中保存内容
 gp.m = nil
    ...

 // 如果当前在进行垃圾回收,将gp赋值标记的信息刷新到全局信用池中
 if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
  scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
  atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
  gp.gcAssistBytes = 0
 }
 // 解除gp和m之间的绑定关系
 dropg()
    ...
 // 将gp放入到p的freeg队列中,以便下次可以服用,不用new一个g对象,避免重新申请内存
 gfput(_g_.m.p.ptr(), gp)
 if locked {
  if GOOS != "plan9" { 
   gogo(&_g_.m.g0.sched)
  } else {
   _g_.m.lockedExt = 0
  }
 }
 // 再次调用schedule,进行新一轮调度
 schedule()
}

至此,我们已经分析完了main goroutine和非main goroutine的整个调度流程,下图概括了调度流中函数调度栈。对于非main goroutine来说,从shedule开始,经过一些列函数调用执行用户的代码,最后又会回到shedule进行新一轮调度,这个过程会进行无数轮这样的循环,而函数的每一次调用会占用一定的内存空间,这样一直循环调度下去那不是会耗尽g0栈的所有空间,太危险了?这里是不会耗尽的,因为每次执行完mcall切换到g0栈时都是从一个固定的g0.sched.sp即固定的g0栈顶位置开始的,所以会重复使用上一轮调度时使用过的栈内存空间,这时没有问题的,因为前一轮调度已经结束了,覆盖掉之前空间的内容是没有问题的。

调度时机

前面分析了调度器的初始化,main goroutine的创建与运行,调度选g的策略等,下面看在什么时候什么情况下会发生调度,也就是调度的时机。根据调度方式的不同,调度时机分为3种,分别是主动调度、被动调度和抢占调度。

主动调度

在用户代码中通过调用runtime.Gosched()函数发生主动调度。Gosched函数会调用mcall从当前g的栈切换到m的g0栈执行gosched_m函数,mcall细节前面已经分析过了。下面直接看gosched_m函数的实现,gosched_m调用goschedImpl挂起用户协程g. goschedImpl主要完成以下4个功能:

  1. 将用户协程g状态从_Grunning修改为_Grunnable
  2. 调用dropg解除m和gp的互相绑定,分别将m.curg和gp.m设置为nil
  3. 将当前的用户协程g放入到全局运行队列中
  4. 调用schedule进入新一轮调度

主动调度的逻辑还是很好理解的,就是当前的g放弃CPU执行权,将其放入到全局运行队列中,因为它还是可以继续运行的,只是我们主动放弃了,等待下次被调度程序执行。

代码语言:javascript
复制
// Gosched提供给用户程序主动让出调度的接口
func Gosched() {
 // checkTimeouts为空实现
 checkTimeouts()
 // 切换到当前m的g0栈执行gosched_m函数
 mcall(gosched_m)
}

func gosched_m(gp *g) {
 if trace.enabled {
  traceGoSched()
 }
 // gp为准备挂起的用户协程g,即调用runtime.Gosched()函数所在的协程g
 goschedImpl(gp)
}

func goschedImpl(gp *g) {
 // 获取准备挂起g(gp)的状态,它的状态为目前处于_Grunning,因为它正在运行
 status := readgstatus(gp)
 // 检查gp的状态是否合法,如果不为_Grunning状态说明状态出现了异常
 if status&^_Gscan != _Grunning {
  dumpgstatus(gp)
  throw("bad g status")
 }
 // 将gp的状态从_Grunning修改为_Grunnable
 casgstatus(gp, _Grunning, _Grunnable)
 // 解除m和gp的互相绑定,分别将m.curg和gp.m设置为nil
 dropg()
 lock(&sched.lock)
 // 把gp放入全局的运行队列中
 globrunqput(gp)
 unlock(&sched.lock)

 // 进行新一轮调度
 schedule()
}
被动调度

被动调度就是一个g被迫让出CPU执行权,有哪些情况会让出CPU使用权,我们常见的有channel发生阻塞、网络IO阻塞、执行垃圾回收而暂停用户程序执行等。那为什么要让出CPU的使用权,为了提供CPU的使用率,与其发生阻塞导致CPU干等不如让出CPU给其他可以执行的程序使用。下面以channel阻塞为例分析被动调度的处理流程。通道的接收操作<-c底层调用的是chanrecv1函数,本文主要分析channnel操作时是如果让出调度又是如何恢复调度的,下面只截取了与之相关的关键的代码,详细channel源码分析见说说channel哪些事-上篇说说channel哪些事-下篇. chanrecv1只是一个包装函数,真正调用的是chanrecv函数。chanrecv会先判断channel上是否有数据可读,如果有数据直接读取并返回,如果没有数据,则把当前的g放到当前channel对象的读取队列(recvq)上,然后调用goparkunlock函数阻塞当前g的运行。

代码语言:javascript
复制
// 接收操作入口<-c, 有2个入参,c表示通道结构体指针
// elem是接收通道元素的变量地址,即<-c左边的接收者
func chanrecv1(c *hchan, elem unsafe.Pointer) {
 chanrecv(c, elem, true)
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
 // 把协程g挂在channel c的读取队列上,调用goparkunlock函数阻塞当前的g
 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    ...
}

gopark函数最后会调用mcall(park_m)函数,切换到g0栈上执行park_m函数。park_m函数先将用户程序协程gp的状态从_Grunning修改为_Gwaiting,然后调用dropg函数解除gp和m的互相绑定关系,最后调用schedule()进入新一轮调度循环。

代码语言:javascript
复制
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
 ...
 // 切换到g0栈上执行park_m函数
 mcall(park_m)
}

func park_m(gp *g) {
 // _g_为g0
 _g_ := getg()

 if trace.enabled {
  traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
 }
 // 将用户程序协程gp的状态从_Grunning修改为_Gwaiting
 casgstatus(gp, _Grunning, _Gwaiting)
 // 解除gp与m的绑定
 dropg()
    ...
 // 进行新一轮调度
 schedule()
}

分析完了channel读取操作中调度相关的操作,下面接着看channel发送操作中调度相关的代码。发送操作c<-x底层调用的是chansend1函数,chansend1直接调用了chansend函数,chansend函数会先检查是否有因channel没有数据可读而挂起的g,如果有直接发数据给sg。如果发送阻塞,出现阻塞原因是要么是非缓冲channel的还没有读取的g,要么是缓存channel buffer满了,这时与接收操作类似调用gopark挂起当前的发送协程g.

代码语言:javascript
复制
// 发送操作c<-x调用入口
func chansend1(c *hchan, elem unsafe.Pointer) {
 chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 ...
 // 查看是否有因channel没有数据可读而挂起的g,如果有直接发数据给sg
 if sg := c.recvq.dequeue(); sg != nil {
  send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  return true
 }
 ...
 c.sendq.enqueue(mysg)
 // 走到这里,说明要么是非缓冲channel的还没有读取的g,要么是缓存channel buffer满了
 // 都是直接发送不了数据,需要挂起当前的g
 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
 ...
}

如果不阻塞,调用send函数直接发送数据,send函数最后会调用goready函数,goready会切换到g0栈上执行ready函数。ready函数把要唤醒的g的状态设置为_Grunnable并将它加入运行队列,如果有空闲的p并且没有工作线程m处于自旋状态,也就是没有别的m正在进行偷取g的工作,则需要唤醒或新建一个m来运行可运行的g.

代码语言:javascript
复制
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 ...
 goready(gp, skip+1)
}

// goready完成的功能是将一个g状态修改为_Grunnable,并将它加入待运行队列,
// 等待被调度程序运行
func goready(gp *g, traceskip int) {
 // 切换到g0栈执行ready
 systemstack(func() {
  ready(gp, traceskip, true)
 })
}

// ready函数把要唤醒的g的状态设置为_Grunnable并将它加入运行队列
func ready(gp *g, traceskip int, next bool) {
 if trace.enabled {
  traceGoUnpark(gp, traceskip)
 }
 // status为即将唤醒的g(gp)的状态
 status := readgstatus(gp)

 // Mark runnable.
 _g_ := getg()
 mp := acquirem() 
 // 检查gp的状态是不是_Gwaiting,如果不是说明gp的状态出现了异常
 if status&^_Gscan != _Gwaiting {
  dumpgstatus(gp)
  throw("bad g->status in ready")
 }

 // 设置gp的状态为可以运行状态(_Grunnable)
 casgstatus(gp, _Gwaiting, _Grunnable)
 // 将gp放入运行队列
 runqput(_g_.m.p.ptr(), gp, next)
 // 有空闲的p并且没有工作线程m处于自旋状态,也就是没有别的m正在进行偷取g的工作,则需要唤醒或新建一个m来运行可
 // 运行的g
 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
  wakep()
 }
 releasem(mp)
}

// 尝试获取一个m来运行可以运行的g,在新创建一个g(newproc函数)和唤醒一个g(ready)函数中
// 会调用wakep
func wakep() {
 // 原子性检查当前是否已经有线程m在进行自旋了,因为很可能也有别的线程执行了goready,已唤醒了m
 // 有一个自旋的m会努力找g运行,就不用再找一个m来工作了
 if !atomic.Cas(&sched.nmspinning, 0, 1) {
  return
 }
 startm(nil, true)
}

// startm函数功能是获取一个m,在获取m之前先获取一个空闲的p,如果获取不到p则返回
// 获取p后,在从全局空闲m队列中获取一个m,如果没有获取到则新创建一个m
func startm(_p_ *p, spinning bool) {
 lock(&sched.lock)
 if _p_ == nil {
  // 如果p为nil,则从全局空闲p队列中获取一个空闲的p
  _p_ = pidleget()
  if _p_ == nil {
   unlock(&sched.lock)
   if spinning {
    // 没有获取到p,也不会获取m了,因为在调用函数中spinning为true,已对sched.nmspinning
    // 进行了+1操作,这里需要减一进行还原
    if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
     throw("startm: negative nmspinning")
    }
   }
   // 没有拿到空闲的p,不会获取m,直接返回
   return
  }
 }
 // 从空闲的m队列中获取一个空闲(休眠)的工作线程
 mp := mget()
 unlock(&sched.lock)
 if mp == nil {
  // 空闲队列中没有休眠的m
  var fn func()
  if spinning {
   fn = mspinning
  }
  // 创建一个新的工作线程m
  newm(fn, _p_)
  return
 }
 // 走到这里,表明mp是从空闲m队列获取的,这里的m都是处于休眠状态,它的状态不可能是spinning
 if mp.spinning {
  throw("startm: m is spinning")
 }
 // 空闲m队列中获取的m的nextp不可能为非0,mp.nextp将会和前面获取的p关联绑定
 if mp.nextp != 0 {
  throw("startm: m has p")
 }
 // spinning为true,需要进行自旋,就是找不到g了,但是本地队列中还存在等待运行的g,
 // 是相互矛盾的
 if spinning && !runqempty(_p_) {
  throw("startm: p has runnable gs")
 }
 // 设置mp的自旋状态,是自旋还是非自旋
 mp.spinning = spinning
 // 将p保存到m的nextp中
 mp.nextp.set(_p_)
 // 唤醒处于休眠状态的工作线程
 notewakeup(&mp.park)
}
抢占调度

为了确保每个g都有机会被调度执行,保证调度的公平性,在初始化的时候会启动一个特殊的线程来执行监控任务(sysmon函数),sysmon在runtime.main函数被启动,详情看前面runtime.main代码分析。下面看sysmon具体处理逻辑。sysmon函数是一个死循环函数,在第一轮循环的时候休眠20微妙,之后每轮循环中休眠时间加倍,直到最大休眠时间达到10毫秒。在循环中会检查是否有准备就绪的网络,并将其放入到全局队列中,也会进行抢占处理,按时间强制执行gc等操作。这里主要关注抢占处理retake函数,具体怎么抢占都是由该函数实现的。

代码语言:javascript
复制
// 监控线程工作函数
func sysmon() {
 lock(&sched.lock)
 sched.nmsys++
 // 检查程序是否已经死锁
 checkdead()
 unlock(&sched.lock)

 lasttrace := int64(0)
 idle := 0 
 delay := uint32(0)
 ...
  lastpoll := int64(atomic.Load64(&sched.lastpoll))
  // 如果超过10ms没有进行netpoll,强制进行一次netpoll,如果获取到了可运行的g,插入g的全局列表
  if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
   atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
   list := netpoll(0) 
   if !list.empty() {
    incidlelocked(-1)
    injectglist(&list)
    incidlelocked(1)
   }
  }
  if next < now {
   startm(nil, false)
  }
  
  // 重新获取在系统调用中阻塞的p并抢占长时间运行的g
  if retake(now) != 0 {
   idle = 0
  } else {
   idle++
  }
  
  // 检查是否需要启动垃圾回收程序
  if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
   lock(&forcegc.lock)
   forcegc.idle = 0
   var list gList
   list.push(forcegc.g)
   injectglist(&list)
   unlock(&forcegc.lock)
  }
    ...
 }
}

retake函数重新获取在系统调用中阻塞的p并抢占长时间运行的g。retake是一个大循环,检查所有的p,所有的p保存在全局变量allp中。对于p只对它的两种情况进行处理,分别是_Prunning和_Psyscall,因为只有这两种状态的p与之关联的g正在执行,需要判断是否进行抢占。下面对两种情况分别进行细致的介绍。

代码语言:javascript
复制
func retake(now int64) uint32 {
 n := 0
 lock(&allpLock)
 // 我们不能在 allp 上使用范围循环,因为我们可能会暂时删除 allpLock。因此,
 // 我们每次都需要在循环中重新获取allp

 // 遍历所有的p,因为m运行g需要绑定一个p,检查每个与p绑定的m中运行的g是否需要被抢占
 for i := 0; i < len(allp); i++ {
  _p_ := allp[i]
  if _p_ == nil {
   // 如果procresize已经增长了allp但还没有创建新的Ps,就会发生这种情况。
   continue
  }

  // _p_.sysmontick用于监控线程(sysmon)记录被监控的p的运行时间和系统调用时间
  // 以及运行g时的计数器值和进行系统调用时计数器值
  pd := &_p_.sysmontick
  // 获取p的状态保存到s中
  s := _p_.status
  sysretake := false
  // 如果当前的p正在运行或者处于系统调用中,需要检查是否需要进行抢占
  if s == _Prunning || s == _Psyscall {
   // 每进行一次调度,_p_.schedtick会+1,也就是每切换一个g运行,schedtick会+1
   t := int64(_p_.schedtick)
   // 如果pd.schedtick和t不等,也就是pd.schedtick和_p_.schedtick不等
   // 说明p上有进行过调度操作,即切换过g运行,重新更新pd的值schedtick和schedwhen
   // 为下一次判断做准备
   if int64(pd.schedtick) != t {
    pd.schedtick = uint32(t)
    pd.schedwhen = now
   } else if pd.schedwhen+forcePreemptNS <= now {
    // 走到这里说明pd.schedtick与t相等,说明从pd.schedwhen到now这段时间
    // 没有发生过调度,也就是在这段时间,同一个g一直在运行,检查这个g运行的时间
    // 是否超过了10毫秒,就是schedwhen+forcePreemptNS比当前时间小,说明已经超过了

    // 对与_p_绑定的m中运行的g进行抢占
    preemptone(_p_)
    // 在系统调用的情况下,preemptone()不起作用, 因为没有m绑定到p
    sysretake = true
   }
  }

  // p处在系统调用中,需要检查是否进行抢占
  if s == _Psyscall {
   // _p_.syscalltick 用于记录系统调用的次数,在完成系统调用之后加 1
   t := int64(_p_.syscalltick)
   // 如果sysretake为false并且p的pd中记录的系统调用tick和当前p的系统调用tick不等
   // 说明进行过调度,不用进行抢占,直接更新pd的值
   if !sysretake && int64(pd.syscalltick) != t {
    pd.syscalltick = uint32(t)
    pd.syscallwhen = now
    continue
   }
   
   // 同时满足下面3个条件,并进行抢占:
   // 1. _p_的本地运行队列和runnext都没有g
   // 2. 当前进行自旋的m数量+当前空闲的p的数量之和大于0
   // 3. 进入系统调用的时间到现在还不够10毫秒
   if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
    continue
   }
   unlock(&allpLock)
   incidlelocked(-1)
   // 将_p_的状态从_Psyscall修改为_Pidle
   if atomic.Cas(&_p_.status, s, _Pidle) {
    if trace.enabled {
     traceGoSysBlock(_p_)
     traceProcStop(_p_)
    }
    // n记录处于在系统调用中的p并且需要被抢占的总数
    n++
    // 系统调用tick+1
    _p_.syscalltick++
    // 将与进入系统调用的m绑定的p分离
    handoffp(_p_)
   }
   incidlelocked(1)
   lock(&allpLock)
  }
 }
 unlock(&allpLock)
 return uint32(n)
}

p在_Prunning状态,说明与p关联的m上的g正在被运行,判断g运行时间是否超过10毫秒,如果超过了调用preemptone进行抢占。哪是怎么判断当前的g运行时间是否超过了10毫秒呢?p中有一个schedtick字段,每进行一次调度,schedtick的值会+1,也就是每切换一个g运行,schedtick的值加1. p中还有一个sysmontick字段,该字段是一个复合结构,它有四个成员,见下面的定义。这里我们先只关心前两个,schedtick和schedwhen分别保存上一次p调度的时候计数器值和调度的时间。p的schedtick保存的是当前的计数器值,如果两者相等,说明这段时间没有进行g的切换,那么就比较当前的时间与sysmontick中schedwhen时间差是否大于10毫秒,如果大于10毫秒,需要进行抢占。如果p的schedtick值与sysmontick与schedtick值不等,说明这段时间进行了g的切换,直接更新sysmontick计数器的值为当前p的schedtick值,更新sysmontick调度时间为当前时间。

代码语言:javascript
复制
type sysmontick struct {
 // 调度的计数器
 schedtick   uint32
 // 保存调度的时间
 schedwhen   int64
 // 进行系统调用的计数器
 syscalltick uint32
 // 进入系统调用时的时间
 syscallwhen int64
}

下面看进行真正抢占的处理逻辑函数preemptone,此函数处理比较简单,设置当前被抢占的g的preempt字段为true,并将stackguard0字段设置为0xfffffade之后很快就返回了,并没有看到将当前g暂停执行的逻辑。猜测很可能这是一个异步处理,即被抢占的g的执行函数时可能会检查这里设置的标志位,判断如果是设置了抢占,执行某些处理。接下来进行对猜测进行验证。我们看哪些处理逻辑涉及到了这里的stackguard0和preempt关键词,在runtime文件夹中执行grep查找,嗯,很快找到了newstack()函数用到了这里的关键词。继续追溯newstack函数在哪里被使用了,找到了一条这样的函数调用关系。morestack_noctxt()调用了morestack(),morestack中调用了newstack()。

代码语言:javascript
复制
func preemptone(_p_ *p) bool {
 mp := _p_.m.ptr()
 if mp == nil || mp == getg().m {
  return false
 }
 // gp为当前被抢占的g
 gp := mp.curg
 if gp == nil || gp == mp.g0 {
  return false
 }
 // 设置gp的是否可被抢占标志为true,表示可以被抢占
 gp.preempt = true

 // 设置gp.stackguard0值为0xfffffade,这是一个很大的数,用于区分判断
 gp.stackguard0 = stackPreempt
 if preemptMSupported && debug.asyncpreemptoff == 0 {
  _p_.preempt = true
  preemptM(mp)
 }

 return true
}

为了验证上面的程序是否真的会执行上面的函数调用关系,这里通过举个例子进行验证。下面是一个简单的输出hello world简单程序(main.go),然后查看它的汇编代码。

代码语言:javascript
复制
package main

import "fmt"

func main() {
 fmt.Println("hello world")
}

对上面的程序执行go tool compile -S main.go得到汇编代码。

Go在函数调用的时候会设置安全点,这个是编译器为我们插入的代码。在函数调用的时候会检查stackguard0的大小,而决定是否调用runtime.morestack_noctxt函数。morestack_noctxt是汇编编写的,实现如下,它直接调用morestack函数,morestack会将当前调度信息保存起来,然后切换到g0栈执行newstack函数。

代码语言:javascript
复制
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
 MOVL $0, DX
 JMP runtime·morestack(SB)
    
TEXT runtime·morestack(SB),NOSPLIT,$0-0
    ...

 // 将当前的调度信息保存到g.sched字段中,具体是将PC、SP、
 // BP寄存器的值和当前g的地址值保存到内存中
 MOVQ 0(SP), AX // f's PC
 MOVQ AX, (g_sched+gobuf_pc)(SI)
 MOVQ SI, (g_sched+gobuf_g)(SI)
 LEAQ 8(SP), AX // f's SP
 MOVQ AX, (g_sched+gobuf_sp)(SI)
 MOVQ BP, (g_sched+gobuf_bp)(SI)
 MOVQ DX, (g_sched+gobuf_ctxt)(SI)

 // 下面开始从当前的g切换到g0
 // 将m.g0的地址拷贝到BX寄存器中
 MOVQ m_g0(BX), BX
 // 将寄存器BX中的值拷贝到tls中,即设置tls中的g为g0
 MOVQ BX, g(CX)
 // 把g0栈的栈顶寄存器的值恢复到CPU的寄存器SP中,顺利的切换到g0栈
 MOVQ (g_sched+gobuf_sp)(BX), SP
 // 调用newstack函数
 CALL runtime·newstack(SB)
 CALL runtime·abort(SB) // crash if newstack returns
 RET

newstack函数主要完成两项工作:一是检查是否响应sysmon发起的抢占请求,二是检查栈是否需要进行扩容,下面只抽取了与响应抢占相关的代码。newstack检查被抢占g的状态,如果处于抢占状态,调用gopreempt_m将被抢占的gp切换出去放入到全局g运行队列。gopreempt_m是goschedImpl的简单包装,真正处理逻辑在goschedImpl函数,此函数在前面的主动调度中已分析过了,详情见前面的分析。

代码语言:javascript
复制
func newstack() {
 // thisg为g0
 thisg := getg()
 ...
 // gp为用户程序g,就是需要进行抢占或者需要扩容的g
 gp := thisg.m.curg
    ...
 // 设置了抢占
 if preempt {
  // 检查被抢占g的状态,如果不是真正处于抢占状态
  if !canPreemptM(thisg.m) {
   // 将stackguard0的值恢复为原来的正常值,表示已经处理过该抢占请求了
   gp.stackguard0 = gp.stack.lo + _StackGuard
   // 实际并不需要进行抢占,调用gogo函数继续运行当前的用户程序g
   gogo(&gp.sched) 
  }
 }
   ...
 if preempt {
     ...
  // 调用gopreempt_m将被抢占的gp切换出去放入到全局g运行队列
  gopreempt_m(gp) // never return
 }
    
 // 栈扩容的逻辑
 ...
}

func gopreempt_m(gp *g) {
 if trace.enabled {
  traceGoPreempt()
 }
 // 调用goschedImpl将gp的状态从_Grunning修改为_Grunnable,并把gp放入到全局g运行队列
 goschedImpl(gp)
}

p在_Psyscall状态,说明与p关联的m上的g正在进行系统调用。只要下面的三个条件有任何一个不满足,就会对处于_Psyscall状态的p进行抢占。

  1. p的运行队列中还有等待运行的g,需要及时的让p的本地队列中的g得到调度,而当前的g又处在系统调用中,无法执行其他的g,因此将当前的p抢占过来运行其他的g.
  2. 当前没有空闲的p(sched.npidle为0),也没有自旋的m(sched.nmspinning为0),说明大家都在忙,这时抢占当前正处于系统调用中而实际上用不上的p,分配给其他线程用于调度执行其他的g.
  3. 从上一次监控线程留意到p对应的m处于系统调用中到现在已经超过了10毫秒,说明系统调用已花费了比较长的时间,需要进行抢占。使得retake函数的返回值不等于0,让sysmon线程保持20微妙的轮询检查周期,提高监控的实时性。

当上述三个条件有任何一个不满足时,会将p的状态从_Psyscall修改为_Pidle,然后调用handoffp函数,将与进入系统调用的m绑定的p分离。handoff会对当前的条件进行检查,如果满足下面的条件则会调用startm函数启动新的工作线程来与当前的p进行关联,执行可运行的g.具体条件有以下几种:

  1. _p_的本地队列或全局队列中有运行的g,立即启动工作线程对g进行调度
  2. 当前处于在垃圾回收标记阶段,启动线程运行gc work
  3. 当前没有进行自旋的m也没有空闲的p,启动一个线程m进行自旋
  4. 这是最后一个运行的p并且没有人在轮询网络,则需要唤醒另一个m来轮询网络

最后,如果上述三个条件都满足,说明当前比较空闲,将p放入到P的全局空闲队列中即可。

代码语言:javascript
复制
func handoffp(_p_ *p) {
 // 如果_p_的本地队列或全局队列中有运行的g,立即启动工作线程对g进行调度
 if !runqempty(_p_) || sched.runqsize != 0 {
  startm(_p_, false)
  return
 }
 
 // 如果处于在垃圾回收标记阶段,启动线程运行gc work goroutine
 if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
  startm(_p_, false)
  return
 }

 // 走到这里,说明_p_的本地队列和全局队列都没有可运行的g,也没有赋值垃圾回收标记的g需要运行
 // 如果当前没有进行自旋的m也没有空闲的p,启动一个线程m进行自旋
 if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { 
  startm(_p_, true)
  return
 }
 lock(&sched.lock)
 // 当前在gc阶段,需要STW,将_p_的状态切换到__Pgcstop
 // 暂停_p_提供g资源
 if sched.gcwaiting != 0 {
  _p_.status = _Pgcstop
  sched.stopwait--
  if sched.stopwait == 0 {
   notewakeup(&sched.stopnote)
  }
  unlock(&sched.lock)
  return
 }
 // _p_.runSafePointFn被设置为1,需要调用安全点函数safePointFn处理一些gc相关的内容
 if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
  sched.safePointFn(_p_)
  sched.safePointWait--
  if sched.safePointWait == 0 {
   notewakeup(&sched.safePointNote)
  }
 }
 // 如果全局队列中有g,启动一个线程执行g
 if sched.runqsize != 0 {
  unlock(&sched.lock)
  startm(_p_, false)
  return
 }
 
 // 如果这是最后一个运行的p并且没有人在轮询网络,则需要唤醒另一个m来轮询网络。
 if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
  unlock(&sched.lock)
  startm(_p_, false)
  return
 }
 if when := nobarrierWakeTime(_p_); when != 0 {
  wakeNetPoller(when)
 }
 // 将p放入全局空闲队列中
 pidleput(_p_)
 unlock(&sched.lock)
}

总结

协程是Go语言中非常精华的部分,我们知道Go语言可以轻松在单机上运行成千上万个协程,得益于调度器很好的设计.例如通过GMP模型实现了M的复用,引入P让各个处理器可以并行,充分发挥多核的优势,通过优先级队列保证调度的公平性等。小编在这次深入细致的学习调度器原理和实现中收获多多。

golang-scheduler-1-history[1]runtime-bootstrap[2]Go语言goroutine调度器初始化[3]Go语言调度器之调度main goroutine[4]深入golang runtime的调度[5]锲而不舍 —— M 是怎样找工作的[6]

Reference

[1]

golang-scheduler-1-history: http://lessisbetter.site/2019/03/10/golang-scheduler-1-history/

[2]

runtime-bootstrap: https://blog.altoros.com/golang-internals-part-5-runtime-bootstrap-process.html

[3]

Go语言goroutine调度器初始化: https://mp.weixin.qq.com/s?__biz=MzU1OTg5NDkzOA==&mid=2247483769&idx=1&sn=3d77609a293d87e64639afc8d2219e1c&scene=19#wechat_redirect

[4]

Go语言调度器之调度main goroutine: https://mp.weixin.qq.com/s?__biz=MzU1OTg5NDkzOA==&mid=2247483783&idx=1&sn=1128dbd7794d7d53c37abab94771a7d7&scene=19#wechat_redirect

[5]

深入golang runtime的调度: https://zboya.github.io/post/go_scheduler/

[6]

锲而不舍 —— M 是怎样找工作的: https://mp.weixin.qq.com/s/6sNtrdlKtwfJIvBA8UPnKg

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据小冰 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Go程序入口
  • 调度器的初始化
    • osinit
      • schedinit
        • main goroutine的创建与运行
          • mstart
          • 调度策略
            • 从全局队列中获取G
              • 从P的本地队列中获取G
                • 从其他P的本地队列中偷取G
                • execute
                • 调度时机
                  • 主动调度
                    • 被动调度
                      • 抢占调度
                      • 总结
                      • Reference
                      相关产品与服务
                      腾讯云代码分析
                      腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,助力维护团队卓越代码文化。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档