mutex

CAS原子操作。
需要有一种阻塞和唤醒机制。
尽量减少阻塞和唤醒切换成本。
锁尽量公平,后来者要排队。即使被后来者插队了,也要照顾先来者,不能有“饥饿”现象。
先看3,4点。再看2,1点。最后是源码。

尽量减少阻塞和唤醒切换成本
减少切换成本的方法就是不切换,简单而直接。



不切换的方式就是让竞争者自旋。自旋一会儿,然后抢锁。不成功就再自旋。到达上限次数才阻塞。



自旋就是CPU空转一定的时钟周期



不同平台上自旋所用的指令不一样。例如在amd64平台下,汇编的实现如下



TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
// 自旋cycles次,每次自旋执行PAUSE指令
PAUSE
SUBL $1, AX
JNZ again
RET
是否允许自旋的判断是严格的。而且最多自旋四次,每次30个CPU时钟周期。










能不能自旋全由这个条件语句决定if old&(mutexLocked mutexStarving) == mutexLocked && runtime_canSpin(iter)。


翻译下,就是下面的条件都满足,才允许自旋。



锁已被占用,并且锁不处于饥饿模式。



积累的自旋次数小于最大自旋次数(active_spin=4)。



cpu核数大于1。



有空闲的P。



当前goroutine所挂载的P下,本地待运行队列为空。



可以看到自旋要求严格,毕竟在锁竞争激烈时,还无限制地自旋就肯定会影响其他goroutine。



const active_spin = 4
func sync_runtime_canSpin(i int) bool {
// 自旋次数不能大于 active_spin(4) 次
// cpu核数只有一个,不能自旋
// 没有空闲的p了,不能自旋
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
// 当前g绑定的p里面本地待运行队列不为空,不能自旋
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}



锁模式介绍
上面的出现了两个常量,mutexStarving和mutexLocked。它们与锁对象结构有关。比较基础,这里介绍一下。



type Mutex struct {
// [阻塞的goroutine个数, starving标识, woken标识, locked标识]
state int32
sema uint32
}
Mutex结构简单的就只有两个成员变量。sema是信号量,下文会介绍到。这里主要介绍state的结构。



一个32位的变量,被划分成上图的样子。右边的标识也有对应的常量



const (
mutexLocked = 1 « iota // mutex is locked
mutexWoken

mutexStarving
mutexWaiterShift = iota
)



含义如下:



mutexLocked对应右边低位第一个bit。值为1,表示锁被占用。值为0,表示锁未被占用。



mutexWoken对应右边低位第二个bit。值为1,表示打上唤醒标记。值为0,表示没有唤醒标记。



mutexStarving对应右边低位第三个bit。值为1,表示锁处于饥饿模式。值为0,表示锁存于正常模式。



mutexWaiterShift是偏移量。它值为3。用法是state»=mutexWaiterShift之后,state的值就表示当前阻塞等待锁的goroutine个数。最多可以阻塞2^29个goroutine。



Mutex锁分为两种模式,正常模式 和 饥饿模式。



正常模式下,对于新来的goroutine而言,它有两种选择,要么抢到了锁,直接执行;要么抢不到锁,追加到阻塞队列尾部,等待被唤醒的。



饥饿模式下,对于新来的goroutine,它只有一个选择,就是追加到阻塞队列尾部,等待被唤醒的。而且在该模式下,所有锁竞争者都不能自旋。



除了这两种模式。还有一个Woken(唤醒标记)。它主要用于自旋状态的通知和锁公平性的保证。分两个角度理解:



一、新的goroutine申请锁时,发现锁被占用了。但自己满足自旋条件,于是自己自旋,并设置上的Woken标记。此时占用锁的goroutine在释放锁时,检查Woken标记,如果被标记。哪怕现在锁上面的阻塞队列不为空,也不做唤醒。直接return,让自旋着的goroutine有更大机会抢到锁。



if old»mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
二、释放锁时,检查Woken标记为空。而阻塞队列里有goroutine需要被唤醒。那么在唤醒时,同时标记锁Woken。这里可能有疑问,原来没有Woken标记,为什么在唤醒一个goroutine要主动标记呢?目的是保证锁公平。



考虑这样的场景:现在阻塞队列里只有一个goroutine。把它唤醒后,还得等调度器运行到它,它自己再去抢锁。但在调度器运行到它之前,很可能新的竞争者参与进来,此时锁被抢走的概率就很大。



这有失公平,被阻塞的goroutine是先到者,新的竞争者是后来者。应该尽量让它们一起竞争。



