Cond

Cond用于在并发环境下routine的等待和通知



//创建Cond
cond := sync.NewCond(new(sync.Mutex))
//等待唤醒
cond.L.Lock()
cond.Wait()
//唤醒一个
cond.Signal()
//唤醒所有
cond.Broadcast()



结构体定义
type Cond struct {
noCopy noCopy //不允许复制,一个结构体,有一个Lock()方法,嵌入别的结构体中,表示不允许复制
L Locker //锁
notify notifyList //通知列表,调用Wait()方法的routine会被放入list中,每次唤醒,从这里取出
checker copyChecker //复制检查,检查cond实例是否被复制
}



noCopy:noCopy对象,拥有一个Lock方法,使得Cond对象在进行go vet扫描的时候,能够被检测到是否被复制。
/*
package: sync
file: cond.go
line: 94
*/
type noCopy struct{}



func (*noCopy) Lock() {}

‘构造’方法
// NewCond returns a new Cond with Locker l.
//通过一个Locker实例初始化,传参数的时候必须是引用或指针,比如&sync.Mutex{}或new(sync.Mutex)
//不然会报异常:cannot use lock (type sync.Mutex) as type sync.Locker in argument to sync.NewCond:
//sync.Mutex does not implement sync.Locker (Lock method has pointer receiver)
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
常用方法
Wait
//调用此方法会将此routine加入通知列表,并等待获取通知,调用此方法必须先Lock,不然方法里会调用Unlock(),报错.
func (c *Cond) Wait() {
c.checker.check() //检查是否被复制
t := runtime_notifyListAdd(&c.notify) //加入通知列表
c.L.Unlock() // 释放锁
runtime_notifyListWait(&c.notify, t) //等待通知
c.L.Lock() //被通知了,获取锁,继续运行
}
Signal
//唤醒在Wait的routine中的一个
func (c *Cond) Signal() {
c.checker.check() //检查是否被复制
runtime_notifyListNotifyOne(&c.notify) //通知等待列表中的一个
}
Broadcast
//唤醒所有等待的
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}



条件变量sync.Cond本质上是一些正在等待某个条件的线程的同步机制。



sync.Cond 主要实现一个条件变量,假如 goroutine A 执行前需要等待另外的goroutine B 的通知,那边处于等待的goroutine A 会保存在一个通知列表,也就是说需要某种变量状态的goroutine A 将会等待/Wait在那里,当某个时刻状态改变时负责通知的goroutine B 通过对条件变量通知的方式(Broadcast,Signal)来通知处于等待条件变量的goroutine A, 这样便可首先一种“消息通知”的同步机制。



以go的http处理为例,在Go的源码中http模块server部分源码中所示,当需要处理一个新的连接的时候,若连接conn是实现自tls.Conn的情况下,会进行相关的客户端与服务端的“握手”处理Handshake(), 入口代码如下
if tlsConn, ok := c.rwc.(
tls.Conn); ok {
if d := c.server.ReadTimeout; d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
}
if d := c.server.WriteTimeout; d != 0 {
c.rwc.SetWriteDeadline(time.Now().Add(d))
}
if err := tlsConn.Handshake(); err != nil {
c.server.logf(“http: TLS handshake error from %s: %v”, c.rwc.RemoteAddr(), err)
return
}
c.tlsState = new(tls.ConnectionState)
*c.tlsState = tlsConn.ConnectionState()
if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
if fn := c.server.TLSNextProto[proto]; fn != nil {
h := initNPNRequest{tlsConn, serverHandler{c.server}}
fn(c.server, tlsConn, h)
}
return
}



