首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Go 语言中 sync 包的近距离观察

让我们来看看负责提供同步原语的 Go 包:sync。

sync.Mutex

sync.Mutex可能是sync包中被广泛使用的原语。它允许对共享资源进行互斥操作(即不允许同时访问):

mutex := &sync.Mutex{}

mutex.Lock()

// Update shared variable (e.g. slice, pointer on a structure, etc.)

mutex.Unlock()

必须指出的是sync.Mutex无法被复制(就像sync包中的所有其他原语一样)。如果一个结构体有一个sync字段,必须通过指针进行传递

sync.RWMutex

sync.RWMutex是一个读写锁。它提供了与我们刚刚看到的Lock()和Unlock()相同的方法(因为这两个结构都实现了sync.Locker接口)。然而,它还允许使用RLock()和RUnlock()方法进行并发读取:

mutex := &sync.RWMutex{}

mutex.Lock()

// Update shared variable

mutex.Unlock()

mutex.RLock()

// Read shared variable

mutex.RUnlock()

一个sync.RWMutex允许至少一个读取者正好一个写入者,而一个sync.Mutex则允许正好一个读取者或写入者。

让我们运行一个快速的基准测试来比较这些方法:

func BenchmarkMutexLock(b *testing.B) {

m := sync.Mutex{}

for i := 0; i 

m.Lock()

m.Unlock()

}

}

func BenchmarkRWMutexLock(b *testing.B) {

m := sync.RWMutex{}

for i := 0; i 

m.Lock()

m.Unlock()

}

}

func BenchmarkRWMutexRLock(b *testing.B) {

m := sync.RWMutex{}

for i := 0; i 

m.RLock()

m.RUnlock()

}

}

BenchmarkMutexLock-4       83497579         17.7 ns/op

BenchmarkRWMutexLock-4     35286374         44.3 ns/op

BenchmarkRWMutexRLock-4    89403342         15.3 ns/op

正如我们注意到的那样,读取锁定/解锁sync.RWMutex比锁定/解锁sync.Mutex更快。另一方面,调用Lock()/Unlock()在sync.RWMutex上是最慢的操作。

总的来说,当我们有频繁的读取和不经常的写入时,应该使用sync.RWMutex。

sync.WaitGroup

sync.WaitGroup也经常被使用。它是一个 goroutine 等待一组 goroutine 完成的惯用方式。

sync.WaitGroup拥有一个内部计数器。如果这个计数器等于 0,Wait()方法会立即返回。否则,它会被阻塞,直到计数器变为 0。

要增加计数器,我们可以使用Add(int)方法。要减少计数器,可以使用Done()(将计数器减 1)或者使用带有负值的相同的Add(int)方法。

在以下示例中,我们将启动八个 goroutine 并等待它们完成:

wg := &sync.WaitGroup{}

for i := 0; i 

wg.Add(1)

go func() {

// Do something

wg.Done()

}()

}

wg.Wait()

// Continue execution

每次我们创建一个 goroutine 时,都会使用wg.Add(1)来增加wg的内部计数器。我们也可以在 for 循环外部调用wg.Add(8)。

与此同时,每当一个 goroutine 完成时,它会使用wg.Done()来减少wg的内部计数器。

一旦执行了八个wg.Done()语句,主 goroutine 就会继续执行。

sync.Map

sync.Map是 Go 中的一个并发版本的map,我们可以:

• 使用Store(interface{}, interface{})添加元素

• 使用Load(interface) interface{}检索元素

• 使用Delete(interface{})删除元素

• 使用LoadOrStore(interface{}, interface{}) (interface, bool)检索或添加元素(如果之前不存在)。返回的 bool 值为 true 表示在操作前键存在于 map 中。

• 使用Range在元素上进行迭代

m := &sync.Map{}

// Put elements

m.Store(1, "one")

m.Store(2, "two")

// Get element 1

value, contains := m.Load(1)

if contains {

fmt.Printf("%s\n", value.(string))

}

// Returns the existing value if present, otherwise stores it

value, loaded := m.LoadOrStore(3, "three")

if !loaded {

fmt.Printf("%s\n", value.(string))

}

// Delete element 3

m.Delete(3)

// Iterate over all the elements

m.Range(func(key, value interface{}) bool {

fmt.Printf("%d: %s\n", key.(int), value.(string))

return true

})

Go 在线测试: https://play.golang.org/p/BO8IDVIDwsr

one

three

1: one

2: two

正如你所看到的,Range方法接受一个func(key, value interface{}) bool函数作为参数。如果我们返回 false,则迭代会停止。有趣的是,即使我们在恒定时间之后返回 false(更多信息),最坏情况下的时间复杂度仍然保持为 O(n)。

何时应该使用sync.Map而不是在经典的map上加sync.Mutex呢?

• 当我们有频繁读取和不经常写入时(与sync.RWMutex类似)

• 当多个 goroutine 为不相交的键集合读取、写入和覆盖条目。这具体意味着什么?例如,如果我们有一个分片实现,有 4 个 goroutine 每个负责 25% 的键(没有冲突)。在这种情况下,sync.Map也是首选。