// 唤醒一个阻塞的goroutine,并把锁的Woken标记设置上
new = (old - 1«mutexWaiterShift) | mutexWoken
设置Woken标记后,state就肯定不为零。此时新来的竞争者,在执行Lock()的fast-path时会失败,接下来就只能乖乖排队了。



func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// Woken标记设置后,这里的CAS就会为false
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
// …
return
}
// 接下来在阻塞里排队
}
小总结:为了减少切换成本,短暂的自旋等待是简单的方法。而竞争者在自旋时,要主动设置Woken标记。这样释放者才能感知到。



锁尽量公平
为什么不是绝对公平?要绝对公平的粗暴做法就是在锁被占用后,其它所有竞争者,包括新来的,全部排队。



但排队的问题也很明显,排队阻塞唤醒的切换成本(这是损耗性能的潜在的隐患,下面Mutex的问题有举例)。假如临界区代码执行只需要十几个时钟周期时,让竞争者自旋等待一下,立刻就可以获得锁。减少不必要的切换成本,效率更高。



尽量公平的结果就是阻塞的竞争者被唤醒后,也要与(正在自旋的)新竞争者抢夺锁资源。



go使用三种手段保证Mutex锁尽量公平:



上面介绍的,在锁释放时,主动设置Woken标记,防止新的竞争者轻易抢到锁。



竞争者进阻塞队列策略不一样。新的竞争者,抢不到锁,就排在队列尾部。先来竞争者,从队列中被唤醒后,还是抢不到锁,就放在队列头部。



任何竞争者,被阻塞等待的时间超过指定阀值(1ms)。锁就转为饥饿模式。这时锁释放时会唤醒它们,手递手式把锁资源给它们。别的竞争者(包括新来的)都抢不到。直接把饥饿问题解决掉。



饥饿问题是会积压的。要尽快解决。举个例子解释一下:



蓝色是新竞争者,红色是阻塞等待时间超过阀值的竞争者。每次持锁时间是0.3ms。



只要有竞争者阻塞超时了,锁就会转换为饥饿模式。饥饿模式下,所有的新竞争者都得排队。



时刻4中的G3就是被积压的。如果时刻0中的竞争者更多时,并且抢锁顺序不变。那么时刻4的积压就更严重。



同时反映出一个问题。



Mutex带来的问题
假设在业务某个场景中,对每个请求都需要访问某互斥资源。使用Mutex锁时,如果QPS很高,阻塞队列肯定会很满。虽然QPS可能会降,但请求是持续的。



新来的请求,在访问互斥资源时有可能抢锁成功,后来者胜于先到者。这种情况持续发生的话,就会导致阻塞队列中所有的请求得不到处理,耗时增高,直至超出上游设置的超时时间,一下子失败率突增,上游再影响它的上游,引起连锁反应进而服务故障异常。



解决方案要根据实际业务场景来优化。削减锁的粒度;或者使用CAS的方式进队列,然后阻塞在通道上;或者使用无锁结构等待。



阻塞在通道而不是阻塞的锁上,是因为go的runtime对待锁唤醒和通道唤醒goroutine的效率是不一样的。这也引出了还有一种方案是改runtime,让锁唤醒的goroutine更快地得到执行。毕竟上面问题点是被唤醒的goroutine和新的goroutine在竞争中不能保证稳胜,被唤醒的goroutine会有一个调度耗时,减少耗时就有可能提高竞争成功率。



阻塞和唤醒机制
go的阻塞和唤醒是semacquire和semrelease。虽然命名上是sema,但实际用途却是一套阻塞唤醒机制。



// That is, don’t think of these as semaphores.
// Think of them as a way to implement sleep and wakeup



阻塞和唤醒机制



go的runtime有一个全局变量semtable,它放置了所有的信号量。



var semtable [semTabSize]struct {
root semaRoot
pad [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte
}



func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags)
func semrelease1(addr *uint32, handoff bool)
每个信号量都由一个变量地址指定。Mutex就是用成员sema的地址。



在阻塞时,调用semacquire1,把地址(addr)传给它。



如果addr大于1,并且通过CAS减一成功,那就说明获取信号量成功。不用阻塞。



否则,semacquire1会在semtable数组中找一个元素和它对应上。每个元素都有一个root,这个root是Treap树(ACM同学应该熟悉)。



最后addr变成一个树节点,这个树节点,有自己的一个队列,专门放被阻塞的goroutine。叫它阻塞队列吧。



这个阻塞队列是个双端队列,头尾都可以进。



semacquire1把当前goroutine相关元数据放进阻塞队列之后,就挂起了。



