二. golang 最新版本的 sync.Mutex
你可以大致扫描一下最新版本的实现,如果你第一眼就看的很懂了,每步的操作?为什么这样操作?有没有更加合理的操作?那恭喜你,你的水平已经超过google实现 sync.Mutex 的程序员了,甚至是大部分的程序员,因为这个程序历经几年的演化,才到了今天的样子,你第一眼就能看的如此透彻,那真的是很了不起。下面的章节是为没有看懂的人准备的。
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package sync provides basic synchronization primitives such as mutual
// exclusion locks. Other than the Once and WaitGroup types, most are intended
// for use by low-level library routines. Higher-level synchronization is
// better done via channels and communication.
//
// Values containing the types defined in this package should not be copied.
package sync
import (
“sync/atomic”
“unsafe”
)
// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
type Mutex struct {
state int32
sema uint32
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
const (
mutexLocked = 1 « iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if raceenabled {
raceAcquire(unsafe.Pointer(m))
}
return
}
awoke := false
for {
old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 {
new = old + 1<<mutexWaiterShift
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
break
}
runtime_Semacquire(&m.sema)
awoke = true
}
}
if raceenabled {
raceAcquire(unsafe.Pointer(m))
} }
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if raceenabled {
_ = m.state
raceRelease(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
panic("sync: unlock of unlocked mutex")
}
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)
return
}
old = m.state
} } 三. 有没有更加简洁的实现方法?
有点操作系统知识的都知道,独占锁是一种特殊的PV 操作,就 0 – 1 PV操作。那我想,如果不考虑任何性能问题的话,用信号量应该就可以这样实现Mutex:
type Mutex struct {
sema uint32
}
func NewMutex() *Mutex {
var mu Mutex
mu.sema = 1
return &mu
}
func (m *Mutex) Lock() {
runtime_Semacquire(&m.sema)
}
func (m *Mutex2) Unlock() {
runtime_Semrelease(&m.sema)
}
当然,这个实现有点不符合要求。如果有个家伙不那么靠谱,加锁了一次,但是解锁了两次。第二次解锁的时候,应该报出一个错误,而不是让错误隐藏。于是乎,我们想到用一个变量表示加锁的次数。这样就可以判断有没有多次解锁。于是乎,我就想到了下面的解决方案:
type Mutex struct {
key int32
sema uint32
}
func (m *Mutex) Lock() {
if atomic.AddInt32(&m.key, 1) == 1 {
// changed from 0 to 1; we hold lock
return
}
runtime_Semacquire(&m.sema)
}
func (m *Mutex) Unlock() {
switch v := atomic.AddInt32(&m.key, -1); {
case v == 0:
// changed from 1 to 0; no contention
return
case v == -1:
// changed from 0 to -1: wasn’t locked
// (or there are 4 billion goroutines waiting)
panic(“sync: unlock of unlocked mutex”)
}
runtime_Semrelease(&m.sema)
}
这个解决方案除了解决了我们前面说的重复加锁的问题外,还对我们初始化工作做了简化,不需要构造函数了。注意,这也是golang里面一个常见的设计模式,叫做 零初始化。
表示多线程复杂状态,最好的办法就是抽象出 状态 和 操作,忽略掉线程,让问题变成一个状态机问题。这样的图不仅仅用于分析Mutex
Mutex can be in 2 modes of operations: normal and starvation.
In normal mode waiters are queued in FIFO order, but a woken up waiter does not own the mutex and competes with new arriving goroutines over the ownership. New arriving goroutines have an advantage – they are already running on CPU and there can be lots of them, so a woken up waiter has good chances of losing. In such case it is queued at front of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, it switches mutex to the starvation mode.
In starvation mode ownership of the mutex is directly handed off from the unlocking goroutine to the waiter at the front of the queue. New arriving goroutines don’t try to acquire the mutex even if it appears to be unlocked, and don’t try to spin. Instead they queue themselves at the tail of the wait queue.
If a waiter receives ownership of the mutex and sees that either (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, it switches mutex back to normal operation mode.
Normal mode has considerably better performance as a goroutine can acquire a mutex several times in a row even if there are blocked waiters.
Starvation mode is important to prevent pathological cases of tail latency.
互斥量可分为两种操作模式:正常和饥饿。
在正常模式下,等待的goroutines按照FIFO(先进先出)顺序排队,但是goroutine被唤醒之后并不能立即得到mutex锁,它需要与新到达的goroutine争夺mutex锁。
因为新到达的goroutine已经在CPU上运行了,所以被唤醒的goroutine很大概率是争夺mutex锁是失败的。出现这样的情况时候,被唤醒的goroutine需要排队在队列的前面。
如果被唤醒的goroutine有超过1ms没有获取到mutex锁,那么它就会变为饥饿模式。
在饥饿模式中,mutex锁直接从解锁的goroutine交给队列前面的goroutine。新达到的goroutine也不会去争夺mutex锁(即使没有锁,也不能去自旋),而是到等待队列尾部排队。
在饥饿模式下,有一个goroutine获取到mutex锁了,如果它满足下条件中的任意一个,mutex将会切换回去正常模式:
Mutex struct {
state int32 // 将一个32位整数拆分为 当前阻塞的goroutine数(29位)|饥饿状态(1位)|唤醒状态(1位)|锁状态(1位) 的形式,来简化字段设计
sema uint32 // 信号量
}
const (
mutexLocked = 1 « iota // 1 0001 含义:用最后一位表示当前对象锁的状态,0-未锁住 1-已锁住
mutexWoken // 2 0010 含义:用倒数第二位表示当前对象是否被唤醒 0-唤醒 1-未唤醒
mutexStarving // 4 0100 含义:用倒数第三位表示当前对象是否为饥饿模式,0为正常模式,1为饥饿模式。
mutexWaiterShift = iota // 3,从倒数第四位往前的bit位表示在排队等待的goroutine数
starvationThresholdNs = 1e6 // 1ms
可以看到Mutex中含有:
一个非负数信号量sema;
state表示Mutex的状态。
常量:
mutexLocked表示锁是否可用(0可用,1被别的goroutine占用)
mutexWoken=2表示mutex是否被唤醒
mutexWaiterShift=4表示统计阻塞在该mutex上的goroutine数目需要移位的数值。
将3个常量映射到state上就是
state: |32|31|…| |3|2|1|
______/ | | |
| | | |
| | | mutex的占用状态(1被占用,0可用)
| | |
| | mutex的当前goroutine是否被唤醒
| |
| 饥饿位,0正常,1饥饿
|
等待唤醒以尝试锁定的goroutine的计数,0表示没有等待者
如果同学们熟悉Java的锁,就会发现与AQS的设计是类似,只是没有AQS设计的那么精致,不得不感叹,JAVA的牛逼。 有同学是否会有疑问为什么使用的是int32而不是int64呢,因为32位原子性操作更好,当然也满足的需求。
Mutex在1.9版本中就两个函数Lock()和Unlock()。 下面我们先来分析最难的Lock()函数:
func (m *Mutex) Lock() {
// 如果m.state=0,说明当前的对象还没有被锁住,进行原子性赋值操作设置为mutexLocked状态,CompareAnSwapInt32返回true
// 否则说明对象已被其他goroutine锁住,不会进行原子赋值操作设置,CopareAndSwapInt32返回false
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 开始等待时间戳
var waitStartTime int64
// 饥饿模式标识
starving := false
// 唤醒标识
awoke := false
// 自旋次数
iter := 0
// 保存当前对象锁状态
old := m.state
// 看到这个for {}说明使用了cas算法
for {
// 相当于xxxx...x0xx & 0101 = 01,当前对象锁被使用
if old&(mutexLocked|mutexStarving) == mutexLocked &&
// 判断当前goroutine是否可以进入自旋锁
runtime_canSpin(iter) {
// 主动旋转是有意义的。试着设置mutexwake标志,告知解锁,不要唤醒其他阻塞的goroutines。
if !awoke &&
// 再次确定是否被唤醒: xxxx...xx0x & 0010 = 0
old&mutexWoken == 0 &&
// 查看是否有goroution在排队
old>>mutexWaiterShift != 0 &&
// 将对象锁改为唤醒状态:xxxx...xx0x | 0010 = xxxx...xx1x
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}//END_IF_Lock
// 进入自旋锁后当前goroutine并不挂起,仍然在占用cpu资源,所以重试一定次数后,不会再进入自旋锁逻辑
runtime_doSpin()
// 自加,表示自旋次数
iter++
// 保存mutex对象即将被设置成的状态
old = m.state
continue
}// END_IF_spin
// 以下代码是不使用**自旋**的情况
new := old
// 不要试图获得饥饿的互斥,新来的goroutines必须排队。
// 对象锁饥饿位被改变,说明处于饥饿模式
// xxxx...x0xx & 0100 = 0xxxx...x0xx
if old&mutexStarving == 0 {
// xxxx...x0xx | 0001 = xxxx...x0x1,标识对象锁被锁住
new |= mutexLocked
}
// xxxx...x1x1 & (0001 | 0100) => xxxx...x1x1 & 0101 != 0;当前mutex处于饥饿模式并且锁已被占用,新加入进来的goroutine放到队列后面
if old&(mutexLocked|mutexStarving) != 0 {
// 更新阻塞goroutine的数量,表示mutex的等待goroutine数目加1
new += 1 << mutexWaiterShift
}
// 当前的goroutine将互斥锁转换为饥饿模式。但是,如果互斥锁当前没有解锁,就不要打开开关,设置mutex状态为饥饿模式。Unlock预期有饥饿的goroutine
if starving &&
// xxxx...xxx1 & 0001 != 0;锁已经被占用
old&mutexLocked != 0 {
// xxxx...xxx | 0101 => xxxx...x1x1,标识对象锁被锁住
new |= mutexStarving
}
// goroutine已经被唤醒,因此需要在两种情况下重设标志
if awoke {
// xxxx...xx1x & 0010 = 0,如果唤醒标志为与awoke不相协调就panic
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
// new & (^mutexWoken) => xxxx...xxxx & (^0010) => xxxx...xxxx & 1101 = xxxx...xx0x :设置唤醒状态位0,被唤醒
new &^= mutexWoken
}
// 获取锁成功
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// xxxx...x0x0 & 0101 = 0,已经获取对象锁
if old&(mutexLocked|mutexStarving) == 0 {
// 结束cas
break
}
// 以下的操作都是为了判断是否从饥饿模式中恢复为正常模式
// 判断处于FIFO还是LIFO模式
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// xxxx...x1xx & 0100 != 0
if old&mutexStarving != 0 {
// xxxx...xx11 & 0011 != 0
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
panic("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
// 保存mutex对象状态
old = m.state
}
}// cas结束
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
} } 看了Lock()函数之后是不是觉得一片懵逼状态,告诉大家一个方法,看Lock()函数时候需要想着如何Unlock。下面就开始看看Unlock()函数。
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// state-1标识解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
// 验证锁状态是否符合
if (new+mutexLocked)&mutexLocked == 0 {
panic("sync: unlock of unlocked mutex")
}
// xxxx...x0xx & 0100 = 0 ;判断是否处于正常模式
if new&mutexStarving == 0 {
old := new
for {
// 如果没有等待的goroutine或goroutine已经解锁完成
if old>>mutexWaiterShift == 0 ||
// xxxx...x0xx & (0001 | 0010 | 0100) => xxxx...x0xx & 0111 != 0
old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// 饥饿模式:将mutex所有权移交给下一个等待的goroutine
// 注意:mutexlock没有设置,goroutine会在唤醒后设置。
// 但是互斥锁仍然被认为是锁定的,如果互斥对象被设置,所以新来的goroutines不会得到它
runtime_Semrelease(&m.sema, true)
} } 在网上还会有一些基于go1.6的分析,但是与go 1.9的差距有点大。