sync.Pool

sync.Pool是一个并发池,负责安全地保存一组对象

其公共方法包括:

•Get() interface{}用于检索一个元素

•Put(interface{})用于添加一个元素

pool := &sync.Pool{}

pool.Put(NewConnection(1))

pool.Put(NewConnection(2))

pool.Put(NewConnection(3))

connection := pool.Get().(*Connection)

fmt.Printf("%d\n", connection.id)

connection = pool.Get().(*Connection)

fmt.Printf("%d\n", connection.id)

connection = pool.Get().(*Connection)

fmt.Printf("%d\n", connection.id)

1

3

2

值得注意的是,就顺序而言是没有保证的。Get方法指定它从池中获取一个任意的项目。

也可以指定一个创建方法:

pool := &sync.Pool{

New: func() interface{} {

return NewConnection()

},

}

connection := pool.Get().(*Connection)

每次调用Get()时,它将返回由传递给pool.New的函数创建的对象(在本例中是一个指针)。

何时应该使用sync.Pool呢?有两种情况:

第一种情况是当我们需要重用共享且长期存在的对象时,比如一个数据库连接。

第二种情况是优化内存分配

让我们考虑一个函数的示例,该函数将数据写入缓冲区并将结果持久化到文件中。使用sync.Pool,我们可以重复使用分配给缓冲区的空间,跨不同的函数调用重复使用同一个对象。

第一步是检索先前分配的缓冲区(或者如果是第一次调用,则创建一个,但这已经被抽象化了)。然后,延迟操作是将缓冲区放回池中。

func writeFile(pool *sync.Pool, filename string) error {

// Gets a buffer object

buf := pool.Get().(*bytes.Buffer)

// Returns the buffer into the pool

defer pool.Put(buf)

// Reset buffer otherwise it will contain "foo" during the first call

// Then "foofoo" etc.

buf.Reset()

buf.WriteString("foo")

return ioutil.WriteFile(filename, buf.Bytes(), 0644)

}

sync.Pool还有一个要提到的重要点。由于指针可以被放入Get()返回的接口值中,无需进行任何分配,因此最好将指针放入池中而不是结构体。

这样,我们既可以有效地重用已分配的内存,又可以减轻垃圾收集器的压力,因为如果变量逃逸到堆上,它就不需要再次分配内存。

sync.Once

sync.Once是一个简单而强大的原语,用于确保一个函数只被执行一次

在这个例子中,将只有一个 goroutine 显示输出消息:

once := &sync.Once{}

for i := 0; i 

i := i

go func() {

once.Do(func() {

fmt.Printf("first %d\n", i)

})

}()

}

我们使用了Do(func())方法来指定只有这部分代码必须被执行一次。

sync.Cond

让我们以最可能最少使用的原语sync.Cond结束。

它用于向 goroutine 发出信号(一对一)或向 goroutine(s) 广播信号(一对多)。

假设我们有一个场景,需要通知一个 goroutine 共享切片的第一个元素已被更新。

创建一个sync.Cond需要一个sync.Locker对象(可以是sync.Mutex或sync.RWMutex):

cond := sync.NewCond(&sync.RWMutex{})

接下来,让我们编写一个函数来显示切片的第一个元素:

func printFirstElement(s []int, cond *sync.Cond) {

cond.L.Lock()

cond.Wait()

fmt.Printf("%d\n", s[0])

cond.L.Unlock()

}

正如你所看到的,我们可以使用cond.L来访问内部互斥锁。一旦锁被获取,我们调用cond.Wait(),它会阻塞直到收到信号。

现在回到主 goroutine。我们将通过传递一个共享切片和之前创建的sync.Cond来创建一个printFirstElement池。然后,我们调用一个get()函数,将结果存储在s[0]中并发出一个信号:

s := make([]int, 1)

for i := 0; i 

go printFirstElement(s, cond)

}

i := get()

cond.L.Lock()

s[0] = i

cond.Signal()

cond.L.Unlock()

这个信号将解除一个创建的 goroutine 的阻塞状态,它将显示s[0]。

然而,如果我们退一步来看,我们可能会认为我们的代码可能违反了 Go 最基本的原则之一:

不要通过共享内存来通信;相反,通过通信来共享内存。

事实上,在这个例子中,最好使用一个通道来传递get()返回的值。

然而,我们也提到了sync.Cond还可以用于广播信号

让我们修改上一个示例的结尾,将Signal()改为Broadcast():

i := get()

cond.L.Lock()

s[0] = i

cond.Broadcast()

cond.L.Unlock()

在这种情况下,所有的 goroutine都会被触发。

众所周知,通道元素只会被一个 goroutine 捕获。唯一模拟广播的方式是关闭一个通道,但这不能重复使用。因此,尽管 颇具争议,这无疑是一个有趣的特性。

还有一个值得提及的sync.Cond使用场景,也许是最重要的一个:

示例的 Go Playground 地址:https://play.golang.org/p/ap5qXF5DAg5

  • 发表于:
  • 原文链接https://page.om.qq.com/page/O3FhEblz39b8GZjVUcn12elA0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券