semrelease1是给addr CAS加一。
如果坚持发现当前addr上有阻塞的goroutine时,就取一个出来,唤醒它,让它自己再去semacquire1。这是handoff为false的情况。
但handoff为true的话,就尝试手递手地把信号量送给这个goroutine。等于说goroutine不用再自己去抢了,因为自己再去抢有可能抢不到。
最后semrelease1会把取出来的这个goroutine挂在当前P的本地待运行队列尾部,等待调度执行。
就是这样,在获取不到Mutex锁时,通过信号量来阻塞和唤醒goroutine。



CAS原子操作
CAS就是基本的原子操作。没什么好说的。



例如在amd64上,go的汇编实现:



TEXT ·CompareAndSwapUint32(SB),NOSPLIT,$0-17
MOVV addr+0(FP), R1
MOVW old+8(FP), R2
MOVW new+12(FP), R5
SYNC
cas_again:
MOVV R5, R3
LL (R1), R4
BNE R2, R4, cas_fail
SC R3, (R1)
BEQ R3, cas_again
MOVV $1, R1
MOVB R1, swapped+16(FP)
SYNC
RET
cas_fail:
MOVV $0, R1
JMP -4(PC)



源码
type Mutex struct {
// [阻塞的goroutine个数, starving标识, woken标识, locked标识]
// [0~28, 1, 1, 1]
state int32
sema uint32
}



const (
mutexLocked = 1 « iota // mutex is locked
mutexWoken // 唤醒标记
mutexStarving // 饥饿模式
mutexWaiterShift = iota // 位移数



starvationThresholdNs = 1e6 // 阻塞时间阀值1ms
)



func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 尝试CAS上锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
// 上锁成功,直接返回
return
}



var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {



// 进入到这个循环的,有两种角色goroutine
// 一种是新来的goroutine。另一种是被唤醒的goroutine。所以它们可能在这个地方再一起竞争锁
// 如果新来的goroutine抢成功了,那另一个只能再阻塞着等待。但超过1ms后,锁会转换成饥饿模式
// 在这个模式下,所有新来的goroutine必须排在队伍的后面。没有抢锁资格

// 饥饿模式下,不能自旋
// 锁被占用了,不能自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// woken位没有被设置;被阻塞等待goroutine的个数大于0
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 可以自旋了,那就设置上woken位,在unlock时,如果发现有别的goroutine在自旋,就立即返回,有被阻塞的goroutine也不唤醒了
awoke = true
}
// runtime_doSpin -> sync_runtime_doSpin
// 每次自旋30个时钟周期,最多120个周期
runtime_doSpin()
iter++
old = m.state
continue
}

// 自旋完了还是等不到锁 或 可以上锁

new := old
// 饥饿模式下的锁不抢
if old&mutexStarving == 0 {
// 非饥饿模式下,可以抢锁
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
// 已经被上锁了,或锁处于饥饿模式下,就阻塞当前的goroutine
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
// 当前的goroutine已经被饿着了,所以要把锁设置为饥饿模式
new |= mutexStarving
}
if awoke {
// 当前的goroutine有自旋过,但现在已经自旋结束了。所以要取消woken模式
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
// 取消woken标志
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
// 成功上锁
break // locked the mutex with CAS
}

// 主要是为了和第一次调用的Lock的g划分不同的优先级
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 使用信号量阻塞当前的g
// 如果当前g已经阻塞等待过一次了,queueLifo被赋值true
runtime_SemacquireMutex(&m.sema, queueLifo)
// 判断当前g是否被饿着了
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// 饥饿模式下,被手递手喂信号量唤醒的
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
panic("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift) // -7(111)
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式
// 饥饿模式会影响自旋
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// 不是手递手的信号量,那就自己继续竞争锁
// 必须设置为true,这样新一轮的CAS之前,就可以取消woken模式。
// 因为通过信号量释放锁时,为了保持公平性,会同时设置woken模式。
awoke = true
iter = 0
} else {
old = m.state
} }


if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}



func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}



// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
// 不能多次执行unclock()
panic(“sync: unlock of unlocked mutex”)
}
if new&mutexStarving == 0 {
// 非饥饿模式
old := new
for {
// 没有被阻塞的goroutine。直接返回
// 有阻塞的goroutine,但处于woken模式,直接返回
// 有阻塞的goroutine,但被上锁了。可能发生在此for循环内,第一次CAS不成功。因为CAS前可能被新的goroutine抢到锁。直接返回
// 有阻塞的goroutine,但锁处于饥饿模式。可能发生在被阻塞的goroutine不是被唤醒调度的,而是被正常调度运行的。直接返回
if old»mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}



  // 有阻塞的goroutine,唤醒一个或变为没有阻塞的goroutine了就退出
// 这个被唤醒的goroutine还需要跟新来的goroutine竞争
// 如果只剩最后一个被阻塞的goroutine。唤醒它之后,state就变成0。
// 如果此刻来一个新的goroutine抢锁,它有可能在goroutine被重新调度之前抢锁成功。
// 这样就失去公平性了,不能让它那么干,所以这里也要设置为woken模式。
// 因为Lock方法开始的fast path,CAS操作的old值是0。这里设置woken模式成功后,后来者就只能乖乖排队。保持了锁的公平性
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
} } else {
// 饥饿模式
// 手递手唤醒一个goroutine
runtime_Semrelease(&m.sema, true) } }


