前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go语言中常见100问题-#72 Forgetting about sync.Cond

Go语言中常见100问题-#72 Forgetting about sync.Cond

作者头像
数据小冰
发布2022-08-15 15:27:21
1.2K0
发布2022-08-15 15:27:21
举报
文章被收录于专栏:数据小冰
不要忽视sync.Cond

Go标准库中的sync包提供了常用的同步原语功能,该包中有一个结构我们可能很少使用也容易忽视,它就是sync.Cond,但是它有一个特色功能,能够实现通道(channel)不能实现的功能,所以我们不要忽视它的存在。本文将通过一个具体的例子来了解sync.Cond用在什么场合下以及如何使用它。

本文的例子模拟描述的是一个捐赠流程,当收到特定的捐款金额时,应用程序会产生告警通知。有一个goroutine负责增加余额,我们称它为更新操作goroutine.相反,其他goroutine将接收更新并在达到特定余额时打印一条消息,我们称这些goroutine为监听goroutine. 例如,一个goroutine正在等待捐赠金额为10美元的目标,另一个goroutine正在等待捐赠金额为15美元的目标。

第一个可能想到的实现方法是使用互斥锁。更新操作goroutine每秒增加一次余额,监听goroutine会循环读取余额信息,直到余额满足期望的目标值。程序实现如下:

代码语言:javascript
复制
type Donation struct {
        mu             sync.RWMutex
        balance int
}
donation := &Donation{}

// Listener goroutines
f := func(goal int) {
        donation.mu.RLock()
        for donation.balance < goal {
                donation.mu.RUnlock()
                donation.mu.RLock()
        }
        fmt.Printf("$%d goal reached\n", donation.balance)
        donation.mu.RUnlock()
}
go f(10)
go f(15)

// Updater goroutine
go func() {
        for {
                time.Sleep(time.Second)
                donation.mu.Lock()
                donation.balance++
                donation.mu.Unlock()
        }
}()

上述程序使用读写锁保护共享变量donation.balance,与我们预期的效果一致,运行结果如下:

代码语言:javascript
复制
go run example1.go                                                                                             
$10 goal reached
$15 goal reached

这段程序的主要问题在监听goroutine中的循环部分,每个监听goroutine都会一直循环,直到达到它们预期的目标值。在未达到目标值前会不停地检查,这将导致大量CPU空转浪费。下面是上述程序运行期间在我的机器上CPU使用情况,可以看到CPU使用率高达163.5%. 这种情况是不能接受的,需要想更好的解决方法。

现在再来审视我们的目标,看看我们需要什么。每当余额更新时,需要一个从更新goroutine发送信号通知的方法,发给监听goroutine,告诉它们余额有更新,可以检查下余额是否满足自己的目标值。在Go语言中,这种通知信号可用通道(channel)实现,下面是用通道实现的版本。

代码语言:javascript
复制
type Donation struct {
        balance int
        ch      chan int
}

donation := &Donation{ch: make(chan int)}

// Listener goroutines
f := func(goal int) {
        for balance := range donation.ch {
                if balance >= goal {
                        fmt.Printf("$%d goal reached\n", balance)
                        return
                }
        }
}
go f(10)
go f(15)

// Updater goroutine
for {
        time.Sleep(time.Second)
        donation.balance++
        donation.ch <- donation.balance
}