其中的Handshake函数代码通过使用条件变量的方式来处理新连接握手调用的同步问题:
func (c *Conn) Handshake() error {
c.handshakeMutex.Lock()
defer c.handshakeMutex.Unlock()



for {
if err := c.handshakeErr; err != nil {
return err
}
if c.handshakeComplete {
return nil
}
if c.handshakeCond == nil {
break
}



c.handshakeCond.Wait()
}



c.handshakeCond = sync.NewCond(&c.handshakeMutex)
c.handshakeMutex.Unlock()



c.in.Lock()
defer c.in.Unlock()



c.handshakeMutex.Lock()



if c.handshakeErr != nil || c.handshakeComplete {
panic(“handshake should not have been able to complete after handshakeCond was set”)
}



if c.isClient {
c.handshakeErr = c.clientHandshake()
} else {
c.handshakeErr = c.serverHandshake()
}
if c.handshakeErr == nil {
c.handshakes++
} else {
c.flush()
}



if c.handshakeErr == nil && !c.handshakeComplete {
panic(“handshake should have had a result.”)
}



c.handshakeCond.Broadcast()
c.handshakeCond = nil



当使用sync.Cond的时候有两点移动要注意的:



一定要在调用cond.Wait方法前,锁定与之关联的读写锁
一定不要忘记在cond.Wait后,若数据已经处理完毕,在返回前要对与之关联的读写锁进行解锁。



如果不释放锁, 其它收到信号的gouroutine将阻塞无法继续执行。



两个要点
Cond不能被复制:Cond在内部持有一个等待队列,这个队列维护所有等待在这个Cond的goroutine。因此若这个Cond允许值传递,则这个队列在值传递的过程中会进行复制,导致在唤醒goroutine的时候出现错误。
顺序唤醒: notifyList对象持有两个无限自增的字段wait和notify,wait字段在有新的goroutine等待的时候加1,notify字段在有新的唤醒信号的时候加1。在有新的goroutine加入队列的时候,会将当前wait赋值给goroutine的ticket,唤醒的时候会唤醒ticket等于notify的gourine。另外,当wait==notify时表示没有goroutine需要被唤醒,wait>notify时表示有goroutine需要被唤醒,waity恒大于等于notify
// Cond实现了一个条件变量,一个等待或宣布事件发生的goroutines的集合点。
//
// 每个Cond都有一个相关的Locker L(通常是* Mutex或* RWMutex)。
type Cond struct {
// 不允许复制,一个结构体,有一个Lock()方法,嵌入别的结构体中,表示不允许复制
// noCopy对象,拥有一个Lock方法,使得Cond对象在进行go vet扫描的时候,能够被检测到是否被复制
noCopy noCopy



// 锁的具体实现,通常为 mutex 或者rwmutex
L Locker

// 通知列表,调用Wait()方法的goroutine会被放入list中,每次唤醒,从这里取出
// notifyList对象,维护等待唤醒的goroutine队列,使用链表实现
// 在 sync 包中被实现, src/sync/runtime.go
notify notifyList

// 复制检查,检查cond实例是否被复制
// copyChecker对象,实际上是uintptr对象,保存自身对象地址
checker copyChecker


}



// NewCond方法传入一个实现了Locker接口的对象,返回一个新的Cond对象指针,
// 保证在多goroutine使用cond的时候,持有的是同一个实例
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}



// 等待原子解锁c.L并暂停执行调用goroutine。
// 稍后恢复执行后,Wait会在返回之前锁定c.L.
// 与其他系统不同,除非被广播或信号唤醒,否则等待无法返回。



// 因为等待第一次恢复时c.L没有被锁定,
// 所以当Wait返回时,调用者通常不能认为条件为真。
// 相反,调用者应该循环等待:



// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// … make use of condition …
// c.L.Unlock()
//



// 调用此方法会将此routine加入通知列表,并等待获取通知,调用此方法必须先Lock,不然方法里会调用Unlock(),报错
//
func (c *Cond) Wait() {



// 检查是否被复制; 如果是就panic
// check检查,保证cond在第一次使用后没有被复制
c.checker.check()
// 将当前goroutine加入等待队列, 该方法在 runtime 包的 notifyListAdd 函数中实现
// src/runtime/sema.go
t := runtime_notifyListAdd(&c.notify)
// 释放锁,
// 因此在调用Wait方法前,必须保证获取到了cond的锁,否则会报错
c.L.Unlock()

// 等待队列中的所有的goroutine执行等待唤醒操作
// 将当前goroutine挂起,等待唤醒信号
// 该方法在 runtime 包的 notifyListWait 函数中实现
// src/runtime/sema.go
runtime_notifyListWait(&c.notify, t)
// 被通知了,获取锁,继续运行
c.L.Lock() }