golang 1.10 mutex互斥锁源码
2018年05月13日 18:10:54 tydhot 阅读数 369
Mutex锁分为normal模式和starvation模式。一开始默认处于normal模式。在normal模式中,每个新加入竞争锁行列的协程都会直接参与到锁的竞争当中来,而处于starvation模式时,所有所有新进入的协程都会直接被放入等待队列中挂起,直到其所在队列之前的协程全部执行完毕。



在normal模式中协程的挂起等待时间如果大于某个值,就会进入starvation模式。



type Mutex struct {
state int32
sema uint32
}
其中,state用来保存mutex的状态量,低一位表示是否上锁,低二位表示当前锁对象是否被唤醒,低三位表示该锁是否处于staration状态,而后几位表示当前正被该锁阻塞的协程数。而sema则是作为信号量来作为阻塞的依据。



Lock()方法进行加锁。



func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}



var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don’t spin in starvation mode, ownership is handed off to waiters
// so we won’t be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old»mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don’t try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 « mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don’t do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw(“sync: inconsistent mutex state”)
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old»mutexWaiterShift == 0 {
throw(“sync: inconsistent mutex state”)
}
delta := int32(mutexLocked - 1«mutexWaiterShift)
if !starving || old»mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}



if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
一开始会直接通过cas将原本值为0(也就是当前没任何协程占用锁)的state赋为1,表示这个锁已经有人加锁。如果成功,表示这是当前锁第一次加锁并且加锁成功,那么可以直接返回。



如果之前加锁失败,也就是刚刚的cas操作失败,那么说明就需要等待锁的释放,首先判断是否已经加锁并处于normal模式,将原先锁的state与1和4相或的结果相与,如果与1相等,则说明此时处于normal模式并且已经加锁,而后判断当前协程是否可以自旋。如果可以自旋,则通过右移三位判断是否还有协程正在等待这个锁,如果有,并通过低2位判断是否该所处于被唤醒状态,如果并没有,则将其状态量设为被唤醒的状态,之后进行自旋,直到该协程自旋数量达到上限,或者当前锁被解锁,或者当前锁已经处于starvation模式。



if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old»mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
在超过自旋数量上限或者当前锁已经解锁或者当前锁已经处于starvation模式,那么就在循环中进入下面的部分。



new := old
// Don’t try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 « mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don’t do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw(“sync: inconsistent mutex state”)
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old»mutexWaiterShift == 0 {
throw(“sync: inconsistent mutex state”)
}
delta := int32(mutexLocked - 1«mutexWaiterShift)
if !starving || old»mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}



首先,如果此时还是由于别的协程的占用无法获得锁或者处于starvation模式,都在其state加8表示有新的协程正在处于等待状态。并且如果之前由于自旋而将该锁唤醒,那么此时将其低二位的状态量赋值为0。之后判断starving是否为true,如果为true说明在上一次的循环中,锁需要被定义为starvation模式,那么在这里就将相应的状态量低三位设置为1表示进入starvation模式。



之后尝试通过cas将新的state状态量赋值给state,如果失败,则重新获得其 state在下一步循环重新重复上述的操作。如果成功,首先判断已经阻塞时间,如果为零,则从现在开始记录。



之后通过runtime_SemacquireMutex()通过信号量将当前协程阻塞。



上述runtime_SemacquireMutex()方法的具体实现在了sema.go中。



func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
}



func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
gp := getg()
if gp != gp.m.curg {
throw(“semacquire not on the G stack”)
}



// Easy case.
if cansemacquire(addr) {
return
}



// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lock(&root.lock)
// Add ourselves to nwait to disable “easy case” in semrelease.
atomic.Xadd(&root.nwait, 1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we’re waiting
// (we set nwait above), so go to sleep.
root.queue(addr, s, lifo)
goparkunlock(&root.lock, “semacquire”, traceEvGoBlockSync, 4)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3)
}
releaseSudog(s)
}



首先,在上述的方法中,首先通过semroot()方法根据传入的地址获得semRoot,其具体操作如下。



func semroot(addr *uint32) *semaRoot {
return &semtable[(uintptr(unsafe.Pointer(addr))»3)%semTabSize].root
}



