前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【并发编程】atomic 如何保证原子操作?分别用那几个方法?

【并发编程】atomic 如何保证原子操作?分别用那几个方法?

作者头像
了凡银河系
发布2022-08-22 13:48:15
9790
发布2022-08-22 13:48:15
举报
文章被收录于专栏:了凡的专栏了凡的专栏

前言

之前学习了一些并发原语,已经认为差不多可以应对很多场景了,但是为什么还要学习原子操作呢?原来,在一些场景中,使用并发原语可能更加复杂,为了更轻松地实现底层的优化。

原子操作是什么?

一个原子在执行的时候,其他线程不会看到执行一半的操作结果。其它线程看来,原子操作要么执行完了,要么还没有执行,就像一个最小的粒子-原子一样,不可分割。

CPU提供了基础的原子操作,不过,不同架构的系统的原子操作是不一样的。

对于单处理器单核系统来说,如果一个操作是由一个CPU指令来实现的,那么它就是原子操作,比如它的XCHG和INC等指令。如果操作是基于多条指令来实现的,那么,执行的过程中可能会被中断,并执行上下文切换,这样的话,原子性的保证就被打破了,因为这个时候,操作可能只执行了一半。

在多处理器多核系统中,原子操作的实现就比较复杂了。

由于cache的存在,单个核上的单个指令进行原子操作的时候,要确保其它处理器或者核不访问此原子操作的地址,或者是确保其他处理器或者核总是访问原子操作之后的最新的值。X86架构中提供了指令前缀LOCK,LOCK保证了指令(比如LOCK CMPXCHG op1、op2)不会受其它处理器或CPU核的影响,有些指令(比如XCHG)本身就提供Lock的机制。不同的CPU架构提供的原子操作指令的方式也是不同的,比如对于多核的MIPS和ARM,提供了LL/SC(Load Link/Store Cond)指令,可以帮助实现原子操作(ARMLL/SC指令LDREX和STREX)。

因为不同的CPU框架甚至不同的版本提供的原子操作的指令是不同的,所以,要用一种编程语言实现支持不同架构的原子操作是相当有难度的。但是有个好消息就是Go提供了一个通用的原子操作的API,将更底层的不同的架构下的实现封装成atomic包,提供了修改类型的原子操作(RMW)和加载存储类型的原子操作的API。

有的代码也会因为框架的不同而不同。例如:

代码语言:javascript
复制
const x int64 = 1 + 1<<33

func main(){
    var i = x
    _ = i
}

所以,如果想要保证原子操作,一定要使用atomic提供的方法。

atomic原子操作的应用场景

使用atomic的一些方法,可以实现更底层的一些优化。如果使用Mutext等并发原语进行这些优化,虽然可以解决问题,但是这些并发原语的实现逻辑比较复杂,对性能还是有一定的影响的。

假如你想在程序中使用一个标志(flag,比如一个bool变型的变量),来标识一个定时任务是否已经启动执行了,你会怎么做呢?

先看看加锁的方法。如果使用Mutext和RWMutex,在读取和设置这个标志的时候加锁,是可以做到互斥的、保证同一时刻只有一个定时任务在执行的,所以使用Mutex或者RWMutex是一种解决方案。

这个场景中的问题不涉及到对资源复杂的竞争逻辑,只是会并发地读写这个标志,这类场景就适合使用atomic的原子操作。具体怎么做呢?可以使用一个uint32类型的变量,如果这个变量的值是0,就标识没有任务在执行,如果它的值是1,就标识已经有任务在完成了。

例如,另外一个,假设你在开发应用程序的时候,需要从配置服务器中读取一个节点的配置信息。而且,在这个节点的配置发生变更的时候,你需要重新从配置服务器中拉取一份新的配置并更新。程序中可能有多个goroutine都依赖这个配置,涉及到对这个配置对象的并发读写,可以使用读写锁实现对配置对象的保护。大多数情况,也可以利用atomic实现配置对象的更新和加载。

有时候,也可以使用atomic实现自己定义的基本并发原语,比如CondMutex、Mutex.LockContext、WaitGroup.Go等,可以使用atomic或者基于它的更高一级的并发原语去实现。之前很多的并发原语例如Mutex,就是通过atomic的方法实现的。

atomic提供的方法

atomic操作的对象是一个地址,你需要把可寻址的变量的地址作为参数传递给方法,而不是变量的值传递给方法。

Add

Add方法的签名:

代码语言:javascript
复制
func AddInt32(addr *int32, delta int32) (new int32)