//
// 唤醒单个 等待的 goroutine
func (c *Cond) Signal() {
// 检查c是否是被复制的,如果是就panic
// 保证cond在第一次使用后没有被复制
c.checker.check()
// 通知等待列表中的一个
// 顺序唤醒一个等待的gorountine
// 在runtime 包的 notifyListNotifyOne 函数中被实现
// src/runtime/sema.go
runtime_notifyListNotifyOne(&c.notify)
}



// 唤醒等待队列中的所有goroutine。
func (c *Cond) Broadcast() {
// 检查c是否是被复制的,如果是就panic
// 保证cond在第一次使用后没有被复制
c.checker.check()
// 唤醒等待队列中所有的goroutine
// 有runtime 包的 notifyListNotifyAll 函数实现
// src\runtime\sema.go
runtime_notifyListNotifyAll(&c.notify)
}



// copyChecker保持指向自身的指针以检测对象复制。
type copyChecker uintptr



// 检查c是否被复制,如果是则panic
/**
check方法在第一次调用的时候,会将checker对象地址赋值给checker,也就是将自身内存地址赋值给自身
/
func (c *copyChecker) check() {
/**
因为 copyChecker的底层类型为 uintptr
那么 这里的 *c其实就是 copyChecker类型本身,然后强转成uintptr
和拿着 c 也就是copyChecker的指针去求 uintptr,理论上要想等
即:内存地址为一样,则表示没有被复制
*/
// 下述做法是:
// 其实 copyChecker中存储的对象地址就是 copyChecker 对象自身的地址
// 先把 copyChecker 处存储的对象地址和自己通过 unsafe.Pointer求出来的对象地址作比较,
// 如果发现不相等,那么就尝试的替换,由于使用的 old是0,
// 则表示c还没有开辟内存空间,也就是说,只有是首次开辟地址才会替换成功
// 如果替换不成功,则表示 copyChecker出所存储的地址和 unsafe计算出来的不一致
// 则表示对象是被复制了
if uintptr(
c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(
c) != uintptr(unsafe.Pointer(c)) {
panic(“sync.Cond is copied”)
}
}



// noCopy可以嵌入到结构中,在第一次使用后不得复制。
//
// 详细介绍请查看: https://github.com/golang/go/issues/8005#issuecomment-190753527
type noCopy struct{}



// Lock is a no-op used by -copylocks checker from go vet.
// Lock 是有 go vet 命令来判断是否有 copy 的检查的
func (*noCopy) Lock() {}



// sync/runtime.go
// 使用链表实现
type notifyList struct {



wait   uint32       // 等待数
notify uint32 // 唤醒数
lock uintptr // 信号锁
// 使用链表实现
head unsafe.Pointer // 队列的当前头
tail unsafe.Pointer // 队列的当前尾 }


L:实现了Locker接口的锁对象,通常使用Mutex或RWMutex。
/*
package: sync
file: mutex.go
line: 31
*/
type Locker interface {
Lock()
Unlock()
}



notify:notifyList对象,维护等待唤醒的goroutine队列,使用链表实现。
/*
package: sync
file: runtime.go
line: 29

*/
type notifyList struct {
wait uint32
notify uint32
lock uintptr
head unsafe.Pointer
tail unsafe.Pointer
}



checker:copyChecker对象,实际上是uintptr对象,保存自身对象地址。
/*
package: sync
file: cond.go
line: 79

*/

type copyChecker uintptr



func (c copyChecker) check() {
if uintptr(
c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(
c) != uintptr(unsafe.Pointer(c)) {
panic(“sync.Cond is copied”)
}
}



检查当前checker的地址是否等于保存在checker中的地址
对checker进行原子CAS操作,将checker当前地址赋值给为空的checker
重复操作1,防止在进行1和2的时候,有其他gorountine并发的修改了checker值
若1、2、3都不满足,则表示当前cond是复制的,抛出panic



check方法在第一次调用的时候,会将checker对象地址赋值给checker,也就是将自身内存地址赋值给自身。
再次调用checker方法的时候,会将当前checker地址的值与保存的checker地址值进行比较,若不相同则表示当前checker的地址不是第一次调用check方法时候的地址,即cond对象被复制了,checker被重新分配了内存地址。



方法
NwoCond
/*
package: sync
file: cond.go
line: 32

*/

func NewCond(l Locker) *Cond {
return &Cond{L: l}
}



NewCond方法传入一个实现了Locker接口的对象,返回一个新的Cond对象指针,保证在多goroutine使用cond的时候,持有的是同一个实例。
Wait
/*
package: sync
file: cond.go
line: 52

*/
func (c *Cond) Wait() {
c.checker.check() //step 1
t := runtime_notifyListAdd(&c.notify) //step 2
c.L.Unlock() //step 3
runtime_notifyListWait(&c.notify, t) //step 4
c.L.Lock() //step 5
}



check检查,保证cond在第一次使用后没有被复制
将notify队列的等待数加1,并将之前的等待数返回
/*
package: runtime
file: sema.go
line: 476

*/
func notifyListAdd(l *notifyList) uint32 {
return atomic.Xadd(&l.wait, 1) - 1
}



释放锁,因此在调用Wait方法前,必须保证获取到了cond的锁,否则会报错
将当前goroutine挂起,等待唤醒信号
/*
package: runtime
file: sema.go
line: 485

*/
func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock) //step a



if less(t, l.notify) { //step b
unlock(&l.lock)
return
}



s := acquireSudog() //step c
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if l.tail == nil { //step d
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, “semacquire”, traceEvGoBlockCond, 3) //step e
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}



