前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >go 并发安全map之concurrent-map

go 并发安全map之concurrent-map

原创
作者头像
tabbyzhou
发布2021-12-05 23:03:27
5.1K2
发布2021-12-05 23:03:27
举报
文章被收录于专栏:设计与思考设计与思考

源码解读

concurrent-map的readme中说,这是一个高性能的并发安全的map,一起看源码来解读下他是如何实现高性能的。

https://github.com/orcaman/concurrent-map/blob/master/concurrent_map.go

源码相当精简,只有区区343行。先看如何设计的数据结构

代码语言:txt
复制
var SHARD_COUNT = 32

// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
type ConcurrentMap []*ConcurrentMapShared

// A "thread" safe string to anything map.
type ConcurrentMapShared struct {
	items        map[string]interface{}
	sync.RWMutex // Read Write mutex, guards access to internal map.
}
// Creates a new concurrent map.
func New() ConcurrentMap {
    m := make(ConcurrentMap, SHARD_COUNT)
    for i := 0; i < SHARD_COUNT; i++ {
        m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
    }
    return m
}

ConcurrentMap是真正的对外的结构,其内部是一个预设为32个元素的ConcurrentMapShared,而ConcurrentMapShared其内部封装这一个匿名读写锁sync.RWMutex和一个原生的map。

看到这里大致可猜出他是如何实现并发时的高性能的了。对于一个非并发安全的map,要实现并发安全,肯定要加一个全局锁。而这里使用32个map结构,32个锁,通过降低锁的粒度,来减小锁等待。

基本接口

ConcurrentMap提供了map应有的基本接口

代码语言:txt
复制
// 获取分区key
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared
// 合并map
func (m ConcurrentMap) MSet(data map[string]interface{})
// 添加一个元素
func (m ConcurrentMap) Set(key string, value interface{})
// 获取一个元素
func (m ConcurrentMap) Get(key string) (interface{}, bool)
// 计算有多少元素
func (m ConcurrentMap) Count() int
//判断元素是否存
func (m ConcurrentMap) Has(key string) bool
// 移除指定元素
func (m ConcurrentMap) Remove(key string)
// 获取并移除指定的元素
func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool)
// 判断是否是map
func (m ConcurrentMap) IsEmpty() bool
// 清空map
func (m ConcurrentMap) Clear()
set接口

先来看set接口:

代码语言:txt
复制
func (m ConcurrentMap) Set(key string, value interface{}) {
	// Get map shard.
	shard := m.GetShard(key)
	shard.Lock()
	shard.items[key] = value
	shard.Unlock()
}

根据请求key找到这个key所在的分区,对分区加锁,然后再set,最后解锁。通过对key做哈希,将应该设置的全局锁分散到32个细粒度的分区锁,降低获取锁时的等待概率,从而提高并发量。

代码语言:txt
复制
// GetShard returns shard under given key
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
	return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}

而分区的选择也相对简单,对请求key做一次hash,对其取模找到对应的分区。

get接口
代码语言:txt
复制
// Get retrieves an element from map under given key.
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
	// Get shard
	shard := m.GetShard(key)
	shard.RLock()
	// Get item from shard.
	val, ok := shard.items[key]
	shard.RUnlock()
	return val, ok
}

get接口和set接口基本上是一样的,只是加锁的粒度不一样,set接口是加的写锁,保证写时串行。而get接口是加的读锁,在没有写操作时,任意协程均可以获取到读锁,进行读取数据。

count接口
代码语言:txt
复制
func (m ConcurrentMap) Count() int {
	count := 0
	for i := 0; i < SHARD_COUNT; i++ {
		shard := m[i]
		shard.RLock()
		count += len(shard.items)
		shard.RUnlock()
	}
	return count
}

count接口的实现是单独对遍历分区加锁后,累加这个分区内的元素个数。个人认为在高并发情况下,这个count的值是不准确的。这里的不准确是只调用count时map真实元素数量和调用结束后map的真实元素数量可能不同。因为锁是加在分区上的,当在遍历2号分区时,1号分区写入了一个新元素,由于对1号分区写入数据并不影响2号分区,因此,此时1号分区的真实数量和已经累加过的1号分区的数量就有差别了。当然在高并发场景下也不必纠结于count的准确性。

其他的一些基本接口和上面的都比较类似,就是读取类接口,获取分区后加读锁,保证读不互斥;修改类操作,获取分区后,加写锁,保证一致性。

高级接口

还提供了一些高级接口,比如回调函数接口,

1. 插入-更新回调
代码语言:txt
复制
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}

// Insert or Update - updates existing element or inserts a new one using UpsertCb
func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
    shard := m.GetShard(key)
    shard.Lock()
    v, ok := shard.items[key]
    res = cb(ok, v, value)
    shard.items[key] = res
    shard.Unlock()
    return res
}
2. 移除回调
代码语言:txt
复制
// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
// If returns true, the element will be removed from the map
type RemoveCb func(key string, v interface{}, exists bool) bool

// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
// If callback returns true and element exists, it will remove it from the map
// Returns the value returned by the callback (even if element was not present in the map)
func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool {
	// Try to get shard.
	shard := m.GetShard(key)
	shard.Lock()
	v, ok := shard.items[key]
	remove := cb(key, v, ok)
	if remove && ok {
		delete(shard.items, key)
	}
	shard.Unlock()
	return remove
}
3. 迭代回调
代码语言:txt
复制
// Iterator callback,called for every key,value found in
// maps. RLock is held for all calls for a given shard
// therefore callback sess consistent view of a shard,
// but not across the shards
type IterCb func(key string, v interface{})

// Callback based iterator, cheapest way to read
// all elements in a map.
func (m ConcurrentMap) IterCb(fn IterCb) {
	for idx := range m {
		shard := (m)[idx]
		shard.RLock()
		for key, value := range shard.items {
			fn(key, value)
		}
		shard.RUnlock()
	}
}

基准测试

代码中和go提供的sync.map做了比较。

代码语言:txt
复制
func BenchmarkSingleInsertPresent(b *testing.B) {
    m := New()
    m.Set("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Set("key", "value")
    }
}

func BenchmarkSingleInsertPresentSyncMap(b *testing.B) {
    var m sync.Map
    m.Store("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Store("key", "value")
    }
}
代码语言:txt
复制
go test -bench=InsertPresent -benchtime 5s
goos: linux
goarch: amd64
pkg: concurrent-map
BenchmarkSingleInsertPresent-8                  172822759               34.9 ns/op
BenchmarkSingleInsertPresentSyncMap-8           65351324                92.9 ns/op

从结果看,set固定元素。concurrent-mapsync.map快约3倍。

再看看插入不同的key时的表现。

代码语言:txt
复制
func benchmarkMultiInsertDifferent(b *testing.B) {
	m := New()
	finished := make(chan struct{}, b.N)
	_, set := GetSet(m, finished)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		go set(strconv.Itoa(i), "value")
	}
	for i := 0; i < b.N; i++ {
		<-finished
	}
}

func BenchmarkMultiInsertDifferentSyncMap(b *testing.B) {
	var m sync.Map
	finished := make(chan struct{}, b.N)
	_, set := GetSetSyncMap(&m, finished)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		go set(strconv.Itoa(i), "value")
	}
	for i := 0; i < b.N; i++ {
		<-finished
	}
}

func BenchmarkMultiInsertDifferent_1_Shard(b *testing.B) {
	runWithShards(benchmarkMultiInsertDifferent, b, 1)
}
func BenchmarkMultiInsertDifferent_16_Shard(b *testing.B) {
	runWithShards(benchmarkMultiInsertDifferent, b, 16)
}
func BenchmarkMultiInsertDifferent_32_Shard(b *testing.B) {
	runWithShards(benchmarkMultiInsertDifferent, b, 32)
}
func BenchmarkMultiInsertDifferent_256_Shard(b *testing.B) {
	runWithShards(benchmarkMultiGetSetDifferent, b, 256)
}

func runWithShards(bench func(b *testing.B), b *testing.B, shardsCount int) {
	oldShardsCount := SHARD_COUNT
	SHARD_COUNT = shardsCount
	bench(b)
	SHARD_COUNT = oldShardsCount
}
代码语言:txt
复制
go test -bench=InsertDifferent -benchtime 5s
goos: linux
goarch: amd64
pkg: concurrent-map
BenchmarkMultiInsertDifferentSyncMap-8            560900             11996 ns/op
BenchmarkMultiInsertDifferent_1_Shard-8          1000000              7499 ns/op
BenchmarkMultiInsertDifferent_16_Shard-8        10377100               662 ns/op
BenchmarkMultiInsertDifferent_32_Shard-8        10511775               603 ns/op
BenchmarkMultiInsertDifferent_64_Shard-8        11624546               590 ns/op
BenchmarkMultiInsertDifferent_128_Shard-8       11773946               578 ns/op
BenchmarkMultiInsertDifferent_256_Shard-8        7914397               912 ns/op

sync.map在插入不同key时的表现似乎是最差的。在concurrent-map的分区数设置为1时,可以认为是对单个map加了全局读写锁,居然也比sync.map要快。但sync.map和分区为1的concurrent-map在多次测试时差异比较大,有时sync.map快,有时分区为1的concurrent-map快。单都不会比分区为16以上的concurrent-map快。而且并不是分区数越大越快,在分区数为256时,执行速度已经开始变慢了。

参考

  1. concurrent-map git仓库

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 源码解读
    • 基本接口
      • set接口
      • get接口
      • count接口
    • 高级接口
      • 1. 插入-更新回调
      • 2. 移除回调
      • 3. 迭代回调
    • 基准测试
    • 参考
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档