// AddUint32 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint32(&x, ^uint32(c-1)).
// In particular, to decrement x, do AddUint32(&x, ^uint32(0)).
func AddUint32(addr *uint32, delta uint32) (new uint32)

// AddInt64 atomically adds delta to *addr and returns the new value.
func AddInt64(addr *int64, delta int64) (new int64)

// AddUint64 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint64(&x, ^uint64(c-1)).
// In particular, to decrement x, do AddUint64(&x, ^uint64(0)).
func AddUint64(addr *uint64, delta uint64) (new uint64)

// AddUintptr atomically adds delta to *addr and returns the new value.
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

其实,Add方法就是给第一个参数地址中的值增加一个delta值。

对于有符号的整数来说,delta可以是一个负数,相当于减去一个值。对于无符号的整数和uinptr类型来说,怎么实现减去一个值呢?毕竟,atomic并没有提供单独的减法操作。

可以利用计算机补码的规则,把减法变成加法。以uint32类型为例:

代码语言:javascript
复制
Adduint32(&x, ^uint32(c-1))

如果要uint64的值操进行操作,uint32换成uint64即可。

尤其是减1这种特殊的操作,可以简化为:

代码语言:javascript
复制
AddUint32(&x,^uint32(0))

CAS

在CAS的方法签名中,需要提供操作的地址、元数据值、新值,如下所示:

代码语言:javascript
复制
// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

// CompareAndSwapInt64 executes the compare-and-swap operation for an int64 value.
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

这个方法会比较当前addr地址里的值是不是old,如果不等于old,就返回false;如果等于old,就把此地址的值替换成new值,返回true。这就相当于“判断相等才替换”。

如果使用伪代码来表示这个原子操作

代码语言:javascript
复制
if  *addr == old {
    *addr = new
    return true
}
return false

它支持的类型和方法如图所示:

代码语言:javascript
复制
// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

// CompareAndSwapInt64 executes the compare-and-swap operation for an int64 value.
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

// CompareAndSwapUint32 executes the compare-and-swap operation for a uint32 value.
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)

// CompareAndSwapUint64 executes the compare-and-swap operation for a uint64 value.
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)

// CompareAndSwapUintptr executes the compare-and-swap operation for a uintptr value.
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

// CompareAndSwapPointer executes the compare-and-swap operation for a unsafe.Pointer value.
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

Swap

如果不需要比较旧值,只是比较粗暴地替换的话,就可以使用Swap方法,它替换后还可以返回旧值。

代码语言:javascript
复制
old = *addr
*addr = new
return old

它支持的数据类型和方法如图所示:

代码语言:javascript
复制
// SwapInt32 atomically stores new into *addr and returns the previous *addr value.
func SwapInt32(addr *int32, new int32) (old int32)

// SwapInt64 atomically stores new into *addr and returns the previous *addr value.
func SwapInt64(addr *int64, new int64) (old int64)

// SwapUint32 atomically stores new into *addr and returns the previous *addr value.
func SwapUint32(addr *uint32, new uint32) (old uint32)

// SwapUint64 atomically stores new into *addr and returns the previous *addr value.
func SwapUint64(addr *uint64, new uint64) (old uint64)

// SwapUintptr atomically stores new into *addr and returns the previous *addr value.
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

// SwapPointer atomically stores new into *addr and returns the previous *addr value.
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

Load

Load方法会取出addr地址中的值,即使在多处理器、多核、有CPU cache的情况下,这个操作也能保证Load是一个原子操作。

它支持的数据类型和方法如图所示:

代码语言:javascript
复制
// LoadInt32 atomically loads *addr.
func LoadInt32(addr *int32) (val int32)

// LoadInt64 atomically loads *addr.
func LoadInt64(addr *int64) (val int64)

// LoadUint32 atomically loads *addr.
func LoadUint32(addr *uint32) (val uint32)

// LoadUint64 atomically loads *addr.
func LoadUint64(addr *uint64) (val uint64)

// LoadUintptr atomically loads *addr.
func LoadUintptr(addr *uintptr) (val uintptr)

// LoadPointer atomically loads *addr.
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

Store

Store方法会把一个值存入到指定的addr地址中,即使在多处理器、多核、有CPU cache的情况下,这个操作也能保证Store是一个原子操作。别的goroutine通过Load读取出来,不会看到存取了一半的值。支持的数据类型和方法如下:

代码语言:javascript
复制
// StoreInt32 atomically stores val into *addr.
func StoreInt32(addr *int32, val int32)