每个监听goroutine都从共享通道donation.ch中接收消息,一旦余额有更新,更新操作goroutine会向donation.ch中发送余额信息。但是,如果我们运行上述代码,可能得到如下结果。完整代码见(https://github.com/ThomasMing0915/100-go-mistakes-code/tree/main/72).

代码语言:javascript
复制
go run example2.go                                                                                             
$11 goal reached
$15 goal reached

当余额为10美元的时候,不是满足了第一个监听goroutine的目标值吗,为啥没有10 goal reached输出,反而输出的是11 goal reached呢?,为什么会这样?原因是发送到通道中的消息仅能被一个goroutine接收,在本文示例中,如果第一个goroutine在第二goroutine之前从通道接收,则两个通道分别收到的余额值如下图。

多个goroutine从共享通道上接收消息默认是按轮询模式分发的,即上图中两个监听goroutine从通道获取消息的顺序是:第一个goroutine -> 第二个goroutine -> 第一个goroutine -> 第二个goroutine ->...。 如果某个goroutine还没有准备好接收消息(即在通道上不处于等待状态),这种情况,会将消息分发到下一个可用的goroutine上。

无论在什么情况下,发往channel的消息只能被消费一次,也就是上面的每个消息都只有一个goroutine会收到。所以,上面的程序在运行时,第一个goroutine没有收到$10这条消息,是被第二个goroutine接收了。只有关闭channel是广播事件,每个接收的goroutine都会收到关闭通知。但是,这里不能关闭通道,因为如果通道被关闭,更新操作goroutine就不能再发送真正的消息了。

此外,上述程序使用通道还有另一个问题。监听goroutine只要收到目标余额就return返回了,因此更新操作goroutine必须知道在什么时候所有的监听goroutine会全部返回,也就是通道的发送方goroutine必须知道在什么时候所有的通道接收方goroutine全部退出了。否则向没有接收方goroutine的通道中发送消息最终(通道变满)会阻塞发生方goroutine,这会导致goroutine占用的内存泄露。一种可能的解决方法是在Donation结构体中添加一个sync.WaitGroup字段,通过该字段监控所有的接收方goroutine是否已全部退出,但这种解决方法会使程序变得更复杂。

理想的处理方法是,我们希望找到一种方法在余额更新需要发送到多个goroutine时进行广播通知。非常幸运的是Go标准库中提供了sync.Cond(条件原语)可以解决这个问题。下面首先讲述sync.Cond基本知识,然后看看如何使用这个条件原语解决本文的问题。

官方文档(pkg.go.dev/sync)对sync.Cond的定义如下

❝Cond实现了一个条件变量或者说是一个集合点,在这个点所有的goroutine等待或告知事件发生。 ❞

条件变量是等待某个条件线程(本文是协程)的容器。在本文示例中,条件是余额被更新。每次当余额更新时,更新操作goroutine会发生广播通知,监听goroutine在收到通知后检查余额是否满足目标。此外,sync.Cond为了防止数据竞争,实现了sync.Locker接口(底层实现依赖于sync.Mutex或sync.RWMutex)。sync.Cond实现源码解析见条件变量Cond实现, 下面是采用sync.Cond实现的一个版本。

代码语言:javascript
复制
type Donation struct {
        cond    *sync.Cond
        balance int
}

donation := &Donation{
        cond: sync.NewCond(&sync.Mutex{}),
}

// Listener goroutines
f := func(goal int) {
        donation.cond.L.Lock()
        for donation.balance < goal {
                donation.cond.Wait()
        }
        fmt.Printf("%d$ goal reached\n", donation.balance)
        donation.cond.L.Unlock()
}
go f(10)
go f(15)

// Updater goroutine
for {
        time.Sleep(time.Second)
        donation.cond.L.Lock()
        donation.balance++
        donation.cond.L.Unlock()
        donation.cond.Broadcast()
}

在上述程序中,我们使用sync.NewCond创建一个条件变量,并在创建时传入一个 *sync.Mutex对象进行初始化。那后续更新操作goroutine和监听goroutine是如何协作运行的呢?监听goroutine会进行循环,直到余额达到目标值。在循环内部,调用条件变量的Wait方法,该方法会阻塞直到满足条件。「NOTE: 注意这里所说的满足条件中的条件不是指匹配了目标金额值,而是指是否有余额更新, 该单个条件变量被两个监听goroutine共享。」

调用Wait操作必须在临界区内进行(通过donation.cond.L.Lock()加锁),这看起来非常奇怪, 这里加锁后不是会阻止其他goroutine也等待相同的条件吗?不会的,Wait的内部实现如下:

  • 释放锁(本文是互斥锁)
  • 挂起当前的goroutine并等待通知
  • 执行加锁当接收到通知后

因此,在监听goroutine的内部形成了两个临界区。第一个临界区是 for donation.balance < goal, 读取余额判断它是否小于目标值。第二个临界区循环体后面的打印输出fmt.Printf("%d$ goal reached\n", donation.balance). 这样对共享变量donation.balance的所有访问都受到保护。

分析完了监听goroutine处理流程,现在来看看更新操作goroutine过程是什么样的?余额更新操作在临界区内执行,以防止数据竞争问题。然后调用Broadcast方法,该方法会唤醒所有等待余额更新的goroutine(监听goroutine). 运行上面的程序,输出结果与我们预期一致。

代码语言:javascript
复制
go run example3.go                                                                                               
10$ goal reached
15$ goal reached

在我们新版的实现中,条件变量基于的是正在更新的余额。所以,每次在余额被更新之后,监听goroutine都会被唤醒,然后检查余额是否满足各自的目标值。通过这种条件变量方法可以防止监听goroutine重复循环检查,导致CPU空转浪费问题。

在使用sync.Cond时,我们还有一点需要注意。当我们发送一条通知消息的时候,例如一条空消息chan struct,即使没有准备就绪的接收者(goroutine),通知消息也会被缓存,从而保证所有的接收者goroutine会收到通知。使用sync.Cond的Broadcast方法会唤醒所有当前在等待条件的goroutine,如果某个goroutine没有在等待条件,它会错过通知,这一点我们必须在使用时留意。

「NOTE:Broadcast操作不会阻塞,即使没有goroutine在等待从该通道中接收消息。同理,Signal()操作也类似的,也不会阻塞。在只唤醒一个goroutine的时候,我们使用Signal方法。在实现效果层面上,它与以非阻塞方式向channel中发送消息相同,效果代码如下。」

代码语言:javascript
复制
ch := make(chan struct{})
select {
case ch <- struct{}{}:
default:
}

总结,在Go语言中信号通知可以通过channel来实现,但是想要实现广播消息,只能通过关闭channel方式来实现,这一点我们需要知道。并且这种关闭channel方式有局限性,只能广播一次。因此,如果我们需要反复向多个goroutine发送通知,可以采用sync.Cond来实现。该原语基于条件变量,此条件变量会设置一组线程或协程等待特定的条件。使用sync.Cond,可以广播信号,该信号可以唤醒所有等待它的goroutine.

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-06-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据小冰 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 不要忽视sync.Cond
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档