将传入的Mutex的信号量sema的地址右移三位并与251取余,得到的新地址来得到semRoot,做到将semRoot通过信号量sema来与相应的Mutex绑定的目的。



semRoot的结构如下。



type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}



其中的mutex与之前的Mutex无关,只是一个简单的uintptr来简单的实现并发的线程安全的功能。Treap则是其中平衡二叉树的根节点,nwait则表示证在平衡二叉树阻塞的协程数量。



此时,会对信号量sema的值进行判断,如果为0,则继续,否则尝试减1并返回。



而后通过semRoot中的mutex进行加锁,这里的锁实现很简单,简单来说实则只是对互斥信号量的cas操作。



之后给semRoot的nwait加一,表示新的协程进入等待。



之后通过queue()方法正式将目标协程放入平衡二叉树中等待。



对于这个节点,首先设置该节点中保存的协程为当前协程,并保存当前信号量地址。



首先,如果是第一次根据新的信号量而要加入的节点,那么会直接加入到平衡二叉树中,这颗二叉树中节点的位置通过信号量的地址作为排序的依据,然后插入。



s.ticket = fastrand() | 1
s.parent = last
*pt = s



// Rotate up into tree according to ticket (priority).
for s.parent != nil && s.parent.ticket > s.ticket {
if s.parent.prev == s {
root.rotateRight(s.parent)
} else {
if s.parent.next != s {
panic(“semaRoot queue”)
}
root.rotateLeft(s.parent)
}
}
如果不是第一次的插入,那么首先根据信号量的地址从平衡二叉树根节点开始寻找对应的信号量地址所绑定的节点,通过大小确定寻找的左儿子节点或者右儿子节点,直到找到。



找到之后,之前在将协程准备阻塞之前会判断以等待时间,如果不为0,说明该协程已经进入过该平衡二叉树。那么将新生成的节点取代原本节点在平衡二叉树的位置,并将老节点放置在该信号量绑定节点的等待队列的头部。如果是第一次,那么只需要将新的节点放在等待队列的末尾。



var last *sudog
pt := &root.treap
for t := *pt; t != nil; t = *pt {
if t.elem == unsafe.Pointer(addr) {
// Already have addr in list.
if lifo {
// Substitute s in t’s place in treap.
*pt = s
s.ticket = t.ticket
s.acquiretime = t.acquiretime
s.parent = t.parent
s.prev = t.prev
s.next = t.next
if s.prev != nil {
s.prev.parent = s
}
if s.next != nil {
s.next.parent = s
}
// Add t first in s’s wait list.
s.waitlink = t
s.waittail = t.waittail
if s.waittail == nil {
s.waittail = t
}
t.parent = nil
t.prev = nil
t.next = nil
t.waittail = nil
} else {
// Add s to end of t’s wait list.
if t.waittail == nil {
t.waitlink = s
} else {
t.waittail.waitlink = s
}
t.waittail = s
s.waitlink = nil
}
return
}
last = t
if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
pt = &t.prev
} else {
pt = &t.next
}
}



将当次阻塞加入平衡二叉树中队列之后,就可以先将semRoot中的mutex解锁,并将当前协程挂起。



回到Mutex的Lock()中,当之前调用方法将协程挂起后,如果协程被唤醒,那么就会继续下面的流程。



starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old»mutexWaiterShift == 0 {
throw(“sync: inconsistent mutex state”)
}
delta := int32(mutexLocked - 1«mutexWaiterShift)
if !starving || old»mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}



如果这里协程阻塞而挂起的时间超过了默认值,那么就会将starve设置为true,就会在下一次的循环中将该锁这是为starvation模式。如果已经是这个模式,那么就会将状态量的等待数减1,并判断当前如果已经没有等待的协程,就没有必要继续维持starvation模式,同时也没必要继续执行该循环(当前只有一个协程在占用锁)。



解锁通过Unlock()方法。



func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}



// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw(“sync: unlock of unlocked mutex”)
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old»mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1«mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won’t acquire it.
runtime_Semrelease(&m.sema, true)
}
}



解锁首先直接将第一位状态量变为0,表示已经解锁。然后根据模式,如果处于normal模式,根据状态量当前是否有协程等待,或者已经有协程已经在自旋等待锁,那么就可以直接结束。否则,就通过runtime_Semrelease()方法尝试唤醒挂起的协程。在runtime_Semrelease()中与之前对应,通过dequeue()方法将寻找到的二叉树节点,也就是循环队列的头部取出,节点中保存的协程作为要唤醒的协程。但是,这里唤醒的携程并不一定会立即获取锁,锁的获取仍旧需要竞争。



而如果处于starvation模式,那么会直接通过runtime_Semrelease()方法尝试唤醒挂起的协程,这里唤醒的协程必定持有锁。