// StoreInt64 atomically stores val into *addr.
func StoreInt64(addr *int64, val int64)

// StoreUint32 atomically stores val into *addr.
func StoreUint32(addr *uint32, val uint32)

// StoreUint64 atomically stores val into *addr.
func StoreUint64(addr *uint64, val uint64)

// StoreUintptr atomically stores val into *addr.
func StoreUintptr(addr *uintptr, val uintptr)

// StorePointer atomically stores val into *addr.
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

Value

以上都是比较常见的类型,但是atomic还提供了一个特殊的类型:Value。它可以原子地址取对象类型,但也只能存取,不能CAS和Swap,常常用在配置变更等场景中。

代码语言:javascript
复制
type Value
    func(v *Value)Load() (x interface{})
    func(v *Value)Store(x interface{})

定义一个Value类型的变量config,用来存储配置信息。

首先,启动一个goroutine,然后随机sleep一段时间,之后变更一下配置,并通过Cond并发原语,通知其t它的reader去加载新的配置。

然后启动一个goroutine等待配置变更的信号,一旦有变更,它就会加载最新的配置。

通过这个例子,可以基本了解Value的Store/Load方法的使用。

代码语言:javascript
复制
import (
   "fmt"
   "math/rand"
   "sync"
   "sync/atomic"
   "time"
)

type Config struct {
   NodeName string
   Addr     string
   Count    int32
}

func loadNewConfig() Config {
   return Config{
      NodeName: "河南",
      Addr:     "120.219.67.23",
      Count:    rand.Int31n(25),
   }
}

func main() {
   var config atomic.Value
   config.Store(loadNewConfig())
   var cond = sync.NewCond(&sync.Mutex{})

   // 设置新的config
   go func() {
      for {
         time.Sleep(time.Duration(5+rand.Int63n(5)) * time.Second)
         config.Store(loadNewConfig())
         cond.Broadcast() // 通知等待着配置已变更
      }
   }()

   go func() {
      for {
         cond.L.Lock()
         cond.Wait()                 // 等待变更信号
         c := config.Load().(Config) // 读取新的配置
         fmt.Printf("new config: %+v\n", c)
         cond.L.Unlock()
      }
   }()

   select {}
}

到这里基本就将atomic提供的方法了解完了。我认为多去看源码是一个很不错的习惯。

总结

到这里如果你学习操作系统计算机专业课程,可能有更深的理解以及使用方式,目前应该对于atomic提供的各种方法你已经有所了解了。

到这里看到晁岳攀老师还提了一个问题转给大家讨论一下:对一个地址的赋值是原子操作吗?

CPU以及编译器,write的地址总是4的倍数,64位的系统总是8的倍数(还记得WaitGroup针对64位系统和32位系统对state1的字段不同的处理吗)。对齐地址的写,不会导致其他人看到只写了一半的数据,因为它通过一个指令就可以实现对地址的操作。如果地址不是对齐的话,那么,处理器就需要分成两个指令去处理如果执行了一个指令,其它人就会看到更新了一半的错误的数据,这被称做撕裂写(torn write)。所以,你可以认为赋值操作是一个原子操作,这个“原子操作”可以认为是保证数据的完整性

但是,对于现代的多处理多核的系统来说,由于cache、指令重排,可见性等问题,对原子操作的意义有了更多的追求。在多核系统中,一个核对地址的值的更改,在更新到主内存中之前,是在多级缓存中存放的。这时,多个核看到的数据可能是不一样的,其它的核可能还没有看到更新的数据,还在使用旧的数据。

多处理器多核心系统为了处理这类问题,使用了一种叫做内存屏障(memory fence或 memory barrier)的方式。一个写内存屏障会告诉处理器,必须要等到它管道中的未完成的操作(特别是写操作)都被刷新到内存中,再进行操作。此操作还会让相关的处理器的CPU缓存失效,以便让它们从主存中拉取最新的值。

atomic包提供的方法会提供内存屏障的功能,所以,atomic不仅仅可以保证赋值的数据完整性,还能保证数据的可见性,一旦一个核更新了该地址的值,其它处理器总是能读取到它的最新值。但是,需要注意的是,因为需要处理器之间保证数据的一致性,atomic 的操作也是会降低性能的。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-01-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 了凡银河系 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 原子操作是什么?
  • atomic原子操作的应用场景
  • atomic提供的方法
    • Add
      • CAS
        • Swap
          • Load
            • Store
              • Value
              • 总结
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档