并发是指程序的逻辑结构. 非并发程序只有一个逻辑控制流的顺序执行程序, 在任何时刻, 程序只会处于在这个逻辑流的某个位置. 如果一个程序有多个独立的逻辑控制流, 那么就说这个程序是并发的. 也就是说, 如果把逻辑控制流绘制为时序流程图, 那么允许出现重叠的区间.
并发可以通过以下方式做到:
并行是指程序的运行状态. 如果一个程序在某一时刻可以被多个CPU流水线同时处理, 那么我们就说这个程序是以并行的方式运行. 显然并行是需要硬件支持.
并行可以通过以下方式做到:
一般意义上的线程模型根据用户态和内核态之间的对应关系一般分为三类:
Go Scheduler是参考CSP(Communicating Sequential Process)模型, 使用Goroutine并行执行任务, 并将Channel作为Goroutine之间的通信方式. 虽然使用互斥锁和共享内存也可以在Go中完成Goroutine之间的通信, 但是使用Channel更为推荐.
Don’t communicate by sharing memory, share memory by communicating.
Go Scheduler采用M:N两级线程模型, 即GMP模型.
Go语言中并发的执行单元就是Goroutine, 类似于操作系统中的线程, 但是占用了更小的内存空间, 同时降低了Goroutine切换的开销.
type g struct {
m *m // current m; offset known to arm liblink
sched gobuf
syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
param unsafe.Pointer // passed parameter on wakeup
atomicstatus uint32
goid int64
schedlink guintptr
waitsince int64 // approx time when the g become blocked
waitreason waitReason // if status==Gwaiting
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
lockedm muintptr
writebuf []byte
sigcode0 uintptr
sigcode1 uintptr
sigpc uintptr
gopc uintptr // pc of go statement that created this goroutine
startpc uintptr // pc of goroutine function
waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
}
结构体g的字段atomicsatatus就存储了Goroutine的状态:
_Gidle | 刚刚被分配并且还没有被初始化 |
_Grunnable | 没有执行代码, 没有栈的所有权, 存储在运行队列中 |
_Grunning | 可以执行代码, 拥有栈的所有权, 被赋予了内核线程 M 和处理器 P |
_Gsyscall | 正在执行系统调用, 拥有栈的所有权, 没有执行用户代码, 被赋予了内核线程 M 但是不在运行队列上 |
_Gwaiting | 由于运行时而被阻塞, 没有执行用户代码并且不在运行队列上, 但是可能存在于 Channel 的等待队列上 |
_Gdead | 没有被使用, 没有执行代码, 可能有分配的栈 |
_Gcopystack | 栈正在被拷贝, 没有执行代码, 不在运行队列上 |
最终我们可以将Goroutine的运行期总结为三种状态:
Go并发模型中M其实表示的是操作系统线程, 默认情况下调度器最多能创建10000个线程, 但是其中绝大多数线程都不会执行用户代码, 最多只有GOMAXPROCS个线程M能够正常运行.
在默认情况下, 一个四核机器会创建四个操作系统线程, 每个线程其实都是一个m结构体, 我们也可以通过runtime.GOMAXPROCS改变最大可运行线程的个数.
在大多数情况下, 我们都会使用Go的默认配置, 也就是#thread = #CPU, 在这种情况下不会触发操作系统级别的线程调度和上下文切换, 所有的调度都会发生在用户态, 由Go语言调度器触发, 能够减少非常多的额外开销.
type m struct {
g0 *g // goroutine with scheduling stack
curg *g // current running goroutine
...
}
g0是持有调度堆栈的Goroutine, curg是当前线程上运行的Goroutine, 这也是作为操作系统唯二关心的Goroutine了.
P是线程需要的上下文环境, 也是用于处理代码逻辑的处理器, 通过处理器的调度, 每个内核线程M都能够执行多个G, 这样就能在G进行一些IO操作的时候及时对他们进行切换, 提高CPU的利用率.
每个Go程序中之所以处理器的数量一定会等于GOMAXPROCS, 是因为调度器在启动时就会创建GOMAXPROCS个处理器P, 这些处理器会绑定到不同的线程M上并为他们调度Goroutine.
type p struct {
id int32
status uint32 // one of pidle/prunning/...
link puintptr
schedtick uint32 // incremented on every scheduler call
syscalltick uint32 // incremented on every system call
sysmontick sysmontick // last tick observed by sysmon
m muintptr // back-link to associated m (nil if idle)
mcache *mcache
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
sudogcache []*sudog
sudogbuf [128]*sudog
...v
}
p结构体中的状态status会是以下五种状态之一:
_Pidle | 处理器没有运行用户代码或者调度器, 被空闲队列或者改变其状态的结构持有, 运行队列为空 |
_Prunning | 被线程 M 持有, 并且正在执行用户代码或者调度器 |
_Psyscall | 没有执行用户代码, 当前线程陷入系统调用 |
_Pgcstop | 被线程 M 持有, 当前处理器由于垃圾回收被停止 |
_Pdead | 当前处理器已经不被使用 |
如图所示, 我们现在有两个线程(M), 每个都持有一个上下文环境(P), 同时运行着一个协程(G). 为了执行若干协程指令, 每个线程必须持有一个上下文环境.
灰色的协程表示状态为等待中, 即没有执行代码, 没有栈的所有权, 等待在运行队列中. 当Go程序中执行到go表达式时, 协程会被添加到运行队列的队尾. 当上下文环境到达调度点, 会从运行队列队首弹出一个协程, 为其设置堆栈, 指令指针, 开始执行其指令.
为了减少加锁竞态, 每个上下文环境都有一个自己的运行队列. 在早期, Go scheduler共享一个全局运行队列. 线程需要频繁加锁去获取等待执行的协程. 因此当拥有多核的机器时, 当时的goroutine的性能并不理想.
为什么我们不舍弃上下文(P), 将运行队列直接挂载到线程. 原因在于当执行线程囿于一些原因阻塞时, 我们可以通过保有上下文环境携带运行队列挂载到其他线程下.
一种阻塞的原因是当我们调用系统调用, 因为线程无法同时既执行指令又被系统调用阻塞, 我们需要卸载上下文环境, 令其可以继续保持调度状态.
Go scheduler保证环境中存在足够的线程运行所有的上下文环境. 以上图举例, M1是一个或新创建或从线程池取出的一个线程. 这个系统调用的线程M0会继续持有协程G0完成系统调用.
当系统调用结束, 线程M0会尝试获得上下文环境来运行协程. 通常他会从其他线程中窃取一个上下文环境, 如果窃取失败, 线程会将这个协程G0放到全局的运行队列, 并把自己放回线程池, 切换到休眠状态.
上下文环境发现本地运行队列为空时, 会从这个全局队列中拉取协程. 上下文环境同时也会周期性的检查全局运行队列, 避免有协程出现饿死的情况.
需要注意的是, 如果协程调用网络I/O, 会从线程中脱离出来并放入运行时集成的网络poller中, 当poller表示网络的读写操作完成后, 这个协程会回到上下文环境并完成后续工作.
某个上下文环境执行完了本地运行队列的所有协程后, 会导致若干本地运行队列的不平衡. 因此该上下文环境会首先从全局运行队列中获取协程, 如果没有则会尝试从其他上下文环境中拿走一半的协程.
横轴: 协程数量级. 纵轴: 耗时(ns/op). 红线: 创建单个协程所需要的时间. 绿色: 切换单个协程所需要的时间.
由上图可以看出, 在1e3到1e5的规模下, 协程的创建和切换时间开销都较为稳定:
创建单个协程: ~1500ns/op. 切换单个协程: ~500ns/op.
由上图可以看出, 在1e5到1e6到区间, 单个协程的内存占用都在2500B左右.
// memory usage
package main
import (
"flag"
"fmt"
"os"
"runtime"
"time"
)
var n = flag.Int("n", 1e5, "Number of goroutines to create")
var ch = make(chan byte)
var counter = 0
func f() {
counter++
<-ch // Block this goroutine
}
func main() {
flag.Parse()
if *n <= 0 {
fmt.Fprintf(os.Stderr, "invalid number of goroutines")
os.Exit(1)
}
// Limit the number of spare OS threads to just 1
runtime.GOMAXPROCS(1)
// Make a copy of MemStats
var m0 runtime.MemStats
runtime.ReadMemStats(&m0)
t0 := time.Now().UnixNano()
for i := 0; i < *n; i++ {
go f()
}
runtime.Gosched()
t1 := time.Now().UnixNano()
runtime.GC()
// Make a copy of MemStats
var m1 runtime.MemStats
runtime.ReadMemStats(&m1)
if counter != *n {
fmt.Fprintf(os.Stderr, "failed to begin execution of all goroutines")
os.Exit(1)
}
fmt.Printf("Number of goroutines: %d\n", *n)
fmt.Printf("Per goroutine:\n")
fmt.Printf(" Memory: %.2f bytes\n", float64(m1.Sys-m0.Sys)/float64(*n))
fmt.Printf(" Time: %f µs\n", float64(t1-t0)/float64(*n)/1e3)
}
// time consume
package main
import (
"fmt"
"time"
)
func runCallback(in, out chan int64) {
for n, ok := <-in; ok; n, ok = <-in {
out <- n
}
}
func runTest(round int, coroutineNum, switchTimes int64) {
fmt.Printf("##### Round: %v\n", round)
start := time.Now()
channelsIn, channelsOut := make([]chan int64, coroutineNum), make([]chan int64, coroutineNum)
for i := int64(0); i < coroutineNum; i++ {
channelsIn[i] = make(chan int64, 1)
channelsOut[i] = make(chan int64, 1)
}
end := time.Now()
fmt.Printf("Create %v goroutines and channels cost %v ns, avg %v ns\n", coroutineNum, end.Sub(start).Nanoseconds(), end.Sub(start).Nanoseconds()/coroutineNum)
start = time.Now()
for i := int64(0); i < coroutineNum; i++ {
go runCallback(channelsIn[i], channelsOut[i])
}
end = time.Now()
fmt.Printf("Start %v goroutines and channels cost %v ns, avg %v ns\n", coroutineNum, end.Sub(start).Nanoseconds(), end.Sub(start).Nanoseconds()/coroutineNum)
var sum int64 = 0
start = time.Now()
for i := int64(0); i < switchTimes; i++ {
for j := int64(0); j < coroutineNum; j++ {
channelsIn[j] <- 1
sum += <-channelsOut[j]
}
}
end = time.Now()
fmt.Printf("Switch %v goroutines for %v times cost %v ns, avg %v ns\n", coroutineNum, sum, end.Sub(start).Nanoseconds(), end.Sub(start).Nanoseconds()/sum)
start = time.Now()
for i := int64(0); i < coroutineNum; i++ {
close(channelsIn[i])
close(channelsOut[i])
}
end = time.Now()
fmt.Printf("Close %v goroutines cost %v ns, avg %v ns\n", coroutineNum, end.Sub(start).Nanoseconds(), end.Sub(start).Nanoseconds()/coroutineNum)
}
func main() {
// runTest(1, 1000, 1000)
// runTest(2, 5000, 200)
// runTest(3, 10000, 100)
// runTest(4, 50000, 50)
// runTest(5, 100000, 10)
runTest(6, 500000, 5)
}