一、引言
Go语言的并发编程是其最核心的特性之一。Go的并发模型通过goroutine
和channels
让并发编程变得简单而高效。然而,在并发环境下共享数据仍然是一个挑战,尤其是当涉及到共享状态的同步时。
在Go中,内置的map类型不是并发安全的,这意味着在没有额外同步机制的情况下,多个goroutine
同时读写一个map可能会导致竞态条件。运行时可能会发生下列panic(或不可预测的行为):
fatal error: concurrent map read and map write
传统的解决方案是使用互斥锁(sync.Mutex
)或读写锁(sync.RWMutex
)来同步对map的访问。无论是访问、存储都使用锁进行保护;当操作频繁且要求性能的情况下,锁的优化已无法满足业务需求,
考虑到互联网应用通常是读多写少的场景,Golang的标准库提供了一个特殊的并发安全的map实现——sync.Map
,专为读多写少的并发场景设计的,提供了优于加锁map的性能。
sync.Map
简介sync.Map
是在Go语言的sync
包中提供的一个并发安全的map类型。它通过内部的同步机制保证了在多个goroutine
并发访问时的安全性。
sync.Map
有以下几个关键的特点:
sync.Map
内部使用了锁和其他同步原语来保证并发访问的安全性。sync.Map
不需要像内置map那样使用make
函数初始化,可以直接声明后使用。sync.Map
提供了特定的方法如Load
, Store
, LoadOrStore
, Delete
, 和Range
,而不是使用内置map
的语法。sync.Map
sync.Map
的设计主要是为了满足以下两种常见的使用场景,其中内置map
加锁的方式效率不高:
Key
的集合基本不变,但是Value
会并发更新:在这种场景下,sync.Map
通过将热点数据分离出来,减少了锁的争用,提高了性能。Key-Value
对的添加和删除操作比较少,但是读操作非常频繁:sync.Map
在读取操作上做了优化,读操作通常无需加锁,这大大提高了并发读的性能。sync.Map
的设计目标是为了提供一个高效的并发安全的map,特别是在读多写少的场景下。它的设计考虑了以下目标:
sync.Map
适用于以下场景:sync.Map
的基本用法sync.Map
对象sync.Map
不需要初始化,可以直接声明后使用。它的声明方式如下:
var m1 sync.Map // 也可以使用 m := new(sync.Map) 来创建一个sync.Map实例
Load()
方法Load
方法用于从sync.Map
中检索一个键的值。如果该键存在于map中,Load
将返回键对应的值和true
;如果不存在,将返回nil
和false
。
value, ok := m.Load(key)
if ok {
// key存在,value是对应的值
} else {
// key不存在
}
Store()
方法Store
方法用于将键值对保存到sync.Map
中。如果键已经存在,它的值将被覆盖。
m.Store(key, value)
LoadOrStore()
方法LoadOrStore
方法将尝试从sync.Map
中加载一个键的值。如果键不存在,它将存储键值对到map中。该方法返回加载到的值(或存储的值)和一个布尔值,表示值是否被加载。
actual, loaded := m.LoadOrStore(key, value)
if loaded {
// 键已经存在,actual是已存在的值
} else {
// 键不存在,已存储提供的值,actual是提供的值
}
Delete()
方法Delete
方法用于从sync.Map
中删除一个键及其对应的值。
m.Delete(key)
Range()
方法Range
方法用于迭代sync.Map
中的所有键值对。它接受一个函数作为参数,该函数会被调用每个键值对。如果该函数返回false
,迭代将停止。
m.Range(func(key, value interface{}) bool {
// 使用key和value
// 如果要停止迭代,返回false
return true
})
请注意,Range
方法不保证每次迭代的顺序,且在迭代过程中如果有其他goroutine
修改map
,迭代器可能会反映这些修改。
这些基本方法提供了对sync.Map
进行并发安全操作的能力,无需担心在多goroutine
环境下的竞态条件。在使用sync.Map
时,应当注意它的特定用例和性能特性,以确保它适合你的应用场景。
sync.Map
设计原理与源码分析sync.Map
的数据结构分析sync.Map
采用了 装饰器 模式,对普通的map加以修饰,实现读写分离
和接近Lock-Free
的访问。
sync.Map
的结构体定义:
// sync/map.go
type Map struct {
mu Mutex // 互斥锁,用于保护dirty字段和misses字段。
read atomic.Value // readOnly, 一个atomic.Value类型的字段,存储了一个readOnly结构体,用于存储只读数据。
dirty map[interface{}]*entry // 一个map,存储了所有可写的键值对。
misses int // 一个计数器,记录了从read读取失败的次数,用于触发将数据从dirty迁移到read的决策。
}
type readOnly struct {
m map[interface{}]*entry // 实际存储键值对的map。
amended bool // 标记位,如果dirty中有read中没有的键,那么为true
}
sync.Map
使用两个原生的map
(本质上是map[interface{}]*entry
)来作为数据的存储空间分别是:
read
: 只读字典, 使用atomic.Value
来承载,保证原子性和高性能, 但不保证数据的完整性(不保证拥有全部的Key),相当于某个时间的Key-value对
的快照。dirty
: 脏字典, 用互斥锁Map.mu
来保护,保证了并发安全。 如果 m.dirty!=nil
,则dirty
包含了所有的Key-Value
对。 当新增一个Key时,会先存放在dirty
中,然后等满足一定条件后再同步给read
。展开后数据如下图所示:
read
和dirty
的数据并非实时同步的,只有在满足一定触发条件(或达到某些临界值)才会进行数据的同步(或转换),因此两者数据在一些时间段内会有差异:
read
: 存储(部分)真实的Key-Value对数据, 和被软删除的数据dirty
:要么为nil, 要么存储全部Key-Value对entry
是对实际数据的封装
type entry struct {
p unsafe.Pointer // *interface{} 一个指向实际数据的指针。
}
var expunged = unsafe.Pointer(new(any))
entry
中的p
的值有三种情况:
e.p == nil
:entry
已经被标记删除,不过因为还未进行read=>dirty
的同步,因此dirty
中可能还存在该entrye.p == expunged
:entry
已经被标记删除,且已经完成read=>dirty
同步,因而不属于dirty
,仅仅属于read
,下一次dirty=>read
升级,会被彻底清理。 延迟删除的思想e.p
为正常值:m.read.m
中,如果 m.dirty!=nil
则也存在于 m.dirtysync.Map
读操作分析:含Load()
源码分析sync.Map的读操作包含下面基本步骤:
Map.read
原子查找Key
。如果找到可以直接返回;如果找不到,则下一步。Map.dirty
中是否有新增的Key
,通过Map.read.amend
判断,如果没有则返回nil;如果有则下一步。Map.dirty
中查找Key
。 返回查找结果。并进行Map.misses计数。
map.misses
达到阈值,触发Map.dirty⇒Map.read
的同步Load()源码分析:
// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *Map) Load(key any) (value any, ok bool) {
// 尝试从 Map.read 中查找数据
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// Map.read 中找不到 Key, 且dirty中有新增的key, 则尝试从 Map.dirty 中查找
if !ok && read.amended {
m.mu.Lock()
// 避免并发时,在上一次读 m.read.Load() 和 m.mu.Lock() 的时间区间内发生 dirty=>read 的升级
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 计数器加1, 达到阈值后,触发dirty=>read的升级
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
func (e *entry) load() (value any, ok bool) {
p := atomic.LoadPointer(&e.p)
// p == nil || p == expunged 都可以确定entry已经被删除,
if p == nil || p == expunged {
return nil, false
}
return *(*any)(p), true
}
sync.Map
写操作分析:含Store()
源码分析sync.Map的读操作包含下面基本步骤:
m.read.m
中存在要操作的key,可以直接尝试通过m.read.m
的entry
来修改value
。这里有一个例外是e.p == expunged
。因为e.p == expunged
表示m.read⇒m.dirty
的同步已经完成,该key在m.dirty
已经不存在,而我们需要保证m.dirty
持有全量的可写key
。
e.p == expunged
,需要先对e.p进行unexpungeLocked
,然后在m.dirty
中补齐key
,在修改容器e
中的value
key
在m.read
中不存在,但是在m.dirty
中存在,则直接操作m.dirty
就好。
key
在m.read
中和m.dirty
中都不存在,此时相当于要新增一个key。需要进行一次m.read⇒m.dirty
的同步,并设置设置m.read.amended
,之后再在m.dirty
中新增一个entry
Store()
源码:
// Store sets the value for a key.
func (m *Map) Store(key, value any) {
read, _ := m.read.Load().(readOnly)
// 如果 key 在 m.read.m 中存在,可以直接先尝试通过m.read.m的entry来修改value。
// 这里有一个例外是 p == expunged,因为此时这个key在dirty中已经不存在了。 而我们需要保证dirty中持有全量的key。 详见tryStore函数
// 因而在这种 p == expunged 需要进一步操作m.dirty
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock() // 加锁
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
// 属于前面说的p == expunged的例外情况,需要先将p设置为nil,然后在m.dirty中补齐当前key
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value) // 此时,m.read.m中的p为nil了,重新进行store。
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
// tryStore stores a value if the entry has not been expunged.
//
// If the entry is expunged, tryStore returns false and leaves the entry
// unchanged.
func (e *entry) tryStore(i *any) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[any]*entry, len(read.m))
// 进行一次 m.read.m 到 m.dirty 同步
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
sync.Map
删除操作分析:含Delete()
源码分析sync.Map
删除操作分析包含以下基本步骤
m.read
中存在要删除的key,有两种情况:m.read
中不存在要删除的key,而 m.dirty
可能存在(m.dirty
有近期新增的key
):dirty⇒read
的同步, 重新进行一次 [m.read](http://m.read)
的读。m.read
中不存在要删除的key,而 m.dirty
可能存在的状态: 如果 m.dirty
中存在要删除的key,则进行删除动作Delete()
源码
// LoadAndDelete deletes the value for a key, returning the previous value if any.
// The loaded result reports whether the key was present.
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// m.read 中找不到key,且 m.dirty中存在新key
if !ok && read.amended {
m.mu.Lock()
// 排除并发干扰,上一次读之后发生了 dirty=>read 的数据同步,导致 m.read变化,因而需要重新判断
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
// 计数器 加1
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}
// Delete deletes the value for a key.
func (m *Map) Delete(key any) {
m.LoadAndDelete(key)
}
func (e *entry) delete() (value any, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*any)(p), true
}
}
}
m.read
和m.dirty
的转换时机m.dirty⇒m.read
:m.misses
不断递增达到阈值后,会将m.dirty
的数据升级到m.read
。升级完毕之后,dirty
置nil
, miss
清零 , read.amended
置false
m.read⇒m.dirty
:m.dirty == nil
, 且有新的key
(m.read
中不存在key
)要添加entry
的生命周期和entry.p
的值一个entry的生命周期如下图所示
entry.p的三种值上文已经提到过
entry
中的p
的值有三种情况:
e.p == nil
:entry
已经被标记删除,不过因为还未进行read=>dirty
的同步,因此dirty
中可能还存在该entrye.p == expunged
:entry
已经被标记删除,且已经完成read=>dirty
同步,因而不属于dirty
,仅仅属于read
,下一次dirty=>read
升级,会被彻底清理。 延迟删除的思想e.p
为正常值:m.read.m
中,如果 m.dirty!=nil
则也存在于 m.dirty原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。