Golang 为并发编程提供了多种并发原语(Mutex、RWMutex、sync.Map),用于临界区的数据访问和保护;开发应用时,面对不同的场景如何选择合适的并发原语,使功能正常实现的同时提供更高的性能;在互联网应用中,由并发原语保护的临界区从本质上来说无非三种情况:读多写少、写多读少、读写一致。 上篇文章介绍了 sync.RWMutex并发原语,其适用于写多读少的场景;Mutex、RWMutex 其保护的都是某个临界状态区,无论是访问、存储都使用锁进行保护;当操作频繁且要求性能的情况下,锁的优化已无法满足业务需求,此时就需要考虑额外的方式来提升性能。对于读多写少的场景,最常用的方式是进行读写分离,将访问压力分散至其它组件。
Golang为了支持读多写少的场景,提供了sync.Map并发原语,由普通map、Mutex与原子变量组合而成,作为一个并发安全的map,部分情况下读、写数据通过原子操作避免加锁,从而提高临界区访问的性能,同时在高并发的情况下仍能保证数据的准确性,支持Load、Store、 Delete、 Range等操作,可以实现对sync.Map的遍历以及根据key获取value;sync.Map可以有效地替代锁的使用,提高程序的执行效率。
讨论设计原理前,先观察下sync.Map的结构定义,主要三种元素组成:map、mutex、atomic
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[interface{}]*entry
amended bool
}
sync.Map 采用了装饰器 模式,在普通的map上组合其它结构进行修饰,在利用map快速定位某个元素的同时增加了读写分离的功能,当元素在只读组件read中时,使用原子操作避免加锁快速访问其数据,在sync.Map中新增元素或访问不存在的元素时操作dirty组件时使用锁操作进行保护,当只读组件read的命中率降低至一定阈值时触发状态转换,将dirty状态提升为最新的read状态,以降低访问sync.Map时加锁的次数。
下面将介绍sync.Map中每个状态字段的功能及其含义
sync.Map提供了Load、Store、Delete、Range基础操作,以及LoadAndDelete、LoadOrStore复合操作,下面将介绍sync.Map如何实现这些功能、以及在读多写少情况下提供高性能的原因。
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 1. 先查询只读的原子状态
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 2. 不存在,且amended为true,进入慢路径
if !ok && read.amended {
// 2.1 获取互斥锁
m.mu.Lock()
// 2.2 再次查询只读的原子状态,存在获取互斥锁期间read状态被更新的情况
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
// 2.3 仍然不存在,查询dirty
if !ok && read.amended {
e, ok = m.dirty[key]
// 2.4 将统计数据累加,达到条件时,将dirty提升为read
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
// 返回存储的数据
return e.load()
}
missLocked 有两个功能:
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
entry.load: sync.Map的 read、dirty中存储数据时,实际用户的数据是存储在entry.p中,存储了用户对象的地址,当从read或dirty查询到对象时,需要使用load 方法加载出用户对象。
如果entry.p为nil或expunged,则表示用户对象已被删除。
type entry struct {
p unsafe.Pointer // *interface{}
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
entry.p 存储值逻辑上有3种状态,value(存储了用户对象)、nil、expunged.
将一个新对象记录到sync.Map或更新已有的对象,对于不同的对象,使用不同的方式提供执行效率。
Note: 在存储对象时,虽然某些情况下存在加锁行为,但并未累计加锁数量 misses.
func (m *Map) Store(key, value interface{}) {
// 1. 更新已存在的非擦除态对象
// 使用CAS原子操作,避免加锁
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
// 2. 加锁
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
// 2.1.1 更新已存在的擦除态对象,先更新它的状态为nil,
// 并将它添加到dirty中
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
// 2.1.2 将nil更新为为new_value
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 2.2 更新存在与dirty中对象,将它的值更新为new_value
e.storeLocked(&value)
} else {
// 2.3.1 添加一个新对象,先更新read.amended,表示dirty中存在
// read不含有得key
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
// 2.3.2 将key对应的值存入dirty
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
// 将entry中元素的状态由expunged修改为nil
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
// 当dirty提升为read后,dirty会被置为nil
// 因此当未来向dirty中添加元素时,需要先构建新dirty的状态
// 因为dirty要包含所有有效的key元素,所以新构建dirty状态时,
// 将read中存储有效值的数据也存储至dirty中
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// 将包含有效值的元素存入dirty
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
// 将entry中状态为nil的元素置为expunged,表示它已被逻辑删除
// 当返回值为true时,表示元素已被删除,否则,元素为一个有效值
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
删除Key对应的元素,对于不同的对象,有不同的删除方式
// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
//1. 使用原子操作查询read状态
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
//2. 未查到,加锁查询dirty
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
//3. 累计加锁的数量,触发状态提升
m.missLocked()
}
m.mu.Unlock()
}
// 4. 查询到key,将key的值置为nil,进行逻辑上删除
if ok {
return e.delete()
}
return nil, false
}
// 将元素从逻辑上进行删除,将状态置为nil
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}
提供了O(N)的方式遍历sync.Map,用户传入遍历key-value的动作函数,如果函数返回false,则遍历终止;即使在遍历过程中发生key的并发读写操作,每个key也仅会被最多遍历一次。 因为 dirty可能包含read中不存在的key的状态,当遍历操作发生时,如果dirty与read存储的有效key的状态不一致,将dirty提升为read。
func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly)
// 1. dirty中包含read不存在的状态,
// 将dirty提升为read.
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
// 2. 遍历read,当动作函数返回false时,退出遍历。
// 由于遍历过程中可能存在map的并发修改操作,因此当遍历该entry时,
// 实际存储的值被删除时,则不再遍历.
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
与Delete操作一样,仅增加了返回值传递被删除的对象.
如果key存在,则返回已存在的值,否则将传入的参数存入Map并返回。loaded 为true ,表示返回的值为以前存在的旧值,值为false,返回的值为传入的参数。
执行逻辑与Store类似,也是4种场景
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
// Avoid locking if it's a clean hit.
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
tryLoadOrStore执行逻辑,entry中存储的值存在三种状态:有效值、nil、expunged
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。