在某个数据需要被多个线程共享访问的时候,会出现读者-写者问题(这里的「问题」是复数形式的,因为读者-写者问题有多个变种)。访问共享数据的线程有两种类型:读者和写者。读者只会读取数据,而写者则是修改它。当写者拥有了访问数据的权限后,其它的线程(不管是读者还是写者)都不能访问这个数据。这种约束的需求在现实中是存在的,比如:当写者不能原子性地修改某个数据(例如数据库)时,在修改完成之前,要读取这个数据的读者要被阻塞,以免读者获取到损坏的数据(脏数据)。
代码实现
注意作者编写本文时分析的代码版本(718d6c58)与最新的版本相比可能会有差异。
RWMutex 为读者暴露了两个方法(RLock 和 RUnlock),同时专门为写者也暴露了两个方法( Lock 和 Unlock )。
RLock
为了代码简洁起见,我们跳过那些跟竞态检测器相关的代码(用 … 表示)。
func (rw *RWMutex) RLock() {
…
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false)
}
…
}
readerCount 字段的类型是 int32 ,它表示当前启用的读者——包括了所有正在临界区里面的读者或者被写者阻塞的等待进入临界区读者的数量。相当于是当前调用了 RLock 函数并且还没调用 RUnLock 函数的读者的数量。
atomic.AddInt32 是下面代码的原子版本:
*addr += delta
return *addr
其中 addr 是 *int32 类型的而 delta 是 int32 类型的。由于这是个原子性的操作,所以增加 delta 不会有干扰到其它线程的风险。(更多关于 Fetch-and-add 的资料 详见这里)
如果我们完全没有用到写者的话,readerCount 会一直大于或等于 0 (译注:后面会讲到,一旦有写者调用 Lock ,Lock函数就会把 readerCount 设置为负数),并且读者获取锁的过程会走较快的非阻塞的分支,因为这时候读者获取锁的过程只涉及到 atomic.AddInt32 的调用。
信号量(semaphore)
这是一个由 Edsger Dijkstra 提出的数据结构,解决很多关于同步的问题时,它都很好用。它是一个提供了两种操作的整数:
获取(acquire,又称 wait、decrement 或者 P)
释放(release,又称 signal、increment 或者 V)
获取操作把信号量减一,如果减一的结果是非负数,那么线程可以继续执行。如果结果是负数,那么线程将会被阻塞,除非有其它线程把信号量增加回非负数,该线程才有可能恢复运行)。
释放操作把信号量加一,如果当前有被阻塞的线程,那么它们其中一个会被唤醒,恢复执行。
Go 语言的运行时提供了 runtime_SemacquireMutex 和 runtime_Semrelease 函数,像 sync.RWMutex 这些对象的实现会用到这两个函数。
Lock 方法
func (rw *RWMutex) Lock() {
…
rw.w.Lock()
// 通过把 rw.readerCount 设置成负数,来告知读者当前有写者正在等待进入临界区
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false)
}
…
}
Lock 方法让写者可以获得对共享数据的独占访问权:
首先它会获取一个叫 w 的互斥量(mutex),这会使得其它的写者无法访问这个共享数据,这个w 只有在 Unlock 函数快结束的时候,才会被解锁,从而保证一次最多只能有一个写者进入临界区。
然后 Lock 方法会把 readerCount 的值设置成负数,(通过把readerCount 减掉 rwmutexMaxReaders(即1 « 30))。然后接下来任何读者调用 RLock 函数时,都会被阻塞掉了:
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// rw.readerCount 是负数,说明有写者正在等待进入临界区或者正在临界区内,等待写者执行完成
runtime_SemacquireMutex(&rw.readerSem, false)
}
后续来到临界区的读者们将会被阻塞,那正在运行的读者们会怎样呢?readerWait 字段就是用来记录当前有多少读者正在运行。写者阻塞在信号量 rw.writerSem 里,直到最后一个正在运行的读者执行完毕,它调用的 RUnlock 方法会把 rw.writerSem 信号量加一(我后面会讲到),这时写者才能被唤醒、进入临界区。
如果没有正在运行的读者,那么写者就可以直接进入临界区了。
rwmutexMaxReaders
(译注:原文大量使用的 pending 这个词常常被翻译为「挂起」(有暂停的语义),但是在本文中,pending 表示的是「等待进入临界区(这时是线程是暂停的)或者正在临界区里面(这时是线程正在运行的)」这个状态。「挂起」不能很好的表达该语义,所以 pending 保留原文不翻译,但读者要注意 pending 在本文的语义,例如:「一个 pending 的读者」可以理解为是一个调用了 RLock 函数但是还没调用 RUnlock 函数的读者。「一个 pending 的写者」则相应地表示一个调用了Lock 函数但是还没调用 Unlock 函数的写者)
在 rwmutex.go 里面有一个常量:
const rwmutexMaxReaders = 1 « 30
这个 1 « 30 是什么意思、做什么用的呢?
readerCount 字段是 int32 类型的,它的有效范围是:
[-1 « 31, (1 « 31) - 1] 或者说 [-2147483648, 2147483647]
RWMutex 使用这个字段来记录当前 pending 的读者数,并且这个字段还标记着当前是否有写者在 pending 状态。在 Lock 方法里面:
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
readerCount 字段被减掉了 1«30。当 readerCount 的值为负数时,说明当前存在 pending 状态的写者。而 readerCount 再加回 1«30,又能表示当前 pending 的读者的数量。最后,rwmutexMaxReaders 还限制了 pending 读者的数量。如果我们的当前 pending 的读者数量比 rwmutexMaxReaders 还要多的话,那么 readerCount 减去 rwmutexMaxReaders 就不是负数了,这样整个机制都会被破坏掉。从中我们可以知道,pending 的读者数量不能大于 rwmutexMaxReaders - 1 ,它的值超过了 10 亿——1073741823。
RUnlock
func (rw *RWMutex) RUnlock() {
…
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw(“sync: RUnlock of unlocked RWMutex”)
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}
…
}
这个方法会把 readerCount 减一 (之前是 RLock 方法把这个值增加了的),如果 readerCount 是负数,意味着当前存在 pending 状态的写者,因为正如上面所说的,在写者调用 Lock 方法的时候,readerCount 的值会减掉 rwmutexMaxReaders,从而使 readerCount 变成负数。
然后这个方法会检查当前正在临界区里面的读者数是不是已经是 0 了,如果是的话,意味着等待进入临界区的写者可以获取到 rw.writerSem 信号量、进入临界区了。
Unlock
func (rw *RWMutex) Unlock() {
…
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw(“sync: Unlock of unlocked RWMutex”)
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
rw.w.Unlock()
…
}
要解锁写者拥有的写锁,首先 readerCount 的值要增加 rwmutexMaxReaders,这个操作会使得 readerCount 恢复成非负数,如果这时候 readerCount 大于 0,这意味着当前有读者在等待着写者离开临界区。最后写者释放掉它拥有的 w 这个互斥量(译注:上文说过,这个互斥量是写者用来防止其它写者进入临界区的),这使得其它写者能够有机会再次锁定 w 这个互斥量。
如果读者或写者尝试在一个已经解锁的 RWMutex 上调用Unlock 和 RUnlock 方法会抛出错误(代码):
m := sync.RWMutex{}
m.Unlock()
输出:
fatal error: sync: Unlock of unlocked RWMutex
…
递归地读锁定
文档里面写道:
如果一个 goroutine 拥有一个读锁,而另外一个 goroutine 又调用了 Lock 函数,那么在第一个读锁被释放之前,没有读者可以获得读锁。这尤其限制了我们不能递归地获取读锁,因为只有这样才能确保锁都能变得可用,一个 Lock 的调用会阻止新的读者获取到读锁。(上文已经多次提到这一点了)
因为 RWMutex 就是这么实现的:如果当前有一个 pending 的写者,那么所有尝试调用 RLock 的读者都会被阻塞,即使在这之前已经有读者获取到了读锁(源代码):
package main
import (
“fmt”
“sync”
“time”
)
var m sync.RWMutex
func f(n int) int {
if n < 1 {
return 0
}
fmt.Println(“RLock”)
m.RLock()
defer func() {
fmt.Println(“RUnlock”)
m.RUnlock()
}()
time.Sleep(100 * time.Millisecond)
return f(n-1) + n
}
func main() {
done := make(chan int)
go func() {
time.Sleep(200 * time.Millisecond)
fmt.Println(“Lock”)
m.Lock()
fmt.Println(“Unlock”)
m.Unlock()
done <- 1
}()
f(4)
<-done
}
输出:
RLock
RLock
RLock
Lock
RLock
fatal error: all goroutines are asleep - deadlock!
(译注:上面的代码有两个 goroutine,一个是写者 routine,一个是主 goroutine(也是读者),通过程序的输出可以知道:前三行都是输出 RLock,表示这时候已经有 3 个读者获取到了读锁。后面接着输出了 Lock, 表示这时候写者开始请求写锁,后面接着输出一个 RLock,表示这时又多了一个读者请求读锁。因为 pending 的写者会阻塞掉后续调用 RLock 的读者,所以最后一个 RLock 的调用堵塞了主 routine,而写者的 routine 也在堵塞等待前面三个读者释放它们的读锁,所以两个 goroutine 都堵塞了,因此程序报错:fatal error: all goroutines are asleep - deadlock!)
锁的拷贝
go tool vet 可以检测到是否有锁被按值拷贝了,因为这种情况会导致死锁,具体的情况可以看之前的一篇文章:Detect locks passed by value in Go (译注:GCTT 译文:检测 Go 程序中按值传递的 locks
性能
之前有人提出:随着 CPU 核心数量的增加,RWMutex 的性能会降低,详见:https://github.com/golang/go/issues/17973
锁的争用
Go 1.8 版本开始支持分析 mutex 的争用情况(译注:原文 Contention,参考维基百科#Granularity))(patch 补丁),我们来看看它是怎么用的:
import (
“net/http”
_ “net/http/pprof”
“runtime”
“sync”
“time”
)
func main() {
var mu sync.Mutex
runtime.SetMutexProfileFraction(5)
for i := 0; i < 10; i++ {
go func() {
for {
mu.Lock()
time.Sleep(100 * time.Millisecond)
mu.Unlock()
}
}()
}
http.ListenAndServe(“:8888”, nil)
}
go build mutexcontention.go
./mutexcontention
当程序 mutexcontention 运行时:
go tool pprof mutexcontention http://localhost:8888/debug/pprof/mutex?debug=1
Fetching profile over HTTP from http://localhost:8888/debug/pprof/mutex?debug=1
Saved profile in /Users/mlowicki/pprof/pprof.mutexcontention.contentions.delay.003.pb.gz
File: mutexcontention
Type: delay
Entering interactive mode (type “help” for commands, “o” for options)
(pprof) list main
Total: 57.28s
ROUTINE ======================== main.main.func1 in /Users/mlowicki/projects/golang/src/github.com/mlowicki/mutexcontention/mutexcontention.go
0 57.28s (flat, cum) 100% of Total
. . 14: for i := 0; i < 10; i++ {
. . 15: go func() {
. . 16: for {
. . 17: mu.Lock()
. . 18: time.Sleep(100 * time.Millisecond)
. 57.28s 19: mu.Unlock()
. . 20: }
. . 21: }()
. . 22: }
. . 23:
. . 24: http.ListenAndServe(“:8888”, nil)
上面的 57.28s 是什么,它为什么挨着 mu.Unlock() 呢?
当 goroutine 因为调用 Lock 方法而被阻塞的时候,这个时间点会被记录下来——aquiretime(获取时间)。当其他 goroutine 解锁了这个锁,并且起码有一个 goroutine 在等待获取这个锁的时候。其中一个 goroutine 可以获取到这个锁,这时他会自动调用 mutexevent 函数。函数 mutexevent 根据 SetMutexProfileFraction 函数设定的比率,来确定是否应该保存或忽略掉该事件。这种事件都包含了等待时间(当前时间 - 获取时间)。上述的代码中,所有阻塞在这个锁的 goroutine 的总等待时间会被收集和显示出来,
对于读锁(Rlock 和 RUnlock)争用的分析功能,将会在 Go 1.11 版本加入 (patch 补丁)