前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang 并发 与 context标准库

Golang 并发 与 context标准库

作者头像
李海彬
发布2019-05-08 11:23:14
7570
发布2019-05-08 11:23:14
举报
文章被收录于专栏:Golang语言社区Golang语言社区

作者:Ovenvan 来源:简书

这篇文章将:介绍context工作机制;简单说明接口和结构体功能;通过简单Demo介绍外部API创建并使用context标准库;从源码角度分析context工作流程(不包括mutex的使用分析以及timerCtx计时源码)。

context是一个很好的解决多goroutine下通知传递和元数据的Go标准库。由于Go中的goroutine之间没有父子关系,因此也不存在子进程退出后的通知机制。多个goroutine协调工作涉及 通信,同步,通知,退出 四个方面:

通信:chan通道是各goroutine之间通信的基础。注意这里的通信主要指程序的数据通道。 同步:可以使用不带缓冲的chan;sync.WaitGroup为多个gorouting提供同步等待机制;mutex锁与读写锁机制。 通知:通知与上文通信的区别是,通知的作用为管理,控制流数据。一般的解决方法是在输入端绑定两个chan,通过select收敛处理。这个方案可以解决简单的问题,但不是一个通用的解决方案。 退出:简单的解决方案与通知类似,即增加一个单独的通道,借助chan和select的广播机制(close chan to broadcast)实现退出。

但由于Go之间的goroutine都是平等的,因此当遇到复杂的并发结构时处理退出机制则会显得力不从心。因此Go1.7版本开始提供了context标准库来解决这个问题。他提供两个功能:退出通知和元数据传递。他们都可以传递给整个goroutine调用树的每一个goroutine。同时这也是一个不太复杂的,适合初学Gopher学习的一段源码。

工作机制

第一个创建Context的goroutine被称为root节点:root节点负责创建一个实现Context接口的具体对象,并将该对象作为参数传递至新拉起的goroutine作为其上下文。下游goroutine继续封装该对象并以此类推向下传递。

interface

Context接口:作为一个基本接口,所有的Context对象都要实现该接口,并将其作为使用者调度时的参数类型:

代码语言:javascript
复制
1type Context interface{
2    Deadline()(deadline time.Time, ok bool)  
3//如果Context实现了超时控制,该方法返回 超时时间,true。否则ok为false
4    Done() <-chan struct{}
5//依旧使用<-chan struct{}来通知退出,供被调用的goroutine监听。
6    Err() error
7//当Done()返回的chan收到通知后,防卫Err()获知被取消的原因
8    Value(key interface{}) interface
9}

canceler接口:拓展接口,规定了取消通知的Context具体类型需要实现的接口:

代码语言:javascript
复制
1type canceler interface {
2    cancel(removeFromParent bool, err error)
3//通知后续创建的goroutine退出
4    Done() <-chan struct{}
5//作者对这个Done()方法的理解是多余的
6}

struct

emptyCtx:实现了一个不具备任何功能的Context接口,其存在的目的就是作为Context对象树的root节点:

代码语言:javascript
复制
 1type emptyCtx int
 2func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
 3    return
 4}
 5func (*emptyCtx) Done() <-chan struct{} {
 6    return nil
 7}
 8func (*emptyCtx) Err() error {
 9    return nil
10}
11func (*emptyCtx) Value(key interface{}) interface{} {
12    return nil
13}
14//......
15var (
16    background = new(emptyCtx)
17    todo       = new(emptyCtx)
18)
19func Background() Context {
20    return background
21}
22func TODO() Context {
23    return todo
24}
25//这两者返回值是一样的,文档上建议main函数可以使用Background()创建root context

cancelCtx:可以认为他与emptyCtx最大的区别在于,具体实现了cancel函数。即他可以向子goroutine传递cancel消息。 timerCtx:另一个实现Context接口的具体类型,内部封装了cancelCtx类型实例,同时拥有deadline变量,用于实现定时退出通知。 valueCtx:实现了Context接口的具体类型,内部分装cancelCtx类型实例,同时封装了一个kv存储变量,用于传递通知消息。

API

除了root context可以使用Background()创建以外,其余的context都应该从cancelCtx,timerCtx,valueCtx中选取一个来构建具体对象: func WithCancel(parent Context) (Context, CancelFunc):创建cancelCtx实例。 func WithDeadline(parent Context, deadline time.Time)(Context, CancelFunc)与func WithTimeout(parent Context, timeout time.Duration)(Context, CancelFunc):两种方法都可以创建一个带有超时通知的Context具体对象timerCtx,具体差别在于传递绝对或相对时间。 func WithValue(parent Context, key, val interface{}) Context:创建valueCtx实例。


1、创建root context并构建一个WithCancel类型的上下文,使用该上下文注册一个goroutine模拟运行:

代码语言:javascript
复制
 1func main(){
 2    ctxa, cancel := context.WithCancel(context.Background())
 3    go work(ctxa, "work1")
 4}
 5func work(ctx context.Context, name string){
 6    for{
 7        select{
 8        case <-ctx.Done():
 9            println(name," get message to quit")
10            return
11        default:
12            println(name," is running")
13            time.Sleep(time.Second)
14        }
15    }
16}

2、使用WithDeadline包装ctxa,并使用新的上下文注册另一个goroutine:

代码语言:javascript
复制
1func main(){
2    ctxb, _ := context.WithTimeout(ctxa, time.Second * 3)
3    go work(ctxb, "work2")
4}

3、使用WithValue包装ctxb,并注册新的goroutine:

代码语言:javascript
复制
 1func main(){
 2    ctxc := context.WithValue(ctxb, "key", "custom value")
 3    go workWithValue(ctxc, "work3")
 4}
 5func workWithValue(ctx context.Context, name string){
 6    for{
 7        select {
 8        case <-ctx.Done():
 9            println(name," get message to quit")
10            return
11        default:
12            value:=ctx.Value("key").(string)
13            println(name, " is running with value", value)
14            time.Sleep(time.Second)
15        }
16    }
17}

4、最后在main函数中手动关闭ctxa,并等待输出结果:

代码语言:javascript
复制
 1func main(){
 2    time.Sleep(5*time.Second)
 3    cancel()
 4    time.Sleep(time.Second)
 5}
 6
 7至此我们运行程序并查看输出结果:
 8work1  is running
 9work3  is running with value custom value
10work2  is running
11work1  is running
12work2  is running
13work3  is running with value custom value
14work2  is running
15work3  is running with value custom value
16work1  is running
17//work2超时并通知work3退出
18work2  get message to quit
19work3  get message to quit
20work1  is running
21work1  is running
22work1  get message to quit

可以看到,当ctxb因超时而退出之后,会通知由他包装的所有子goroutine(ctxc),并通知退出。各context的关系结构如下:

Background() -> ctxa -> ctxb -> ctxc

源码分析

我们主要研究两个问题,即各Context如何保存父类和子类上下文;以及cancel方法如何实现通知子类context实现退出功能。

context的数据结构

1、emptyCtx只是一个uint类型的变量,其目的只是为了作为第一个goroutine ctx的parent,因此他不需要,也没法保存子类上下文结构。

2、cancelCtx的数据结构:

代码语言:javascript
复制
1type cancelCtx struct {
2    Context
3
4    mu       sync.Mutex            // protects following fields
5    done     chan struct{}         // created lazily, closed by first cancel call
6    children map[canceler]struct{} // set to nil by the first cancel call
7    err      error                 // set to non-nil by the first cancel call
8}

Context接口保存的就是父类的context。children map[canceler]struct{}保存的是所有直属与这个context的子类context。done chan struct{}用于发送退出信号。 我们查看创建cancelCtx的APIfunc WithCancel(…)…:

代码语言:javascript
复制
1func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
2    c := newCancelCtx(parent)
3    propagateCancel(parent, &c)
4    return &c, func() { c.cancel(true, Canceled) }
5}
6func newCancelCtx(parent Context) cancelCtx {
7    return cancelCtx{Context: parent}
8}

propagateCancel函数的作用是将自己注册至parent context。我们稍后会讲解这个函数。

3、timerCtx的数据结构:

代码语言:javascript
复制
1type timerCtx struct {
2    cancelCtx
3    timer *time.Timer // Under cancelCtx.mu.
4
5    deadline time.Time
6}

timerCtx继承于cancelCtx,并为定时退出功能新增自己的数据结构。

代码语言:javascript
复制
 1func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
 2    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
 3        // The current deadline is already sooner than the new one.
 4        return WithCancel(parent)
 5    }
 6    c := &timerCtx{
 7        cancelCtx: newCancelCtx(parent),
 8        deadline:  d,
 9    }
10    propagateCancel(parent, c)
11//以下内容与定时退出机制有关,在本文不作过多分析和解释
12    dur := time.Until(d)
13    if dur <= 0 {
14        c.cancel(true, DeadlineExceeded) // deadline has already passed
15        return c, func() { c.cancel(true, Canceled) }
16    }
17    c.mu.Lock()
18    defer c.mu.Unlock()
19    if c.err == nil {
20        c.timer = time.AfterFunc(dur, func() {
21            c.cancel(true, DeadlineExceeded)
22        })
23    }
24    return c, func() { c.cancel(true, Canceled) }
25}
26func newCancelCtx(parent Context) cancelCtx {
27    return cancelCtx{Context: parent}
28}

timerCtx查看parent context的方法是timerCtx.cancelCtx.Context。

4、valueCtx的数据结构:

代码语言:javascript
复制
1type valueCtx struct {
2    Context
3    key, val interface{}
4}

相较于timerCtx而言非常简单,没有继承于cancelCtx struct,而是直接继承于Context接口。

代码语言:javascript
复制
1func WithValue(parent Context, key, val interface{}) Context {
2    if key == nil {
3        panic("nil key")
4    }
5    if !reflect.TypeOf(key).Comparable() {
6        panic("key is not comparable")
7    }
8    return &valueCtx{parent, key, val}
9}

辅助函数

这里我们会有两个疑问,第一,valueCtx为什么没有propagateCancel函数向parent context注册自己。既然没有注册,为何ctxb超时后能通知ctxc一起退出。第二,valueCtx是如何存储children和parent context结构的。相较于同样绑定Context接口的cancelCtx,valueCtx并没有children数据。 第二个问题能解决一半第一个问题,即为何不向parent context注册。先说结论:valueCtx的children context注册在valueCtx的parent context上。函数func propagateCancel(…)负责注册信息,我们先看一下他的构造:

func propagateCancel

代码语言:javascript
复制
 1func propagateCancel(parent Context, child canceler) {
 2    if parent.Done() == nil {
 3        return // parent is never canceled
 4    }
 5    if p, ok := parentCancelCtx(parent); ok {
 6        p.mu.Lock()
 7        if p.err != nil {
 8            // parent has already been canceled
 9            child.cancel(false, p.err)
10        } else {
11            if p.children == nil {
12                p.children = make(map[canceler]struct{})
13            }
14            p.children[child] = struct{}{}
15        }
16        p.mu.Unlock()
17    } else {
18        go func() {
19            select {
20            case <-parent.Done():
21                child.cancel(false, parent.Err())
22            case <-child.Done():
23            }
24        }()
25    }
26}

这个函数的主要逻辑如下:接收parent context 和 child canceler方法,若parent为emptyCtx,则不注册;否则通过funcparentCancelCtx寻找最近的一个*cancelCtx;若该cancelCtx已经结束,则调用child的cancel方法,否则向该cancelCtx注册child。

func parentCancelCtx

代码语言:javascript
复制
 1func parentCancelCtx(parent Context) (*cancelCtx, bool) {
 2    for {
 3        switch c := parent.(type) {
 4        case *cancelCtx:
 5            return c, true
 6        case *timerCtx:
 7            return &c.cancelCtx, true
 8        case *valueCtx:
 9            parent = c.Context
10        default:
11            return nil, false
12        }
13    }
14}

func parentCancelCtx从parentCtx中向上迭代寻找第一个cancelCtx并返回。从函数逻辑中可以看到,只有当parent.(type)为*valueCtx的时候,parent才会向上迭代而不是立即返回。否则该函数都是直接返回或返回经过包装的*cancelCtx。因此我们可以发现,valueCtx是依赖于parentCtx的*cancelCtx结构的。

至于第二个问题,事实上,parentCtx根本无需,也没有办法通过Done()方法通知valueCtx,valueCtx也没有额外实现Done()方法。可以理解为:valueCtx与parentCtx公用一个done channel,当parentCtx调用了cancel方法并关闭了done channel时,监听valueCtx的done channel的goroutine同样会收到退出信号。另外,当parentCtx没有实现cancel方法(如emptyCtx)时,可以认为valueCtx也是无法cancel的。

func (c *cancelCtx) cancel

代码语言:javascript
复制
 1func (c *cancelCtx) cancel(removeFromParent bool, err error) {
 2    if err == nil {
 3        panic("context: internal error: missing cancel error")
 4    }
 5    c.mu.Lock()
 6    if c.err != nil {
 7        c.mu.Unlock()
 8        return // already canceled
 9    }
10    c.err = err
11    if c.done == nil {
12        c.done = closedchan
13    } else {
14        close(c.done)
15    }
16    for child := range c.children {
17        child.cancel(false, err)
18    }
19    c.children = nil
20    c.mu.Unlock()
21
22    if removeFromParent {
23        removeChild(c.Context, c)
24    }
25}

该方法的主要逻辑如下:若外部err为空,则代表这是一个非法的cancel操作,抛出panic;若cancelCtx内部err不为空,说明该Ctx已经执行过cancel操作,直接返回;关闭done channel,关联该Ctx的goroutine收到退出通知;遍历children,若有的话,执行child.cancel操作;调用removeChild将自己从parent context中移除。

func (c *timerCtx) cancel

与cancelCtx十分类似,不作过多阐述。


版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-03-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Golang语言社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 工作机制
  • interface
  • struct
  • API
  • 源码分析
  • context的数据结构
  • 辅助函数
  • func propagateCancel
  • func parentCancelCtx
  • func (c *cancelCtx) cancel
  • func (c *timerCtx) cancel
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档