目录

Golang Mutex源码分析

Golang标准库中提供了互斥锁Mutex的原语来解决并发资源竞争,这篇文章探讨了标准库 中Mutex的实现原理

基础知识

信号量

信号量Semaphore 是计算机科学家 Dijkstra 发明的数据结构,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。其本质是一个整数,有两个基本操作:

  1. 申请acquire(也称为 wait、decrement 或 P 操作):

    将信号量减 1,如果结果值为负则挂起协程,直到其他线程进行了信号量累加为正数才能恢复。如结果为正数,则继续执行。

  2. 释放release(也称 signal、increment 或 V 操作):

    将信号量加 1,如存在被挂起的协程,此时唤醒他们中的一个协程。

CAS 操作

CAS操作是CPU指令提供的一个原子操作, 其全程为 Compare And Swap,在Go标准库 sync/atomic 中实现了这个方法:

1// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
2func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

这个函数会首先比较指针addr指向的值是否和old是否相等,如果相等则将addr指向的值替换为new,并返回true, 否则不做任何操作,并返回false

Mutex的第一次提交

在2008年 Russ Cox 提交了第一版的Mutex实现[1], 当时的实现比较简单,我们先从这一版开始了解Mutex的底层原理,由于当时Go还未发布1.0版本,也没有sync/atomic包,与现在的实现有很大的区别,但这并不影响我们理解其中的逻辑

Mutex的定义

1export type Mutex struct {
2    key int32; // 记录当前锁是否被某个goroutine持有
3    sema int32; // 信号量
4}

初版的Mutex定义非常简单,使用key记录当前锁是否被持有,sema记录当前信号量

请求锁的实现

1func (m *Mutex) Lock() {
2    if xadd(&m.key, 1) == 1 { // 将标记加1,判断是否有其他goroutine持有锁
3        // changed from 0 to 1; we hold lock
4        return; // 当前 goroutine 持有锁,直接返回
5    }
6    sys.semacquire(&m.sema); // 挂起当前goroutine,等待调度
7}

这个函数实现的很简单,在加锁时,首先将当前锁标记为已持有,如果当前锁没有被其他goroutine持有,则直接返回, 否则,挂起当前goroutine,等待信号量调度。

xadd函数
1func xadd(val *int32, delta int32) (new int32) {
2    for { // 不断自旋操作
3        v := *val;
4        if cas(val, v, v+delta) { // 判断是否被其他goroutine修改
5            return v+delta; // 返回新值
6        }
7    }
8    panic("unreached")
9}

xadd通过自旋CAS操作,将val的值加delta,就相当于现在Go语言的atomic.AddInt32

释放锁的实现

释放锁的实现也很简洁:

1func (m *Mutex) Unlock() {
2    if xadd(&m.key, -1) == 0 { // 将标记减1
3        // changed from 1 to 0; no contention
4        return; // 如果没有其他goroutine持有锁,直接返回
5    }
6    sys.semrelease(&m.sema); // 通过信号量唤醒被挂起的goroutine
7}

释放锁时,首先将当前标记减一,如果当前锁没有被其他goroutine持有,则直接返回, 否则,通过信号量通知运行时,调度被挂起的goroutine

在这个版本,Mutex已经实现了基本的功能,但是这个版本有一个问题,所有goroutine会排队等待 运行时的调度,虽然这保证了公平性,所有的goroutine都会有机会参与调度,但是从性能的角度上看, 这会导致频繁的上下文切换,如果我们把锁直接交给新人(未挂起的goroutine),这样就可以避免上下文的切换, 于是Go 团队再 Go1.0 正式版时对Mutex进行了较大的调整。

加入唤醒机制

在Go 1.0版本[2]中, Mutex的结构体字段进行了调整

 1type Mutex struct {
 2    state int32 // 复合数据,下文进行解释
 3    sema  uint32 // 信号量
 4}
 5
 6const (
 7    mutexLocked = 1 << iota // state的第一位,代表当前锁是否被持有
 8    mutexWoken              // state的第二位,唤醒标记
 9    mutexWaiterShift = iota // 位移
10)

