Skip to main content

原子操作

一个程序开启运行直到运行完毕后再进行下一个程序的执行的操作称为原子操作(Atomic)。

在Go语言中,多个Goroutine的运行针对同一个变量的操作是竞争态关系(Data Race/Race Status)。

竞争态关系会使得变量的结果变得不可预测,所以在某个Goroutine操作一个变量的时候,需要一种锁机制让系统禁止多个Goroutine操作同一变量,如果不禁止,Goroutine的运行会使得程序执行变得不安全。

互斥锁

Go语言中提供了Mutex的互斥锁。

考虑如下示例:

package main

import (
"fmt"
"sync"
)

var a = 0
var wg sync.WaitGroup
var lk sync.Mutex

func main() {
wg.Add(2)

go plus()
go minus()

wg.Wait()
}

func plus() {
defer wg.Done()
for i := 0; i < 10; i++ {
// 加锁
lk.Lock()
a++
// 解锁
lk.Unlock()
fmt.Println("plus:", a)
}
}

func minus() {
defer wg.Done()
for i := 0; i < 10; i++ {
// 加锁
lk.Lock()
a--
// 解锁
lk.Unlock()
fmt.Println("minus:", a)
}
}

在执行过程中不对a进行加锁,会导致a的值变得不可预测。在加锁之后,虽然输出的顺序不确定,但总能保证同一时间只有一个Goroutine对a进行操作,使得最终的结果可预测。

需要解决竞争态的多个Goroutine才使用同一把锁,因为Mutex会产生Goroutine运行的阻塞,所以如果没有竞争态或不需要解决竞争关系的多个Goroutine不要使用同一把锁。

Atomic

Go语言中提供了Atomic的原子操作,主要针对数字。

该原子操作的Add方法操作的是指针对应的空间,直接修改值。

package main

import (
"fmt"
"sync"
"sync/atomic"
)

var a int32 = 0
var wg sync.WaitGroup

func main() {
wg.Add(2)
go plus()
go minus()
wg.Wait()
}

func plus() {
defer wg.Done()
for i := 0; i < 10; i++ {
atomic.AddInt32(&a, 1)
}
}

func minus() {
defer wg.Done()
for i := 0; i < 10; i++ {
atomic.AddInt32(&a, -1)
}
}

Mutex

Mutex是一个结构体,定义如下:

type Mutex struct {
state int32
sema uint32
}

state

state表示互斥锁的状态:

  1. Locked 锁状态
  2. Woken 唤醒状态
  3. Starving 饥饿状态

WaiterShift记录值:

state中还存在一个记录值,表示等待队列的记录,即有多少携程正在等待。

sema

sema表示信号量(信号锁),表示最大并发数量,初始值为0

sema是一个uint32类型的值,对应一个结构体,定义如下:

type semaRoot struct {
lock mutex
treap *sudog
nwait atomic.Uint32
}
  1. lock 表示对应的锁对象。
  2. treap 一个平衡树(二叉树),正在等待运行的Goroutine。
  3. nwait 表示等待的Goroutine数量。

sema的机制

  1. 当一个Goroutine获取到锁时,sema的值会减1
  2. 当一个Goroutine释放锁时,sema的值会加1
  3. sema0的时候,携程系统就会进入到休眠状态,Goroutine会进入到等待队列中。
  4. 只要有一个Goroutine释放了锁,sema就会有一个并发量,那么系统就会从等待队列里面唤醒一个Goroutine。

底层原理

锁竞争机制

  1. 初始状态下,LOCkEDWOKENSTARVING以及WaiterShift都是0
  2. 当有多个G(如上的G1和G2)竞争锁时,锁最终只可能同时被一个G(如上的G1)拿到,此时LOCKED变为1,G1执行相关任务。
  3. G2在没有竞争到锁的情况下,会自旋拿锁,如果在一段时间内仍然没有拿到锁,那么G2会休眠,WaiterShift1,G2最终进入等待队列treap挂起。
  4. 在G2被挂起的过程中,会检测sema的值,如果sema不为0,那么G2会继续自旋拿锁,否则G2才会真正进入等待队列挂起。
  5. 当有新的G(如上的G3)进入时,在锁被占用的情况下,也会往复G2拿锁的过程。
  6. 如果锁被解开了,系统会从treap中唤醒拿出一个G,WaiterShift1,在sema大于0的情况下,此时该G会拿到锁并运行任务。
  7. 如果一个G在多次被唤醒并且多次没有拿到锁,当其等待拿锁的时间超过10ms,那么会记录STARVING状态。
  8. 如果有STARVING状态的记录,当有新的G进入时,会直接进入等待队列挂起,以减少与唤醒的G的竞争,增加了唤醒的G获得锁的概率。
  9. STARVING状态下,当等待队列的G被清空(全部唤醒执行完成)时,会退出STARVING模式。

RWMutex

RWMutex是一种读写锁,比Mutex更严格。

  1. 读锁时不限制读取,但不能写入。
  2. 写锁时只允许一个写入,不允许读取。

读与写操作时,读一般并发比较大,而写的并发比较小,读写如果没有限制的话,会造成数据结果不可预料,应用性能降低。所以读可并发,而写需要被阻塞,写的时候,限制读取。

RWMutex主要有四个方法:

  • lock.RLock() 读取加锁
  • lock.RUnlock() 读取解锁
  • lock.Lock() 写入解锁
  • lock.Unlock() 写入解锁
package main

import (
"fmt"
"sync"
"time"
)

var lock sync.RWMutex

func main() {
data := 0

go func() {
defer lock.RUnlock()
lock.RLock()
fmt.Println(data)
}()

go func() {
defer lock.Unlock()
lock.Lock()
data += 10
fmt.Println("已写入")
}()

go func() {
defer lock.RUnlock()
lock.RLock()
fmt.Println(data)
}()

time.Sleep(2 * time.Second)
}