go1.7才引入context,译作“上下文”,实际也叫goroutine 的上下文,包含 goroutine 的运行状态、环境、现场等信息、context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。与WaitGroup最大的不同点是context对于派生goroutine有更强的控制力,它可以控制多级的goroutine
随着 context 包的引入,标准库中很多接口加上了 context 参数,例如 database/sql 包、http包。context 几乎成为了并发控制和超时控制的标准做法,由于context的源码里用到了大量的mutex锁用于保护子级的context,所以它是并发安全的
context接口只定义了4中方法
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
case <-context.Done():
整体类图
type emptyCtx int
context包中定义了一个空的context, 名为emptyCtx,用于context的根节点,空的context只是简单的实现了Context,本身不包含任何值,仅用于其他context的父节点
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
emptyCtx是一个int类型的变量,但实现了context的接口。emptyCtx没有超时时间,不能取消,也不能存储任何额外信息,所以emptyCtx用来作为context树的根节点
这是一个可以取消的context
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
type cancelCtx struct {
Context
mu sync.Mutex
done atomic.Value
children map[canceler]struct{}
err error
}
cancelCtx将接口 Context 作为它的一个匿名字段,因此可以被看成是一个 Context,同时cancelCtx实现了 canceler 接口。children中记录了由此context派生的所有child,此context被cancel时会把其中的所有child都cancel掉,cancelCtx与deadline和value无关
func (c *cancelCtx) Done() <-chan struct{} {
d := c.done.Load()
if d != nil {
return d.(chan struct{})
}
c.mu.Lock()
defer c.mu.Unlock()
d = c.done.Load()
if d == nil {
d = make(chan struct{})
c.done.Store(d)
}
return d.(chan struct{})
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
d, _ := c.done.Load().(chan struct{})
if d == nil {
c.done.Store(closedchan)
} else {
close(d)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
有一个WithCancel方法,会暴露给写代码的人调用:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent) //①
propagateCancel(parent, &c) //②
return &c, func() { c.cancel(true, Canceled) } //③
}
具体实现分三步:①初始化一个cancelCtx实例,②如果父节点也可以被cancel,将cancelCtx实例添加到其父节点的children中,③返回cancelCtx实例和cancel方法
第②步调用的函数propagateCancel代码逻辑可以细分为3步:
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
return // parent is never canceled
}
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
type timerCtx struct {
cancelCtx
timer *time.Timer
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
timerCtx是基于cancelCtx的,此外多了timer和deadline,timer指最长存活时间,比如将在多少秒后结束,deadline表示最后期限,需要指定具体的截止日期,由此衍生出了WithDeadline()和WithTimeout()函数
type valueCtx struct {
Context
key, val interface{}
}
valueCtx在Context基础上增加了一个key-value对,用于在各级协程间传递一些数据,valueCtx既不需要cancel,也不需要deadline,只需要实现Value()接口
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
因此有了WithVaule函数
func WithValue(parent Context, key, val interface{}) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
WithVaule可以用来设置键值对
从这些源代码中可以看出使用了大量的锁,保证了执行过程中的并发安全
下面这个例子的流程图如下
假设一个例子,genGreeting在放弃调用locale之前等待一秒——超时时间为1秒,如果printGreeting不成功,就取消对printFare的调用
可以看到系统输出工作正常,由于local设置至少需要运行一分钟,因此genGreeting将始终超时,这意味着main会始终取消printFarewell下面的调用链,
func main() {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg.Add(1)
go func() {
defer wg.Done()
if err := printGreeting(ctx); err != nil {
fmt.Printf("cannot print greeting: %v\n", err)
cancel()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if err := printFarewell(ctx); err != nil {
fmt.Printf("cannot print farewell: %v\n", err)
}
}()
wg.Wait()
}
func printGreeting(ctx context.Context) error {
greeting, err := genGreeting(ctx)
if err != nil {
return err
}
fmt.Printf("%s world!\n", greeting)
return nil
}
func printFarewell(ctx context.Context) error {
farewell, err := genFarewell(ctx)
if err != nil {
return err
}
fmt.Printf("%s world!\n", farewell)
return nil
}
func genGreeting(ctx context.Context) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second) //只等1秒钟就取消
defer cancel()
switch locale, err := locale(ctx); {
case err != nil:
return "", err
case locale == "EN/US":
return "hello", nil
}
return "", fmt.Errorf("unsupported locale")
}
func genFarewell(ctx context.Context) (string, error) {
switch locale, err := locale(ctx); {
case err != nil:
return "", err
case locale == "EN/US":
return "goodbye", nil
}
return "", fmt.Errorf("unsupported locale")
}
func locale(ctx context.Context) (string, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(1 * time.Minute): //等待一分钟后执行
}
return "EN/US", nil
}
//结果
cannot print greeting: context deadline exceeded
cannot print farewell: context canceled
可以在这个程序上进一步改进:因为已知locale需要大约一分钟的时间才能运行,所以可以在locale中检查是否给出了deadline,如果给出了,则返回一个context包预设的错误——DeadlineExceeded
可以看到最终结果是一样的,但是会马上得出执行结果,而不会被阻塞1秒钟
func main() {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg.Add(1)
go func() {
defer wg.Done()
if err := printGreeting(ctx); err != nil {
fmt.Printf("cannot print greeting: %v\n", err)
cancel()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if err := printFarewell(ctx); err != nil {
fmt.Printf("cannot print farewell: %v\n", err)
}
}()
wg.Wait()
}
func printGreeting(ctx context.Context) error {
greeting, err := genGreeting(ctx)
if err != nil {
return err
}
fmt.Printf("%s world!\n", greeting)
return nil
}
func printFarewell(ctx context.Context) error {
farewell, err := genFarewell(ctx)
if err != nil {
return err
}
fmt.Printf("%s world!\n", farewell)
return nil
}
func genGreeting(ctx context.Context) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
switch locale, err := locale(ctx); {
case err != nil:
return "", err
case locale == "EN/US":
return "hello", nil
}
return "", fmt.Errorf("unsupported locale")
}
func genFarewell(ctx context.Context) (string, error) {
switch locale, err := locale(ctx); {
case err != nil:
return "", err
case locale == "EN/US":
return "goodbye", nil
}
return "", fmt.Errorf("unsupported locale")
}
func locale(ctx context.Context) (string, error) {
if deadline, ok := ctx.Deadline(); ok { //1
if deadline.Sub(time.Now().Add(1*time.Minute)) <= 0 {
return "", context.DeadlineExceeded
}
}
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(1 * time.Minute):
}
return "EN/US", nil
}
//结果
cannot print greeting: context deadline exceeded
cannot print farewell: context canceled
在并发程序中,由于超时、取消操作或者一些异常情况,往往需要进行抢占操作或者中断后续操作,如下例可以采用channel的方式控制,这里采用主协程main控制通道的关闭,子协程监听done,一旦主协程关闭了channel,那么子协程就可以退出了
因为这个例子还不复杂,所以用通道控制感觉还可以,但是当有多个主协程和多个子协程时,就要定义多个done channel,这将变得非常混乱
func main() {
messages := make(chan int, 10)
done := make(chan bool)
defer close(messages)
// consumer
go func() {
ticker := time.NewTicker(1 * time.Second)
for _ = range ticker.C {
select {
case <-done:
fmt.Println("child process interrupt...")
return
default:
fmt.Printf("send message: %d\n", <-messages)
}
}
}()
// producer
for i := 0; i < 10; i++ {
messages <- i
}
time.Sleep(5 * time.Second)
close(done)
time.Sleep(1 * time.Second)
fmt.Println("main process exit!")
}
//结果
send message: 0
send message: 1
send message: 2
send message: 3
send message: 4
child process interrupt...
main process exit!
可以试试用context改写解决这个问题,如下例,只要让子线程监听主线程传入的ctx
,一旦ctx.Done()
返回空channel
,子线程即可取消执行任务
func main() {
messages := make(chan int, 10)
// producer
for i := 0; i < 10; i++ {
messages <- i
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// consumer
go func(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
for _ = range ticker.C {
select {
case <-ctx.Done():
fmt.Println("child process interrupt...")
return
default:
fmt.Printf("send message: %d\n", <-messages)
}
}
}(ctx)
defer close(messages)
defer cancel()
select {
case <-ctx.Done():
time.Sleep(1 * time.Second)
fmt.Println("main process exit!")
}
}
//结果
send message: 0
send message: 1
send message: 2
send message: 3
send message: 4
child process interrupt...
main process exit!
https://juejin.cn/post/6844904070667321357#heading-14 https://www.topgoer.cn/docs/goquestions/goquestions-1cjh3l2qioavp https://www.topgoer.cn/docs/gozhuanjia/chapter055.3-context https://www.topgoer.cn/docs/concurrency/concurrency-1clit4ftnpbh3