在这一版中,将Mutex的第一个字段由key改为state,其含义发生了很大的变化,state的第一位表示当前锁是否被持有, 相当于之前的keystate的第二位是唤醒标记,代表当前是否有唤醒的goroutine正在请求锁,剩下的30位用来表示等待中 的Waiter数量。

Go1.0 请求锁

在这一版中,代码相比第一版有很大变化,其主要优化是给新来的goroutine一些机会,让新goroutine能不参与休眠就获取锁

 1func (m *Mutex) Lock() {
 2    // Fast path: grab unlocked mutex.
 3    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { 
 4        return // 当前锁未被持有
 5    }
 6
 7    awoke := false 
 8    for {
 9        old := m.state
10        new := old | mutexLocked // 新状态加上锁
11        if old&mutexLocked != 0 {
12            new = old + 1<<mutexWaiterShift // 如果已经有goroutine持有锁,则等待的 Waiter + 1
13        }
14        if awoke {
15            // The goroutine has been woken from sleep,
16            // so we need to reset the flag in either case.
17            new &^= mutexWoken // 清空唤醒标记
18        }
19        if atomic.CompareAndSwapInt32(&m.state, old, new) { // 置新状态
20            if old&mutexLocked == 0 {
21                break // 旧状态锁已释放,获取当前锁
22            }
23            runtime_Semacquire(&m.sema) // 请求信号量
24            awoke = true
25        }
26    }
27}

我们可以用下面这个状态图表示请求锁的过程

我们可以发现,在这版的Mutex中,给新来的goroutine会在加入等待队列前就去尝试获取锁, 如果失败则加入等待队列中

Go1.0 释放锁

同时在Go 1.0中,释放锁的代码也变得更加复杂了

 1func (m *Mutex) Unlock() {
 2    // Fast path: drop lock bit.
 3    new := atomic.AddInt32(&m.state, -mutexLocked) // 去掉持有锁标记
 4    if (new+mutexLocked)&mutexLocked == 0 {
 5        panic("sync: unlock of unlocked mutex") // 重复Unlock时panic
 6    }
 7
 8    old := new
 9    for {
10        // 如果没有其他的waiter
11        // 或者已经有唤醒的goroutine
12        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
13            return
14        }
15        // 挑选一个waiter唤醒
16        new = (old - 1<<mutexWaiterShift) | mutexWoken // 打上唤醒标记
17        if atomic.CompareAndSwapInt32(&m.state, old, new) { // 置新状态
18            runtime_Semrelease(&m.sema)
19            return
20        }
21        old = m.state
22    }
23}
  1. 在释放锁时,首先会将锁标记为未锁状态,如果当前锁已经是未锁状态,则会panic(第5行)
  2. 如果当前已经有唤醒的goroutine或者没有等待中的waiter,我们就什么都不用做,其他goroutine会自己抢夺这把锁
  3. 如果没有唤醒的goroutine,就从等待队列中唤醒一个goroutine

给新 Goroutine 更多机会

在 Go1.5 版本中,Go团队又对Mutex进行了一次优化[3]

在某些临界区,代码执行的速度很快,如果加入等待队列就会浪费很多时间,Go 团队针对这一情景对Mutex进行了优化

 1func (m *Mutex) Lock() {
 2    // Fast path: grab unlocked mutex.
 3    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
 4        if raceenabled { // race detector 相关
 5            raceAcquire(unsafe.Pointer(m))
 6        }
 7        return
 8    }
 9
10    awoke := false
11    iter := 0 // 自旋次数
12    for {
13        old := m.state
14        new := old | mutexLocked
15        if old&mutexLocked != 0 { // 当前锁被持有
16            if runtime_canSpin(iter) { // 检测是否可以自旋
17                // 如果当前的没有其他唤醒的goroutine
18                // 尝试将当前goroutine置为唤醒状态
19                // 提醒 Unlock 不去唤醒其他goroutine
20                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
21                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
22                    awoke = true // 设置当前唤醒状态
23                }
24                runtime_doSpin()
25                iter++
26                continue // 自旋再次请求锁
27            }
28            new = old + 1<<mutexWaiterShift // 加入等待队列
29        }
30        if awoke {
31            // The goroutine has been woken from sleep,
32            // so we need to reset the flag in either case.
33            if new&mutexWoken == 0 {
34                panic("sync: inconsistent mutex state")
35            }
36            new &^= mutexWoken
37        }
38        if atomic.CompareAndSwapInt32(&m.state, old, new) {
39            if old&mutexLocked == 0 {
40                break
41            }
42            runtime_Semacquire(&m.sema)
43            awoke = true
44            iter = 0 // 清空自旋计数
45        }
46    }
47
48    if raceenabled { // race detector 相关
49        raceAcquire(unsafe.Pointer(m))
50    }
51}

