在并发环境下,当要处理的数据存在变化并且是共享的时候,我们一般使用互斥锁(mutex)来保护数据对象读写操作。一个常见的错误是在使用切片和map时没有准确地使用互斥锁操作。下面通过一个具体的例子来说明并分析它存在的问题。
下面的程序定义了一个Cache结构体,用于保存要处理的客户的资金信息。结构体中有一个map字段,它的key为账户id,value为账户余额,字段mu为读写锁,保护并发访问map的安全性。
type Cache struct {
mu sync.RWMutex
balances map[string]float64
}
「NOTE:sync.RWMutex为读写锁,可以保证在有多个读写方对Cache操作时安全性,因为map数据结构不是并发安全的。读写锁支持读读并发,读写互斥。」
Cache结构体有一个AddBalance方法,用来设置某个账户的资金。因为对map有写操作,所以修改map的操作放在临界区中(通过mu加写锁保护)。
func (c *Cache) AddBalance(id string, balance float64) {
c.mu.Lock()
c.balances[id] = balance
c.mu.Unlock()
}
此外,还有一个求资金平均值的方法AverageBalance。考虑为了保持最小临界区,一个可能的实现想法如下。首先创建一个保存资金c.balances的副本,该操作放在临界区内,创建完副本之后在副本上求平均值的操作放在临界区之外。这种实现有什么问题吗?
func (c *Cache) AverageBalance() float64 {
c.mu.RLock()
balances := c.balances
c.mu.RUnlock()
sum := 0.
for _, balance := range balances {
sum += balance
}
return sum / float64(len(balances))
}
如果对上面的代码进行测试,创建两个goroutine,一个调用AddBalance方法,另一个调用AverageBalance方法,然后在运行时添加-race参数,输出结果如下,会报存在数据竞争问题。为什么是这样呢?
go run -race example1.go
==================
WARNING: DATA RACE
Write at 0x00c000124180 by goroutine 7:
...
Previous read at 0x00c000124180 by goroutine 8:
runtime.mapiterinit()
map内部是通过runtime.hmap实现的,该结构包含一些元数据信息(例如计数器)和引用数据桶的指针。因此,balances := c.balances
操作不会对实际数据进行复制,是一个浅拷贝操作。同理切片拷贝也是这样的,也不会对数据进行复制。
s1 := []int{1, 2, 3}
s2 := s1
s2[0] = 42
fmt.Println(s1)
上面的这段程序运行打印的结果为[42 2 3]。尽管修改的是s2,但是s1的值也变了。原因是s2:=s1操作创建的切片s2的length和capacity与s1一模一样,并且共享s1底层的数组,所以对s2的修改会影响到s1.
回到开始的例子,执行balances := c.balances
操作,创建新的map(balances)与原map(c.balances)共享的是相同的数据桶。两个goroutine都是在这个数据集上进行操作,一个修改它,一个访问它,所以存在数据竞争。
那怎么修复数据竞争问题呢?主要有两种处理方法。如果迭代中的操作是一个轻量级操作(像程序中的加法操作就是轻量级操作),应该将迭代处理也放在锁保护的范围内,代码如下。将函数内的所有操作都放在锁内,防止数据竞争。
func (c *Cache) AverageBalance() float64 {
c.mu.RLock()
defer c.mu.RUnlock()
sum := 0.
for _, balance := range c.balances {
sum += balance
}
return sum / float64(len(c.balances))
}
如果迭代中的操作不是轻量级操作,处理方法是对原数据进行深拷贝,这个拷贝的过程会加锁。然后在拷贝的数据上进行处理,这个过程不用加锁,代码如下。
func (c *Cache) AverageBalance() float64 {
c.mu.RLock()
m := make(map[string]float64, len(c.balances))
for k, v := range c.balances {
m[k] = v
}
c.mu.RUnlock()
sum := 0.
for _, balance := range m {
sum += balance
}
return sum / float64(len(m))
}
现在对上述处理过程进行一个梳理,上面的代码对map进行了两次迭代,一次用于复制原map中的数据,一次用于真正的执行操作。只是对map的复制操作处于临界区,因此,该处理方法非常适合执行操作不快的场景。例如,如果一个操作需要调用一个外部的数据库,这种处理方法相比第一种更有效。在实际项目中,是选择第一种还是第二种没有准确的量化方法,因为它取决于很多因素,例如元素的数量、结构体的大小等等。
总结,在处理互斥锁的边界问题时,我们必须小心。本文通过具体的例子说明对map/切片进行赋值操作时,加锁操作范围不当起不到任何保护作用,因为被赋值的新变量和原变量会共享底层的数据集。解决的方法有两种,一种是对整个处理过程加锁,一种是进行深拷贝然后在副本上进行操作。总之,在处理临界区问题时需要小心谨慎,进行合理的设计,准确定义边界。