a. 锁定notify队列
b. 如果notif队列的等待数小于唤醒数 表示当前goroutine不需要再进行等待,则解锁notify队列,直接返回
c. 获取当前goroutine,设置相关参数,将当前等待数赋值给ticket
d. 将当前goroutine加入到notify队列中
e. 将当前goroutine挂起,等待唤醒信号



gorountine被唤醒,重新获取锁



Signal
/*
package: sync
file: cond.go
line: 64

*/
func (c *Cond) Signal() {
c.checker.check() //step 1
runtime_notifyListNotifyOne(&c.notify) //step 2
}



check检查,保证cond在第一次使用后没有被复制



顺序唤醒一个等待的gorountine
/*
package: runtime
file: sema.go
line: 485

*/
func notifyListNotifyOne(l *notifyList) {



if atomic.Load(&l.wait) == atomic.Load(&l.notify) { //step a
return
}



lock(&l.lock) //step b



t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}



atomic.Store(&l.notify, t+1)



for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { //step c
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}



a. 如果notify队列的等待数等于唤醒数,表示没有新的goroutine在等待队列上,不需要唤醒任何goroutine
b. 锁定notify队列,对队列进行双检查,查看是否有新的goroutine需要被唤醒,若无则将唤醒数加1
c. 遍历等待唤醒的goroutine队列,唤醒ticket等于唤醒数的goroutine,即顺序唤醒一个最先加入等待队列的goroutine



Broadcast
/*
package: sync
file: cond.go
line: 73

*/
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}



check检查,保证cond在第一次使用后没有被复制



唤醒所有gorountine
/*
package: runtime
file: sema.go
line: 485

*/
func notifyListNotifyAll(l *notifyList) {



if atomic.Load(&l.wait) == atomic.Load(&l.notify) { //step a
return
}



lock(&l.lock) //step b
s := l.head
l.head = nil
l.tail = nil



atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)



for s != nil { //step c
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}



a. 如果notify队列的等待数等于唤醒数,表示没有新的goroutine在等待队列上,不需要唤醒任何goroutine
b. 锁定notify队列,清空等待唤醒队列,将等待数赋值给唤醒数
c. 遍历等待唤醒的gorountine队列,将所有goroutine唤醒



总结



Cond不能被复制:Cond在内部持有一个等待队列,这个队列维护所有等待在这个Cond的goroutine。因此若这个Cond允许值传递,则这个队列在值传递的过程中会进行复制,导致在唤醒goroutine的时候出现错误。
顺序唤醒: notifyList对象持有两个无限自增的字段wait和notify,wait字段在有新的goroutine等待的时候加1,notify字段在有新的唤醒信号的时候加1。在有新的goroutine加入队列的时候,会将当前wait赋值给goroutine的ticket,唤醒的时候会唤醒ticket等于notify的gourine。另外,当wait==notify时表示没有goroutine需要被唤醒,wait>notify时表示有goroutine需要被唤醒,waity恒大于等于notify。


Category golang