go语言sync包的学习(Mutex、WaitGroup、Cond)

//加锁,注意锁要以指针的形式传进来,不然只是拷贝
func total1(num *int, mu *sync.Mutex, ch chan bool) {
mu.Lock();
for i := 0; i < 1000; i++ {
*num += i;
}
ch <- true;
mu.Unlock();
}
//Lock、Unlock与RLock、RUnlock不能嵌套使用
func printNum(num int, cond *sync.Cond) {
cond.L.Lock();
if num < 5 {
//num小于5时,进入等待状态
cond.Wait();
}
//大于5的正常输出
fmt.Println(num);
cond.L.Unlock();
}
//Once.Do()保证多次调用只执行一次
once := sync.Once{};
ch := make(chan bool, 3);
for i := 0; i < 3; i++ {
go func(n int) {
once.Do(func() {
//只会执行一次,因为闭包引用了变量n,最后的值为2
fmt.Println(n)



//读写锁,多了读锁定,和读解锁,让多个goroutine同时读取对象
rwmutex := sync.RWMutex{};
//组等待,等待一组goroutine的结束
wg := sync.WaitGroup{};
//增加计数器
wg.Add(10);
for i:= 0; i< 10; i++ {
go func(n int) {
fmt.Print(n, “ “);
//这里表示该goroutine执行完成
wg.Done();
}(i);
}
//等待所有线程执行完成
wg.Wait();

无论是互斥锁,还是自旋锁,在任何时刻,最多只能有一个保持者,也就说,在任何时刻最多只能有一个执行单元获得锁。但是两者在调度机制上略有不同。对于互斥锁,如果资源已经被占用,资源申请者只能进入睡眠状态。但是自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,”自旋”一词就是因此而得名。



信号量(semaphore)
这是一个由 Edsger Dijkstra 提出的数据结构,解决很多关于同步的问题时,它都很好用。它是一个提供了两种操作的整数:



获取(acquire,又称 wait、decrement 或者 P)
释放(release,又称 signal、increment 或者 V)
获取操作把信号量减一,如果减一的结果是非负数,那么线程可以继续执行。如果结果是负数,那么线程将会被阻塞,除非有其它线程把信号量增加回非负数,该线程才有可能恢复运行)。



释放操作把信号量加一,如果当前有被阻塞的线程,那么它们其中一个会被唤醒,恢复执行。



Go 语言的运行时提供了 runtime_SemacquireMutex 和 runtime_Semrelease 函数,像 sync.RWMutex 这些对象的实现会用到这两个函数。



Lock 方法
func (rw *RWMutex) Lock() {

rw.w.Lock()
// 通过把 rw.readerCount 设置成负数,来告知读者当前有写者正在等待进入临界区
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false)
}

}
Lock 方法让写者可以获得对共享数据的独占访问权:



首先它会获取一个叫 w 的互斥量(mutex),这会使得其它的写者无法访问这个共享数据,这个w 只有在 Unlock 函数快结束的时候,才会被解锁,从而保证一次最多只能有一个写者进入临界区。



然后 Lock 方法会把 readerCount 的值设置成负数,(通过把readerCount 减掉 rwmutexMaxReaders(即1 « 30))。然后接下来任何读者调用 RLock 函数时,都会被阻塞掉了:



if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// rw.readerCount 是负数,说明有写者正在等待进入临界区或者正在临界区内,等待写者执行完成
runtime_SemacquireMutex(&rw.readerSem, false)
}
后续来到临界区的读者们将会被阻塞,那正在运行的读者们会怎样呢?readerWait 字段就是用来记录当前有多少读者正在运行。写者阻塞在信号量 rw.writerSem 里,直到最后一个正在运行的读者执行完毕,它调用的 RUnlock 方法会把 rw.writerSem 信号量加一(我后面会讲到),这时写者才能被唤醒、进入临界区。



如果没有正在运行的读者,那么写者就可以直接进入临界区了。



rwmutexMaxReaders
(译注:原文大量使用的 pending 这个词常常被翻译为「挂起」(有暂停的语义),但是在本文中,pending 表示的是「等待进入临界区(这时是线程是暂停的)或者正在临界区里面(这时是线程正在运行的)」这个状态。「挂起」不能很好的表达该语义,所以 pending 保留原文不翻译,但读者要注意 pending 在本文的语义,例如:「一个 pending 的读者」可以理解为是一个调用了 RLock 函数但是还没调用 RUnlock 函数的读者。「一个 pending 的写者」则相应地表示一个调用了Lock 函数但是还没调用 Unlock 函数的写者)



在 rwmutex.go 里面有一个常量:



const rwmutexMaxReaders = 1 « 30
这个 1 « 30 是什么意思、做什么用的呢?



readerCount 字段是 int32 类型的,它的有效范围是:



[-1 « 31, (1 « 31) - 1] 或者说 [-2147483648, 2147483647]
RWMutex 使用这个字段来记录当前 pending 的读者数,并且这个字段还标记着当前是否有写者在 pending 状态。在 Lock 方法里面:



r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
readerCount 字段被减掉了 1«30。当 readerCount 的值为负数时,说明当前存在 pending 状态的写者。而 readerCount 再加回 1«30,又能表示当前 pending 的读者的数量。最后,rwmutexMaxReaders 还限制了 pending 读者的数量。如果我们的当前 pending 的读者数量比 rwmutexMaxReaders 还要多的话,那么 readerCount 减去 rwmutexMaxReaders 就不是负数了,这样整个机制都会被破坏掉。从中我们可以知道,pending 的读者数量不能大于 rwmutexMaxReaders - 1 ,它的值超过了 10 亿——1073741823。



RUnlock
func (rw *RWMutex) RUnlock() {

if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw(“sync: RUnlock of unlocked RWMutex”)
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}

}
这个方法会把 readerCount 减一 (之前是 RLock 方法把这个值增加了的),如果 readerCount 是负数,意味着当前存在 pending 状态的写者,因为正如上面所说的,在写者调用 Lock 方法的时候,readerCount 的值会减掉 rwmutexMaxReaders,从而使 readerCount 变成负数。



然后这个方法会检查当前正在临界区里面的读者数是不是已经是 0 了,如果是的话,意味着等待进入临界区的写者可以获取到 rw.writerSem 信号量、进入临界区了。



Unlock
func (rw *RWMutex) Unlock() {

r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw(“sync: Unlock of unlocked RWMutex”)
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
rw.w.Unlock()

}
要解锁写者拥有的写锁,首先 readerCount 的值要增加 rwmutexMaxReaders,这个操作会使得 readerCount 恢复成非负数,如果这时候 readerCount 大于 0,这意味着当前有读者在等待着写者离开临界区。最后写者释放掉它拥有的 w 这个互斥量(译注:上文说过,这个互斥量是写者用来防止其它写者进入临界区的),这使得其它写者能够有机会再次锁定 w 这个互斥量。



如果读者或写者尝试在一个已经解锁的 RWMutex 上调用Unlock 和 RUnlock 方法会抛出错误(代码):



m := sync.RWMutex{}
m.Unlock()
输出:



fatal error: sync: Unlock of unlocked RWMutex



递归地读锁定
文档里面写道:



如果一个 goroutine 拥有一个读锁,而另外一个 goroutine 又调用了 Lock 函数,那么在第一个读锁被释放之前,没有读者可以获得读锁。这尤其限制了我们不能递归地获取读锁,因为只有这样才能确保锁都能变得可用,一个 Lock 的调用会阻止新的读者获取到读锁。
锁的拷贝
go tool vet 可以检测到是否有锁被按值拷贝了,因为这种情况会导致死锁,具体的情况可以看之前的一篇文章:Detect locks passed by value in Go (译注:GCTT 译文:检测 Go 程序中按值传递的 locks



性能
之前有人提出:随着 CPU 核心数量的增加,RWMutex 的性能会降低,详见:https://github.com/golang/go/issues/17973



type Mutex struct {



state int32 // 将一个32位整数拆分为 当前阻塞的goroutine数(29位)|饥饿状态(1位)|唤醒状态(1位)|锁状态(1位) 的形式,来简化字段设计

sema uint32 // 信号量


}



const (



mutexLocked = 1 << iota // 1 0001 含义:用最后一位表示当前对象锁的状态,0-未锁住 1-已锁住

mutexWoken // 2 0010 含义:用倒数第二位表示当前对象是否被唤醒 0-唤醒 1-未唤醒

mutexStarving // 4 0100 含义:用倒数第三位表示当前对象是否为饥饿模式,0为正常模式,1为饥饿模式。

mutexWaiterShift = iota // 3,从倒数第四位往前的bit位表示在排队等待的goroutine数

starvationThresholdNs = 1e6 // 1ms


)



WaitGroup总共有三个方法:Add(delta int),Done(),Wait()。简单的说一下这三个方法的作用。



Add:添加或者减少等待goroutine的数量



Done:相当于Add(-1)



Wait:执行阻塞,直到所有的WaitGroup数量变成0



golang中的同步是通过sync.WaitGroup来实现的.WaitGroup的功能:它实现了一个类似队列的结构,可以一直向队列中添加任务,当任务完成后便从队列中删除,如果队列中的任务没有完全完成,可以通过Wait()函数来出发阻塞,防止程序继续进行,直到所有的队列任务都完成为止.



WaitGroup的特点是Wait()可以用来阻塞直到队列中的所有任务都完成时才解除阻塞,而不需要sleep一个固定的时间来等待



//声明一个全局变量
var waitgroup sync.WaitGroup



func Afunction(shownum int) {
fmt.Println(shownum)
waitgroup.Done() //任务完成,将任务队列中的任务数量-1,其实.Done就是.Add(-1)
}



func main() {
for i := 0; i < 10; i++ {
waitgroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
go Afunction(i)
}
waitgroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
}



Category golang