上一章我们学到,互斥锁是一个很有用的同步工具,它可以保证每一时刻进入临界区的goroutine只有一个。通过对互斥锁的合理使用,我们可以使一个goroutine在执行临界区中的代码时,不被其他的goroutine打扰,但是它仍然可能会被中断(interruption)。
那什么是原子操作呢?我们已经知道,原子操作即是进行过程中不能被中断的操作。也就是说,针对某个值的原子操作在被进行的过程当中,CPU绝不会再去进行其它的针对该值的操作。为了实现这样的严谨性,原子操由 CPU 提供芯片级别的支持,所以绝对有效,即使在拥有多 CPU 核心,或者多 CPU 的计算机系统中,原子操作的保证也是不可撼动的。这使得原子操作可以完全地消除竞态条件,并能够绝对地保证并发安全性,它的执行速度要比其他的同步工具快得多,通常会高出好几个数量级。
不过它的缺点也很明显,正因为原子操作不能被中断,所以它需要足够简单,并且要求快速。你可以想象一下,如果原子操作迟迟不能完成,而它又不会被中断,那么将会给计算机执行指令的效率带来多么大的影响,所以操作系统层面只对针对二进制位或整数的原子操作提供了支持。
因此,我们可以结合实际情况,来判断是否可以将锁替换成原子操作。
func AddInt32(addr *int32, delta int32) (new int32)func AddInt64(addr *int64, delta int64) (new int64)func AddUint32(addr *uint32, delta uint32) (new uint32)func AddUint64(addr *uint64, delta uint64) (new uint64)func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
var opts int64 = 0for i := 0; i < 50; i++ {// 注意第一个参数必须是地址atomic.AddInt64(&opts, 3) //加操作//atomic.AddInt64(&opts, -1) 减操作time.Sleep(time.Millisecond)}time.Sleep(time.Second)fmt.Println("opts: ", atomic.LoadInt64(&opts))
^uint32(-N-1))func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
var value int64func atomicAddOp(tmp int64) {for {oldValue := valueif atomic.CompareAndSwapInt64(&value, oldValue, oldValue+tmp) {return}}}
for {if atomic.CompareAndSwapInt32(&num2, 10, 0) {fmt.Println("The second number has gone to zero.")break}time.Sleep(time.Millisecond * 500)}
func LoadInt32(addr *int32) (val int32)func LoadInt64(addr *int64) (val int64)func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)func LoadUint32(addr *uint32) (val uint32)func LoadUint64(addr *uint64) (val uint64)func LoadUintptr(addr *uintptr) (val uintptr)
10.2.4 存储
func StoreInt32(addr *int32, val int32)func StoreInt64(addr *int64, val int64)func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)func StoreUint32(addr *uint32, val uint32)func StoreUint64(addr *uint64, val uint64)func StoreUintptr(addr *uintptr, val uintptr)
我们不能把nil作为参数值传入原子值的Store方法,否则就会引发一个panic。这里要注意,如果有一个接口类型的变量,它的动态值是nil,但动态类型却不是nil,那么它的值就不等于nil,这样一个变量的值是可以被存入原子值,这块知识可以在接口这一章中查看。
我们向原子值存储的第一个值,决定了它今后能且只能存储哪一个类型的值。例如,我第一次向一个原子值存储了一个string类型的值,那我在后面就只能用该原子值来存储字符串了。如果我又想用它存储结构体,那么在调用它的Store方法的时候就会引发一个panic,这个panic会告诉我,这次存储的值的类型与之前的不一致。
func SwapInt32(addr *int32, new int32) (old int32)func SwapInt64(addr *int64, new int64) (old int64)func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)func SwapUint32(addr *uint32, new uint32) (old uint32)func SwapUint64(addr *uint64, new uint64) (old uint64)func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
不要把内部使用的原子值暴露给外界。比如,声明一个全局的原子变量并不是一个正确的做法,这个变量的访问权限最起码也应该是包级私有的。
如果不得不让包外,或模块外的代码使用你的原子值,那么可以声明一个包级私有的原子变量,然后再通过一个或多个公开的函数,让外界间接地使用到它。注意,这种情况下不要把原子值传递到外界,不论是传递原子值本身还是它的指针值。
如果通过某个函数可以向内部的原子值存储值的话,那么就应该在这个函数中先判断被存储值类型的合法性。若不合法,则应该直接返回对应的错误值,从而避免 panic 的发生。
如果可能的话,我们可以把原子值封装到一个数据类型中,比如一个结构体类型。这样,我们既可以通过该类型的方法更加安全地存储值,又可以在该类型中包含可存储值的合法类型信息。
var box6 atomic.Valuev6 := []int{1, 2, 3}box6.Store(v6)v6[1] = 4 // 注意,此处的操作不是并发安全的!
store := func(v []int) {replica := make([]int, len(v))copy(replica, v)box6.Store(replica)}store(v6)v6[2] = 5 // 此处的操作是安全的。
// 环形队列type RingBuffer struct {err errorcount int32size int32head int32tail int32buf []unsafe.Pointer}// Get方法从buf中取出对象func (r *RingBuffer) Get() interface{} {// 在高并发开始的时候,队列容易空,直接判断空性能最优if atomic.LoadInt32(&r.count) <= 0 {return nil}// 当扣减数量后没有超,就从队列里取出对象if atomic.AddInt32(&r.count, -1) >= 0 {idx := (atomic.AddInt32(&r.head, 1) - 1) % r.sizeif obj := atomic.LoadPointer(&r.buf[idx]); obj != unsafe.Pointer(nil) {o := *(*interface{})(obj)atomic.StorePointer(&r.buf[idx], nil)return o}} else {// 当减数量超了,再加回去atomic.AddInt32(&r.count, 1)}return nil}// Put方法将对象放回到buf中。如果buf满了,返回falsefunc (r *RingBuffer) Put(obj interface{}) bool {// 在高并发结束的时候,队列容易满,直接判满性能最优if atomic.LoadInt32(&r.count) >= r.size {return false}// 当增加数量后没有超,就将对象放到队列里if atomic.AddInt32(&r.count, 1) <= r.size {idx := (atomic.AddInt32(&r.tail, 1) - 1) % r.sizeatomic.StorePointer(&r.buf[idx], unsafe.Pointer(&obj))return true}// 当加的数量超了,再减回去atomic.AddInt32(&r.count, -1)return false}