golang的版本是1.12,其中的mutex是增加了普通模式和饥饿模式切换的优化版本,为了便于理解,这里先从上一个版本1.7版本的mutex开始分析,以后再对优化版本进行说明。



Mutex结构说明
定义
最初版本锁的定义如下:



// mutex是互斥锁
// mutex的零值是没有加锁的
//在使用之后不能被拷贝
type Mutex struct {
state int32 //状态标识
sema uint32 //信号量
}



const (
mutexLocked = 1 « iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
1
2
3
4
5
6
7
8
9
10
11
12
13
其中state是记录用来记录加锁状态的,将一个整型按位划分来表示不同的含义,从低到高分别为第1位到第32位,



第1位表示是否被锁住,即0表示没有锁住,mutexLocked也就是1表示已经被锁住。
第2表示是否被唤醒,1表示被唤醒,mutexWoken=2表示被唤醒。
第3位到第32位表示等待在mutex上协程数量,mutexWaiterShift=3表示在获取等待协程数量,需要将state右移位3位。
其中sema是信号量,是一个非负数的全局变量,下面对信号量进行简单说明。



信号量
信号量是进程间通信处理同步互斥的机制,通过一个计数器来控制对共享资源的访问次数限制。例如一个办公室有两台打印机,有几十台电脑连上,这是同时只能允许两个电脑进行打印,而其他电脑必须排队等待完成后才能打印。



sema就是信号量,是一个非负数的全局变量,该变量有两个操作P和V,PV操作都是不可中断的。



P(S):
(1)执行S=S-1;
(2)进行以下判断:



如果S < 0,进入阻塞队列,直到满足S>=0
如果S >= 0, 直接返回
因此P操作执行一次意味着分配一个资源,如上打印机意味着是资源,当S小于0意味着没有可用资源了,只能一直等待,直到资源空闲出来时才能继续。
V(S):
(1)执行S=S+1;
(2)进行以下判断:



如果S > 0,直接返回
如果S <= 0, 释放阻塞队列中的第一个等待进程
因此V操作执行一次意味着释放一个资源,当S小于等于0时,意味着还有进程在请求资源,此时释放了一个资源,就需要从等待队列中拿出一个进程来使用此刻释放的资源。
golang中信号量操作
runtime_Semacquire
func runtime_Semacquire(s uint32),P操作,等待s大于等于0,源码在runtime/sema.go中



runtime_Semrelease
func runtime_Semrelease, V操作,阻塞等待被唤醒,目前版本在runtime/sema.go中(定义稍有不同了)。



如果直接用信号量来实现互斥,即新建一个sema=1,然后用PV操作runtime_Semacquire和runtime_Semrelease来实现,也可以做到当一次请求时,拿到资源进行执行,后续请求阻塞,进入等待队列,不考虑性能,按照这样简单的思路实现如下:



为何用信号量实现互斥锁



type Mutex struct {
sema uint32
}



func NewMutex() *Mutex {
var mu Mutex
mu.sema = 1
return &mu
}
func (m *Mutex) Lock() {
runtime_Semacquire(&m.sema)
}



func (m *Mutex2) Unlock() {
runtime_Semrelease(&m.sema)
}



这里会有一点问题是,当加锁一次,代码中解锁了两次, 会导致sema值变化而不提示任何错误,即这时sema=2,资源数量发生了变化,导致后续运行异常,所以多次解锁时需要返回异常。这里,通过多一个变量来表示加锁次数,改进代码如下:



type Mutex struct {
key int32
sema uint32
}



func (m *Mutex) Lock() {
if atomic.AddInt32(&m.key, 1) == 1 {
// changed from 0 to 1; we hold lock
return
}
runtime_Semacquire(&m.sema)
}



func (m *Mutex) Unlock() {
switch v := atomic.AddInt32(&m.key, -1); {
case v == 0:
// changed from 1 to 0; no contention
return
case v == -1:
// changed from 0 to -1: wasn’t locked
// (or there are 4 billion goroutines waiting)
panic(“sync: unlock of unlocked mutex”)
}
runtime_Semrelease(&m.sema)
}
这个解决方案除了解决了我们前面说的重复加锁的问题外,还对我们初始化工作做了简化,不需要构造函数了。执行过程中值变化如下:



初始:key=0, sema = 0
Lock第一次:key+1=1返回,sema=0,即第一次不进行P操作,直接将key加1表示获取了锁。
Lock第二次:key=2,进行P操作,发现sema-1 =-1<0,阻塞等待获取锁。
当执行了一次Lock后,key=1,sema=0,执行以下操作时:



Unlock第一次:key-1=0返回,sema=0,第一次解锁不执行V操作,直接key减1表示释放锁。
Unlock第二次:key-1=-1,表示解锁过了,返回异常。
当执行了两次Lock后,key=2,sema=-1,执行以下操作时:



Unlock第一次:key-1=1,执行V操作runtime_Semrelease,发现sema+1=0,会阻塞直到唤醒了其他协程,然后返回。
简单来说,增加一个key变量后,sema=0表示有一个资源,跟只用信号量时sema=1含义一样,在golang mutex也是基于此实现的



Mutex操作解读
Lock
最初版本的mutex lock如下:



func (m *Mutex) Lock() {
—————–代码块1 start—————–
// Fast path: grab unlocked mutex.

if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
—————–代码块1 end—————–
awoke := false
iter := 0
for {
—————–代码块2 start—————–

old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 {
if runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old»mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
continue
}
new = old + 1«mutexWaiterShift
}
—————–代码块2 end—————–
—————–代码块3 start—————–
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
panic(“sync: inconsistent mutex state”)
}
new &^= mutexWoken
}
—————–代码块3 end—————–
—————–代码块4 start—————–
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
break
}
runtime_Semacquire(&m.sema)
awoke = true
iter = 0
}
—————–代码块4 start—————–
}