在某些临界区,代码执行速度很快,如果通过自旋几次就能获取到锁的所有权,就可以避免加入等待队列, 这样就可以减少上下文的切换,在某些情况下能很好的提高性能。

饥饿机制

进过几次优化后,Mutex的性能已经十分好了,但是由于自旋的存在,在特定情况下,有可能出现新人不断 地获取锁,而等待队列中的goroutine一直没有机会获取到锁,Go 团队针对这种情况加入了饥饿机制。

我们拿最新的 Go 1.15 中的源码进行分析

1const (
2    mutexLocked = 1 << iota // mutex is locked
3    mutexWoken              // 唤醒标记
4    mutexStarving           // 饥饿标记
5    mutexWaiterShift = iota // 偏移量
6
7    starvationThresholdNs = 1e6 // 1ms
8)

在这个版本中, state的第三位被用作饥饿标记

 1func (m *Mutex) lockSlow() {
 2    var waitStartTime int64
 3    starving := false // 初始饥饿标记
 4    awoke := false
 5    iter := 0
 6    old := m.state
 7    for {
 8        // Don't spin in starvation mode, ownership is handed off to waiters
 9        // so we won't be able to acquire the mutex anyway.
10        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
11            // Active spinning makes sense.
12            // Try to set mutexWoken flag to inform Unlock
13            // to not wake other blocked goroutines.
14            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
15                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
16                awoke = true
17            }
18            runtime_doSpin()
19            iter++
20            old = m.state
21            continue
22        }
23        new := old
24        // Don't try to acquire starving mutex, new arriving goroutines must queue.
25        if old&mutexStarving == 0 {
26            new |= mutexLocked
27        }
28        if old&(mutexLocked|mutexStarving) != 0 {
29            new += 1 << mutexWaiterShift
30        }
31        // The current goroutine switches mutex to starvation mode.
32        // But if the mutex is currently unlocked, don't do the switch.
33        // Unlock expects that starving mutex has waiters, which will not
34        // be true in this case.
35        if starving && old&mutexLocked != 0 {
36            new |= mutexStarving
37        }
38        if awoke {
39            // The goroutine has been woken from sleep,
40            // so we need to reset the flag in either case.
41            if new&mutexWoken == 0 {
42                throw("sync: inconsistent mutex state")
43            }
44            new &^= mutexWoken
45        }
46        if atomic.CompareAndSwapInt32(&m.state, old, new) {
47            if old&(mutexLocked|mutexStarving) == 0 {
48                break // locked the mutex with CAS
49            }
50            // If we were already waiting before, queue at the front of the queue.
51            queueLifo := waitStartTime != 0
52            if waitStartTime == 0 {
53                waitStartTime = runtime_nanotime()
54            }
55            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
56            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
57            old = m.state
58            if old&mutexStarving != 0 {
59                // If this goroutine was woken and mutex is in starvation mode,
60                // ownership was handed off to us but mutex is in somewhat
61                // inconsistent state: mutexLocked is not set and we are still
62                // accounted as waiter. Fix that.
63                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
64                    throw("sync: inconsistent mutex state")
65                }
66                delta := int32(mutexLocked - 1<<mutexWaiterShift)
67                if !starving || old>>mutexWaiterShift == 1 {
68                    // Exit starvation mode.
69                    // Critical to do it here and consider wait time.
70                    // Starvation mode is so inefficient, that two goroutines
71                    // can go lock-step infinitely once they switch mutex
72                    // to starvation mode.
73                    delta -= mutexStarving
74                }
75                atomic.AddInt32(&m.state, delta)
76                break
77            }
78            awoke = true
79            iter = 0
80        } else {
81            old = m.state
82        }
83    }
84
85    if race.Enabled {
86        race.Acquire(unsafe.Pointer(m))
87    }
88}

(咕了,过几天再写