前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go 语言并发编程系列(十四)—— 通过 context 包实现多协程之间的协作

Go 语言并发编程系列(十四)—— 通过 context 包实现多协程之间的协作

作者头像
学院君
发布2019-09-30 11:10:02
7440
发布2019-09-30 11:10:02
举报
文章被收录于专栏:学院君的专栏

上篇教程学院君介绍了如何通过 sync.WaitGroup 类型优化通道对多协程协调的处理,但是现在有一个问题,就是我们在启动子协程之前都已经明确知道子协程的总量,如果不知道的话,该怎么实现呢?

一种解决方案是通过 sync.WaitGroup 分批启动子协程,具体实现代码如下:

代码语言:javascript
复制
package main

import (
    "fmt"
    "sync"
)

func addNum(a, b int, deferFunc func())  {
    defer func() {
        deferFunc()
    }()
    c := a + b
    fmt.Printf("%d + %d = %d\n", a, b, c)
}

func main() {
    total := 10
    step := 2
    fmt.Println("启动子协程...")
    var wg sync.WaitGroup
    for i := 0; i < total; i = i + step {
        wg.Add(step)
        for j := 0; j < step; j++ {
            go addNum(i + j, 1, wg.Done)
        }
        wg.Wait()
    }
    fmt.Println("所有子协程执行完毕.")
}

这里我们采用分批次启动子协程的方法,每次通过 wg.Add() 函数设置当前批次启动的子协程数量,另外需要注意的是 wg.Wait() 函数最好和 wg.Add() 函数配对使用,否则可能会引起 panic。

除此之外,我们还可以通过另一种工具实现类似需求,这就是我们今天要介绍的 context 包,这个包为我们提供了以下方法和类型:

我们可以先通过 withXXX 方法返回一个从父 Context 拷贝的新的可撤销子 Context 对象和对应撤销函数 CancelFuncCancelFunc 是一个函数类型,调用它时会撤销对应的子 Context 对象,当满足某种条件时,我们可以通过调用该函数结束所有子协程的运行,主协程在接收到信号后可以继续往后执行。

这么说有点迷糊,下面我们结合示例代码来解释这个包的具体使用:

代码语言:javascript
复制
package main

import (
    "context"
    "fmt"
    "sync/atomic"
    "time"
)

func AddNum(a *int32, b int, deferFunc func())  {
    defer func() {
        deferFunc()
    }()
    for i := 0; ; i++ {
        curNum := atomic.LoadInt32(a)
        newNum := curNum + 1
        time.Sleep(time.Millisecond * 200)
        if atomic.CompareAndSwapInt32(a, curNum, newNum) {
            fmt.Printf("number当前值: %d [%d-%d]\n", *a, b, i)
            break
        } else {
            //fmt.Printf("The CAS operation failed. [%d-%d]\n", b, i)
        }
    }
}

func main() {
    total := 10
    var num int32
    fmt.Printf("number初始值: %d\n", num)
    fmt.Println("启动子协程...")
    ctx, cancelFunc := context.WithCancel(context.Background())
    for i := 0; i < total; i++ {
        go AddNum(&num, i, func() {
            if atomic.LoadInt32(&num) == int32(total) {
                cancelFunc()
            }
        })
    }
    <- ctx.Done()
    fmt.Println("所有子协程执行完毕.")
}

在这段代码中,我们先通过 context.WithCancel 方法返回一个新的 cxtcancelFunc,并且通过 context.Background() 方法传入父 Context,该 Context 没有值,永远不会取消,可以看作是所有 Context 的根节点,比如这里的 cxt 就是从父 Context 拷贝过来的可撤销的子 Context。然后我们在一个 for 循环中依次启动子协程,并且只有在 atomic.LoadInt32(&num) == int32(total)(所有子协程执行完毕)时调用 cancelFunc() 方法撤销对应子 Context 对象 cxt,这样,处于阻塞状态的 cxt.Done() 对应通道被关闭,我们可以接收到通道数据然后退出主程序。

注:cxt.Done() 方法返回一个通道,该通道会在调用 cancelFunc 函数时关闭,或者在父 context 撤销时也会被关闭。

WithDeadlineWithTimeout 分别比 WithCancel 多了一个 deadlinetimeout 时间参数,表示子 Context 存活的最长时间,如果超过了该时间,会自动撤销对应的子 Context。相应的,在调用 <-cxt.Done() 等待子协程执行结束时,如果没有调用 cancelFunc 函数的话它们会等待过期时间到达自动关闭,不过我们通常还是会主动调用 cancelFunc 函数以便更好的控制程序运行。

此外,context 包还提供了一个 TODO 方法,该方法用于在不知道使用哪种 Context 时使用,不过目前基本用不到,还有一个 withValue 方法用于返回包含上下文信息的 Context 对象,当我们需要通过 Context 传递上下文数据时可以使用该方法返回 Context:

代码语言:javascript
复制
ctx, cancelFunc := context.WithTimeout(context.Background(), 10 * time.Second)
valueCtx := context.WithValue(ctx, "key", "value")
defer cancelFunc()
for i := 0; i < total; i++ {
    go AddNum(&num, i, func() {
        if atomic.LoadInt32(&num) == int32(total) {
            fmt.Println("key:", valueCtx.Value("key"))
            cancelFunc()
        }
    })
}
<- ctx.Done()
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-09-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 极客书房 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档