Golang Mutex源码分析
Golang标准库中提供了互斥锁的原语来解决并发资源竞争,这篇文章探讨了标准库 中Mutex的实现原理
基础知识
信号量
信号量 是计算机科学家 Dijkstra 发明的数据结构,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。其本质是一个整数,有两个基本操作:
申请acquire(也称为 wait、decrement 或 P 操作):
将信号量减 1,如果结果值为负则挂起协程,直到其他线程进行了信号量累加为正数才能恢复。如结果为正数,则继续执行。
释放release(也称 signal、increment 或 V 操作):
将信号量加 1,如存在被挂起的协程,此时唤醒他们中的一个协程。
CAS 操作
CAS操作是CPU指令提供的一个原子操作, 其全程为 Compare And Swap,在Go标准库 sync/atomic 中实现了这个方法:
1// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
2func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
这个函数会首先比较指针addr指向的值是否和old是否相等,如果相等则将addr指向的值替换为new,并返回true,
否则不做任何操作,并返回false。
Mutex的第一次提交
在2008年 Russ Cox 提交了第一版的Mutex实现[1],
当时的实现比较简单,我们先从这一版开始了解Mutex的底层原理,由于当时Go还未发布1.0版本,也没有sync/atomic包,与现在的实现有很大的区别,但这并不影响我们理解其中的逻辑
Mutex的定义
1export type Mutex struct {
2 key int32; // 记录当前锁是否被某个goroutine持有
3 sema int32; // 信号量
4}
初版的Mutex定义非常简单,使用key记录当前锁是否被持有,sema记录当前信号量
请求锁的实现
1func (m *Mutex) Lock() {
2 if xadd(&m.key, 1) == 1 { // 将标记加1,判断是否有其他goroutine持有锁
3 // changed from 0 to 1; we hold lock
4 return; // 当前 goroutine 持有锁,直接返回
5 }
6 sys.semacquire(&m.sema); // 挂起当前goroutine,等待调度
7}
这个函数实现的很简单,在加锁时,首先将当前锁标记为已持有,如果当前锁没有被其他goroutine持有,则直接返回,
否则,挂起当前goroutine,等待信号量调度。
1func xadd(val *int32, delta int32) (new int32) {
2 for { // 不断自旋操作
3 v := *val;
4 if cas(val, v, v+delta) { // 判断是否被其他goroutine修改
5 return v+delta; // 返回新值
6 }
7 }
8 panic("unreached")
9}
xadd通过自旋CAS操作,将val的值加delta,就相当于现在Go语言的atomic.AddInt32
释放锁的实现
释放锁的实现也很简洁:
1func (m *Mutex) Unlock() {
2 if xadd(&m.key, -1) == 0 { // 将标记减1
3 // changed from 1 to 0; no contention
4 return; // 如果没有其他goroutine持有锁,直接返回
5 }
6 sys.semrelease(&m.sema); // 通过信号量唤醒被挂起的goroutine
7}
释放锁时,首先将当前标记减一,如果当前锁没有被其他goroutine持有,则直接返回,
否则,通过信号量通知运行时,调度被挂起的goroutine。
在这个版本,Mutex已经实现了基本的功能,但是这个版本有一个问题,所有goroutine会排队等待
运行时的调度,虽然这保证了公平性,所有的goroutine都会有机会参与调度,但是从性能的角度上看,
这会导致频繁的上下文切换,如果我们把锁直接交给新人(未挂起的goroutine),这样就可以避免上下文的切换,
于是Go 团队再 Go1.0 正式版时对Mutex进行了较大的调整。
加入唤醒机制
在Go 1.0版本[2]中,
Mutex的结构体字段进行了调整
1type Mutex struct {
2 state int32 // 复合数据,下文进行解释
3 sema uint32 // 信号量
4}
5
6const (
7 mutexLocked = 1 << iota // state的第一位,代表当前锁是否被持有
8 mutexWoken // state的第二位,唤醒标记
9 mutexWaiterShift = iota // 位移
10)
在这一版中,将Mutex的第一个字段由key改为state,其含义发生了很大的变化,state的第一位表示当前锁是否被持有,
相当于之前的key,state的第二位是唤醒标记,代表当前是否有唤醒的goroutine正在请求锁,剩下的30位用来表示等待中
的Waiter数量。
Go1.0 请求锁
在这一版中,代码相比第一版有很大变化,其主要优化是给新来的goroutine一些机会,让新goroutine能不参与休眠就获取锁
1func (m *Mutex) Lock() {
2 // Fast path: grab unlocked mutex.
3 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
4 return // 当前锁未被持有
5 }
6
7 awoke := false
8 for {
9 old := m.state
10 new := old | mutexLocked // 新状态加上锁
11 if old&mutexLocked != 0 {
12 new = old + 1<<mutexWaiterShift // 如果已经有goroutine持有锁,则等待的 Waiter + 1
13 }
14 if awoke {
15 // The goroutine has been woken from sleep,
16 // so we need to reset the flag in either case.
17 new &^= mutexWoken // 清空唤醒标记
18 }
19 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 置新状态
20 if old&mutexLocked == 0 {
21 break // 旧状态锁已释放,获取当前锁
22 }
23 runtime_Semacquire(&m.sema) // 请求信号量
24 awoke = true
25 }
26 }
27}
我们可以用下面这个状态图表示请求锁的过程
我们可以发现,在这版的Mutex中,给新来的goroutine会在加入等待队列前就去尝试获取锁,
如果失败则加入等待队列中
Go1.0 释放锁
同时在Go 1.0中,释放锁的代码也变得更加复杂了
1func (m *Mutex) Unlock() {
2 // Fast path: drop lock bit.
3 new := atomic.AddInt32(&m.state, -mutexLocked) // 去掉持有锁标记
4 if (new+mutexLocked)&mutexLocked == 0 {
5 panic("sync: unlock of unlocked mutex") // 重复Unlock时panic
6 }
7
8 old := new
9 for {
10 // 如果没有其他的waiter
11 // 或者已经有唤醒的goroutine
12 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
13 return
14 }
15 // 挑选一个waiter唤醒
16 new = (old - 1<<mutexWaiterShift) | mutexWoken // 打上唤醒标记
17 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 置新状态
18 runtime_Semrelease(&m.sema)
19 return
20 }
21 old = m.state
22 }
23}
- 在释放锁时,首先会将锁标记为未锁状态,如果当前锁已经是未锁状态,则会
panic(第5行) - 如果当前已经有唤醒的
goroutine或者没有等待中的waiter,我们就什么都不用做,其他goroutine会自己抢夺这把锁 - 如果没有唤醒的
goroutine,就从等待队列中唤醒一个goroutine
给新 Goroutine 更多机会
在 Go1.5 版本中,Go团队又对Mutex进行了一次优化[3]
在某些临界区,代码执行的速度很快,如果加入等待队列就会浪费很多时间,Go 团队针对这一情景对Mutex进行了优化
1func (m *Mutex) Lock() {
2 // Fast path: grab unlocked mutex.
3 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
4 if raceenabled { // race detector 相关
5 raceAcquire(unsafe.Pointer(m))
6 }
7 return
8 }
9
10 awoke := false
11 iter := 0 // 自旋次数
12 for {
13 old := m.state
14 new := old | mutexLocked
15 if old&mutexLocked != 0 { // 当前锁被持有
16 if runtime_canSpin(iter) { // 检测是否可以自旋
17 // 如果当前的没有其他唤醒的goroutine
18 // 尝试将当前goroutine置为唤醒状态
19 // 提醒 Unlock 不去唤醒其他goroutine
20 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
21 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
22 awoke = true // 设置当前唤醒状态
23 }
24 runtime_doSpin()
25 iter++
26 continue // 自旋再次请求锁
27 }
28 new = old + 1<<mutexWaiterShift // 加入等待队列
29 }
30 if awoke {
31 // The goroutine has been woken from sleep,
32 // so we need to reset the flag in either case.
33 if new&mutexWoken == 0 {
34 panic("sync: inconsistent mutex state")
35 }
36 new &^= mutexWoken
37 }
38 if atomic.CompareAndSwapInt32(&m.state, old, new) {
39 if old&mutexLocked == 0 {
40 break
41 }
42 runtime_Semacquire(&m.sema)
43 awoke = true
44 iter = 0 // 清空自旋计数
45 }
46 }
47
48 if raceenabled { // race detector 相关
49 raceAcquire(unsafe.Pointer(m))
50 }
51}
在某些临界区,代码执行速度很快,如果通过自旋几次就能获取到锁的所有权,就可以避免加入等待队列, 这样就可以减少上下文的切换,在某些情况下能很好的提高性能。
饥饿机制
进过几次优化后,Mutex的性能已经十分好了,但是由于自旋的存在,在特定情况下,有可能出现新人不断
地获取锁,而等待队列中的goroutine一直没有机会获取到锁,Go 团队针对这种情况加入了饥饿机制。
我们拿最新的 Go 1.15 中的源码进行分析
1const (
2 mutexLocked = 1 << iota // mutex is locked
3 mutexWoken // 唤醒标记
4 mutexStarving // 饥饿标记
5 mutexWaiterShift = iota // 偏移量
6
7 starvationThresholdNs = 1e6 // 1ms
8)
在这个版本中, state的第三位被用作饥饿标记
1func (m *Mutex) lockSlow() {
2 var waitStartTime int64
3 starving := false // 初始饥饿标记
4 awoke := false
5 iter := 0
6 old := m.state
7 for {
8 // Don't spin in starvation mode, ownership is handed off to waiters
9 // so we won't be able to acquire the mutex anyway.
10 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
11 // Active spinning makes sense.
12 // Try to set mutexWoken flag to inform Unlock
13 // to not wake other blocked goroutines.
14 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
15 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
16 awoke = true
17 }
18 runtime_doSpin()
19 iter++
20 old = m.state
21 continue
22 }
23 new := old
24 // Don't try to acquire starving mutex, new arriving goroutines must queue.
25 if old&mutexStarving == 0 {
26 new |= mutexLocked
27 }
28 if old&(mutexLocked|mutexStarving) != 0 {
29 new += 1 << mutexWaiterShift
30 }
31 // The current goroutine switches mutex to starvation mode.
32 // But if the mutex is currently unlocked, don't do the switch.
33 // Unlock expects that starving mutex has waiters, which will not
34 // be true in this case.
35 if starving && old&mutexLocked != 0 {
36 new |= mutexStarving
37 }
38 if awoke {
39 // The goroutine has been woken from sleep,
40 // so we need to reset the flag in either case.
41 if new&mutexWoken == 0 {
42 throw("sync: inconsistent mutex state")
43 }
44 new &^= mutexWoken
45 }
46 if atomic.CompareAndSwapInt32(&m.state, old, new) {
47 if old&(mutexLocked|mutexStarving) == 0 {
48 break // locked the mutex with CAS
49 }
50 // If we were already waiting before, queue at the front of the queue.
51 queueLifo := waitStartTime != 0
52 if waitStartTime == 0 {
53 waitStartTime = runtime_nanotime()
54 }
55 runtime_SemacquireMutex(&m.sema, queueLifo, 1)
56 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
57 old = m.state
58 if old&mutexStarving != 0 {
59 // If this goroutine was woken and mutex is in starvation mode,
60 // ownership was handed off to us but mutex is in somewhat
61 // inconsistent state: mutexLocked is not set and we are still
62 // accounted as waiter. Fix that.
63 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
64 throw("sync: inconsistent mutex state")
65 }
66 delta := int32(mutexLocked - 1<<mutexWaiterShift)
67 if !starving || old>>mutexWaiterShift == 1 {
68 // Exit starvation mode.
69 // Critical to do it here and consider wait time.
70 // Starvation mode is so inefficient, that two goroutines
71 // can go lock-step infinitely once they switch mutex
72 // to starvation mode.
73 delta -= mutexStarving
74 }
75 atomic.AddInt32(&m.state, delta)
76 break
77 }
78 awoke = true
79 iter = 0
80 } else {
81 old = m.state
82 }
83 }
84
85 if race.Enabled {
86 race.Acquire(unsafe.Pointer(m))
87 }
88}
(咕了,过几天再写