if race.Enabled {
race.Acquire(unsafe.Pointer(m))
} }


将上面代码标注为4块,下面依次进行分析。对代码逻辑进行详细分析之前,先介绍下其中用到部分函数。



race.Acquire



    if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}


竞争检测逻辑,。go中使用goroutine比较常见,在大型项目中可能会在多个goroutine中用到某个全局变量,如果有竞争就需要加锁操作。go提供了race检测工具,可以使用go run -race 或者 go build -race来进行竞争检测。



runtime_canSpin
判断是否需要自选,golang中自旋锁并不会一直自旋下去,在runtime包中runtime_canSpin方法做了一些限制, 传递过来的iter大等于4或者cpu核数小等于1,最大逻辑处理器大于1,至少有个本地的P队列,并且本地的P队列可运行G队列为空才会进行自旋。



//go:linkname sync_runtime_canSpin sync.runtime_canSpin
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
runtime_doSpin
进行自旋操作,会调用procyield函数,该函数也是汇编语言实现。函数内部循环调用PAUSE指令。PAUSE指令什么都不做,但是会消耗CPU时间,在执行PAUSE指令时,CPU不会对它做不必要的优化。



//go:linkname sync_runtime_doSpin sync.runtime_doSpin
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
代码块1
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
如果state=0,即没有锁住、没有唤醒且没有等待队列,可直接拿到锁,将状态置为锁住并返回,这相当于是上面demo版中从key=0,sema=0的状态,变为key=1,seme=0的状态。



代码块2
//最新状态
old := m.state

new := old | mutexLocked
//已经被锁住
if old&mutexLocked != 0 {
//判断是否需要自选,这是在for循环中,iter次数可能已经超过不需要自旋了,或者其他条件
if runtime_canSpin(iter) {
// 主动自旋是有意义的,因为会尝试唤醒锁,
//这样上个协程此时unlock的话,就不会唤醒其他协程
if !awoke && old&mutexWoken == 0 && old»mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//自己没有唤醒,且原状态没有唤醒,
//且有协程在排队且设置唤醒标识成功,
//说明上个的协程此时unlock了,
awoke = true
}
//自旋一段时间
runtime_doSpin()
iter++
continue
}
//不需要自旋,将state的等待队列数据加1
new = old + 1«mutexWaiterShift
}
代码块3
if awoke {
//代码块1中将awoke置为1了,标识被唤醒
//代码块1中只有设置了唤醒标识,awoke才会为true,因此不会new&mutexWoken == 0
if new&mutexWoken == 0 {
panic(“sync: inconsistent mutex state”)
}
//既然当前协程被唤醒了,需要将state置为未唤醒
new &^= mutexWoken
}
代码块4
//这里new有四种值
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
//没有锁住,直接返回
break
}
//当前锁住的,阻塞在此处等待,会让出cpu
runtime_Semacquire(&m.sema)
//从阻塞中返回,设置当前协程被唤醒了
awoke = true
iter = 0
}
这里new可能有四种值:



new := old | mutexLocked
new = old + 1«mutexWaiterShift
new := old | mutexLocked , new &^= mutexWoken
new = old + 1«mutexWaiterShift, new &^= mutexWoken
情况1
new := old | mutexLocked ,协程在开始自旋前或者自旋过程中,原协程已经unlock了,会出线这种情况。假设原协程为a,当前协程为b,执行如下:
a.Lock()
b.Lock()
此时b中lock可能逻辑为:



//state已经被锁住
—————–代码块1 start—————–

—————–代码块1 end—————–
//这里a执行 a.Unlock()将state设置为未锁住状态
awoke := false
for {
old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 {
//不执行此块
}
if awoke {
//不执行此块
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
//执行这里
break
}
//不执行以下几句
runtime_Semacquire(&m.sema)
awoke = true
iter = 0
}
}
或者自旋未结束前a.Unlock(),这时new = old + 1«mutexWaiterShift不会被执行,



//state已经被锁住
—————–代码块1 start—————–

—————–代码块1 end—————–
awoke := false
for {
old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 {
if runtime_canSpin(iter) {
//执行此块,iter还小于active_spin次,回到for开始
//在iter小于active_spin之前,a执行了unlock,此时b会执行到代码块4
if !awoke && old&mutexWoken == 0 && old»mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//先假设这里未执行,如果执行了,则是情况3的赋值逻辑了
awoke = true
}
continue
}
//不执行这句
new = old + 1«mutexWaiterShift
}
if awoke {
//不执行此块
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
//执行这里
break
}
//不执行以下几句
runtime_Semacquire(&m.sema)
awoke = true
iter = 0
}
}
情况2
new = old + 1«mutexWaiterShift,这是执行完自旋流程,或者不需要执行自旋的情况,即代码运行到代码块2的new = old + 1«mutexWaiterShift,



//state已经被锁住
—————–代码块1 start—————–

—————–代码块1 end—————–
awoke := false
for {
old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 {
if runtime_canSpin(iter) {
//执行此块,iter还小于active_spin次,回到for开始
//在iter小于active_spin之前,a执行了unlock,此时b会执行到代码块4
if !awoke && old&mutexWoken == 0 && old»mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//先假设这里未执行,如果执行了,则是情况4的赋值逻辑了
awoke = true
}
continue
}
//执行这句
new = old + 1«mutexWaiterShift
}
if awoke {
//不执行此块
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
//不执行这里
break
}
//执行这里阻塞
runtime_Semacquire(&m.sema)
awoke = true
iter = 0
}
}
情况3和情况4
这两种是情况1和2中,执行到下面语句的情况:



if !awoke && old&mutexWoken == 0 && old»mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
此时因为awoke=true,也会执行:



if awoke {
if new&mutexWoken == 0 {
panic(“sync: inconsistent mutex state”)
}
new &^= mutexWoken
}
这种情况是在自旋过程中,设置唤醒标识成功,即本协程可以拿到锁,因此需要将唤醒标识置为0,防止其他协程获取。



Unlock
源码如下:



func (m *Mutex) Unlock() {
//race检测
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}



// 判断是否多次解锁,多次解锁则抛出异常
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
panic("sync: unlock of unlocked mutex")
}

old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)
return
}
old = m.state
} } for循环中,因为old在更新,第一个if语句会在以下两种情况时返回:


当前锁上没有协程在等待
当前锁已经被其他协程lock了或者唤醒了
第二个if语句,cas原子操作将等待协程数目减1,并设置唤醒标识,阻塞在runtime_Semrelease处,直到有其他协程被唤醒才返回。
看到这里,就可以知道,唤醒操作有两种:
(1)lock函数,执行自旋过程中主动唤醒自己,会执行到awoke = true相关代码;
(2)unlock函数,原协程设置唤醒标识,本协程被动唤醒,不会执行awoke = true相关代码。



从执行来看状态变化
假如依次执行:



Mutex mutex
mutex.Lock() // a协程
mutex.Lock() // b协程
mutex.Lock() // c协程
mutex.Unlock() // a协程
mutex.Unlock() // b协程
mutex.Unlock() // c协程
定义lockflag表示加锁位,wokenflag表示唤醒位,waitcount表示等待队列个数。



原始:sema = 0, lockflag = 0, waitcount = 0,wokenflag = 0
a中lock后:sema = 0, lockflag = 1, waitcount = 0,wokenflag = 0
b中lock后:sema = -1, lockflag = 1, waitcount = 1,wokenflag = 0
c中lock后:sema = -2, lockflag = 1, waitcount = 2,wokenflag中间设置为1,后面修改为0
a中unlock后:sema = -1, lockflag = 1, waitcount = 1,wokenflag 中间设置为1,后面被其他协程(这里为b)修改为0。
b中unlock后:sema = 0, lockflag = 1, waitcount = 0,wokenflag 中间设置为1,后面被其他协程(这里为a)修改为0。
a中unlock后:sema = 0, lockflag = 0, waitcount = 0,wokenflag = 0。
这里考虑的是比较简单的情况,不过对于理解代码逻辑已足够。



Category golang