channel

https://golang.org/ref/mem
一,初始化过程在这之前,先看下asm_arm64.s中的汇编代码关于启动这块的逻辑
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·hashinit(SB)
CALL runtime·schedinit(SB)



// create a new goroutine to start program
PUSHQ $runtime·main·f(SB) // entry
PUSHQ $0 // arg size
CALL runtime·newproc(SB)
POPQ AX
POPQ AX



// start this M
CALL runtime·mstart(SB)复制代码接下来就进入分析环节
1,通过osinit函数还获取cpu个数和page的大小,这块挺简单的2,接下来看看schedinit函数(跟本节相关的重要代码)
func schedinit() {
//获取当前的G
g := getg()
if raceenabled {
g.racectx, raceprocctx0 = raceinit()
}
//设置M的最大数量
sched.maxmcount = 10000
//初始化栈空间
stackinit()
//内存空间初始化操作
mallocinit()
//初始化当前的M
mcommoninit(g.m)



//将P的数量调整为CPU数量
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procs > _MaxGomaxprocs {
procs = _MaxGomaxprocs
}
//初始化P
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}


}复制代码3,上面我们可以看到调用了procresize函数来初始化P,那么我们来看下procresize函数。这块代码过长,分几个部分解析(只贴重要的代码)(1) 初始化新的P
for i := int32(0); i < nprocs; i++ {
pp := allp[i]
if pp == nil {
//新建一个P对象
pp = new(p)
pp.id = i
pp.status = Pgcstop
//保存到allp数组(负责存储P的数组)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
//如果P还没有cache,那么进行分配
if pp.mcache == nil {
if old == 0 && i == 0 {
if getg().m.mcache == nil {
throw(“missing mcache?”)
}
pp.mcache = getg().m.mcache // bootstrap
} else {
pp.mcache = allocmcache()//分配cache
}
}
}复制代码(2) 释放没被使用的P
for i := nprocs; i < old; i++ {
p := allp[i]
// 将本地任务添加到全局队列中
for p.runqhead != p.runqtail {
p.runqtail–
gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
// 插入全局队列的头部
globrunqputhead(gp)
}
//释放P所绑定的cache
freemcache(p.mcache)
p.mcache = nil
//将当前的P的G复用链接到全局
gfpurge(p)
p.status = _Pdead
// can’t free P itself because it can be referenced by an M in syscall
}复制代码经过这两个步骤后,那么我们就创建了一批的P,闲置的P会被放进调度器Sched的空闲链表中
二,创建G的过程从上面的汇编代码可以看出接下来会去调用newproc函数来创建主G,然后用这个主函数去执行runtime.main,然后创建一个线程(这个线程在运行期间专门负责系统监控),接下来就进入GO程序中的main函数去运行了。先看下newproc代码
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)//获取参数的地址
pc := getcallerpc(unsafe.Pointer(&siz))//获取调用方的PC支
systemstack(func() {
newproc1(fn, (*uint8)(argp), siz, 0, pc)//真正创建G的地方
})
}复制代码接下来看下newpro1的主要代码
func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
//从当前P复用链表来获取G
_p
:= g.m.p.ptr()
newg := gfget(p)
//如果获取失败,则新建一个
if newg == nil {
newg = malg(StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg)
}
//将得到的G放入P的运行队列中
runqput(_p
, newg, true)
//下面三个条件分别为:是否有空闲的P;M是否处于自旋状态;当前是否创建runteime.main
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && runtimeInitTime != 0 {
wakep()
}



}复制代码这个wakep()函数的代码也是值得一看的,这个思想可以用到平时的代码编程中去
func wakep() {
//线程被唤醒后需要绑定一个P,这里使用cas操作,可以避免唤醒过多线程,这里也对应了上面的三个判断条件之一
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}复制代码startm的代码就留给读者自己去看了,不然感觉整个博文都是代码,主要的思想是:获取一个空闲的P(如果传入的P为空),然后先尝试获取空闲M(空闲的M被调度器schedt管理,这个结构体也可以去看下),获取不到再去创建一个M等。
三,Channel这块就稍微比较简单了,代码也不多,但是看下来收获还是很多的
1,创建Channel先看下结构体定义(有删减)
type hchan struct {
qcount uint // 队列中数据个数
dataqsiz uint // 缓冲槽大小
buf unsafe.Pointer // 指向缓冲槽的指针
elemsize uint16 // 数据大小
closed uint32 // 表示 channel 是否关闭
elemtype *_type // 数据类型
sendx uint // 发送位置索引
recvx uint // 接收位置索引
recvq waitq // 接收等待列表
sendq waitq // 发送等待列表
lock mutex // 锁
}
type sudog struct {
g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}复制代码上面的recvq其实是读操作阻塞在channel的G列表,sendq其实是写操作阻塞在channel的G列表,那么G可以同时阻塞在不同的channel上,那么如何解决呢?这时候就引入了sudog,它其实是对G的一个包装,代表在等待队列上的一个G。
接下来看看创建过程
func makechan(t *chantype, size int64) *hchan {
elem := t.elem



// 大小不超过64K
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
var c *hchan
// 整个创建过程还是简单明了的
if elem.kind&kindNoPointers != 0 || size == 0 {
//一次性分配内存
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
c.buf = unsafe.Pointer(c)
}
} else {
c = new(hchan)
c.buf = newarray(elem, int(size))
}
//设置数据大小,类型和缓冲槽大小
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)

return c }复制代码2,发送send函数的代码有点长,接下来就拆分进行说明(1) 如果recvq有G在阻塞,那么就从该队列取出该G,将数据给该G if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}复制代码(2) 如果hchan.buf还有可用的空间,那么就将数据放入 //通过比较qcount和datasiz来判断是否还有可用空间 if c.qcount < c.dataqsiz {
// 将数据放入buf中
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}复制代码(3) hchan.buf满了,那么就会阻塞住了 // Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 {
mysg.releasetime = -1 } //初始化一些参数 mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.selectdone = nil mysg.c = c gp.waiting = mysg gp.param = nil // 将当前 goroutine加入等待队列 c.sendq.enqueue(mysg) goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)复制代码这里我们就可以看到了,如果满了,那么sudog就会出现了,通过初始化后代表当前G进入等待队列 3,接收同理,接收也分为三种情况 (1) 当前有发送goroutine阻塞在channel上,buf满了 if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}复制代码(2) buf中有数据 if c.qcount > 0 {
// 直接从队列中接收
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}复制代码(3) buf中无数据了,那么则会阻塞住
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 同样的,由sudog代表G去排队
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)复制代码总结:虽然这块代码逻辑不复杂,但是设计的东西很多,还是用了很多时间,现在对M执行G的逻辑是懂了,但是还不清楚细节,后面会继续研究。总的读下来,首先第一是对并发的机制可以说是很了解了,对以后在编写相关代码肯定很有帮助。第二,学习到了一些编程思想,例如cas操作,如何更好的进行封装和抽象等。 <!-- more --> Channel内存布局 channel是go的内置类型,它可以被存储到变量中,可以作为函数的参数或返回值,它在runtime层对应的数据结构式hchan。hchan维护了两个链表,recvq是因读这个chan而阻塞的G,sendq则是因写这个chan而阻塞的G。waitq队列中每个元素的数据结构为sudog,其中elem用于保存数据。


type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
type sudog struct {
g *g
selectdone *uint32
next *sudog
prev *sudog
elem unsafe.Pointer // data element
releasetime int64
nrelease int32 // -1 for acquire
waitlink *sudog // g.waiting list
}
hchan只是channel的头部,头部后面的一段内存连续的数组将作为channel的缓冲区,即用于存放channel数据的环形队列。qcount datasize分别描述了缓冲区当前使用量和容量。若channel是无缓冲的,则size是0,就没有这个环形队列了。
之所以要分开指针类型缓冲区主要是为了区分gc操作,需要将它设置为flagNoScan。并且指针大小固定,可以跟hchan头部一起分配内存,不需要先new(hchan)再newarry。



声明但不make初始化的chan是nil chan。读写nil chan会阻塞,关闭nil chan会panic。
func makechan(t chantype, size int64) *hchan {
elem := t.elem
var c *hchan
if elem.kind&kindNoPointers != 0 || size == 0 {
c = (
hchan)(mallocgc(hchanSize+uintptr(size)*uintptr(elem.size),
nil, flagNoScan))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
c.buf = unsafe.Pointer(c)
}
} else {
c = new(hchan)
c.buf = newarray(elem, uintptr(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}



Channel操作
从实现中可见读写chan都要lock,这跟读写共享内存一样都有lock的开销。



数据在chan中的传递方向从chansend开始从入参最终写入recvq中的goroutine的数据域,这中间如果发生阻塞可能先写入sendq中goroutine的数据域等待中转。



从gopark返回后sudog对象可重用。



同步读写
写channel c<-x 调用runtime.chansend。读channel <-c 调用runtime.chanrecv。总结同步读写的过程就是:



写chan时优先检查recvq中有没有等待读chan的goroutine,若有从recvq中出队sudoG。syncsend将要写入chan的数据ep复制给刚出队的sudoG的elem域。通过goready唤醒接收者G,状态设置为_Grunnable,之后放进P本地待运行队列。之后这个读取到数据的G可以再次被P调度了。
写chan时如果没有G等待读,当前G因等待写而阻塞。这时创建或获取acquireSudog,封装上要写入的数据进入sendq队列。同时当前Ggopark休眠等待被唤醒。
读chan时优先唤醒sendq中等待写的goroutine,并从中获取数据;若没人写则将自己挂到recvq中等待唤醒。
func chansend(t *chantype, c *hchan, ep unsafe.Pointer,
block bool, callerpc uintptr) bool {

lock(&c.lock)
if c.dataqsiz == 0 { // synchronous channel
sg := c.recvq.dequeue()
if sg != nil { // found a waiting receiver
unlock(&c.lock)
recvg := sg.g
syncsend(c, sg, ep)
goready(recvg, 3)
return true
}
// no receiver available: block on this channel.
mysg := acquireSudog()
mysg.elem = ep

c.sendq.enqueue(mysg)
goparkunlock(&c.lock, “chan send”, traceEvGoBlockSend, 3)
// someone woke us up.
releaseSudog(mysg)
return true
}
}
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool)
(selected, received bool) {
if c.dataqsiz == 0 { // synchronous channel
sg := c.sendq.dequeue()
if sg != nil {
unlock(&c.lock)
typedmemmove(c.elemtype, ep, sg.elem)
gp.param = unsafe.Pointer(sg)
goready(gp, 3)
return true, true
}
// no sender available: block on this channel.
mysg := acquireSudog()
mysg.elem = ep
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, “chan receive”, traceEvGoBlockRecv, 3)
// someone woke us up
releaseSudog(mysg)
return recvclosed(c, ep)
}
}
异步读写
异步与同步的区别就是读写时会优先检查缓冲区有没有数据读或有没有空间写。并且真正读写chan后会发生缓冲区变化,这时可能之前阻塞的goroutine有机会写和读了,所以要尝试唤醒它们。 总结过程:



写chan时缓冲区已满,则将当前G和数据封装好放入sendq队列中等待写入,同时挂起gopark当前goroutine。若缓冲区未满,则直接将数据写入缓冲区,并更新缓冲区最新数据的index以及qcount。同时尝试从recvq中唤醒goready一个之前因为缓冲区无数据可读而阻塞的等待读的goroutine。
读chan时首先看缓冲区有没有数据,若有则直接读取,并尝试唤醒一个之前因为缓冲区满而阻塞的等待写的goroutine,让它有机会写数据。若无数据可读则入队recvq。
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// asynchronous channel
var t1 int64
for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup {
mysg := acquireSudog()
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, “chan send”, traceEvGoBlockSend|futile, 3)
// someone woke us up - try again
releaseSudog(mysg)
}
// write our data into the channel buffer
typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
// wake up a waiting receiver
sg := c.recvq.dequeue()
if sg != nil {
goready(sg.g, 3)
}
return true
}
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool)
(selected, received bool) {
// asynchronous channel
for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup {
mysg := acquireSudog()
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, “chan receive”, traceEvGoBlockRecv|futile, 3)
// someone woke us up - try again
releaseSudog(mysg)
}
typedmemmove(c.elemtype, ep, chanbuf(c, c.recvx))
memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount–
// ping a sender now that there is space
sg := c.sendq.dequeue()
if sg != nil {
goready(sg.g, 3)
}
return true, true
}
关闭
通过goready唤醒recvq中等待读的goroutine,之后唤醒所有sendq中等待写的goroutine。因此close chan相当于解除所有因它阻塞的gouroutine的阻塞。



func closechan(c *hchan) {
c.closed = 1
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}…
goready(gp, 3)
}
// release all writers
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}…
goready(gp, 3)
}
}
写closed chan或关闭 closed chan会导致panic。读closed chan永远不会阻塞,会返回一个通道数据类型的零值,返回给函数的参数ep。



所以通常在close chan时需要通过读操作来判断chan是否关闭。



if v, open := <- c; !open {
// chan is closed
}
Happens before
在go memory model 里讲了happens-before问题很有意思。其中有一些跟chan相关的同步规则可以解释一些一直以来的疑问,记录如下:



对带缓冲chan的写操作 happens-before相应chan的读操作
关闭chan happens-before 从该chan读最后的返回值0
不带缓冲的chan的读操作 happens-before相应chan的写操作
var c = make(chan int, 10)
var a string
func f() {
a = “hello, world” //(1)
c <- 0 // (2)
}
func main() {
go f()
<- c //(3)
print(a) //(4)
}
(1) happens-before(2) (3) happens-before(4),再根据规则可知(2) happens(3)。因此(1)happens-before(4),这段代码没有问题,肯定会输出hello world。



var c = make(chan int)
var a string
func f() {
a = “hello, world” //(1)
<-c // (2)
}
func main() {
go f()
c <- 0 //(3)
print(a) //(4)
}
同样根据规则三可知(2)happens-before(3) 最终可以保证(1) happens-before(4)。若c改成待缓冲的chan,则结果将不再有任何同步保证使得(2) happens-before(3)。



看一个简单的例子,了解一下channel的使用。



package main



import “fmt”



func main() {
// Create a new channel with make(chan val-type).
// Channels are typed by the values they convey.
messages := make(chan string)
// Send a value into a channel using the channel <-
// syntax. Here we send "ping" to the messages
// channel we made above, from a new goroutine.
go func() { messages <- “ping” }()
// The <-channel syntax receives a value from the
// channel. Here we’ll receive the "ping" message
// we sent above and print it out.
msg := <-messages
fmt.Println(msg)
}
channel的功能点:



队列
阻塞
当一端阻塞,可以被另一个端唤醒
我们围绕这3点功能展开,讲讲具体的实现。



channel结构
注释标注了几个重要的变量,从功能上大致可以分为两个功能单元,一个是 ring buffer,用于存数据; 一个是存放 goroutine 的队列。



type hchan struct {
qcount uint // 当前队列中的元素个数
dataqsiz uint // 缓冲队列的固定大小
buf unsafe.Pointer // 缓冲数组
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // 下一次发送的 index
recvx uint // 下一次接收的 index
recvq waitq // 接受者队列
sendq waitq // 发送者队列



// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex } Ring Buffer 主要是以下变量组成的功能, 一个 buf 存储实际数据,两个指针分别代表发送,接收的索引位置,配合 size, count 在数组大小范围内来回滑动。


qcount uint // 当前队列中的元素个数
dataqsiz uint // 缓冲队列的固定大小
buf unsafe.Pointer // 缓冲数组
sendx uint // 下一次发送的 index
recvx uint // 下一次接收的 index
举个例子,假设我们初始化了一个带缓冲的channel, ch := make(chan int, 3), 那么它初始状态的值为:



qcount = 0
dataqsiz = 3
buf = [3]int{0, 0, 0} // 表示长度为3的数组
sendx = 0
recvx = 0
第一步,向 channel 里 send 一个值, ch <- 1, 因为现在缓冲还没满,所以操作后状态如下:



qcount = 1
dataqsiz = 3
buf = [3]int{1, 0, 0} // 表示长度为3的数组
sendx = 1
recvx = 0
快进两部,连续向 channel 里 send 两个值 (2, 3),状态如下:



qcount = 3
dataqsiz = 3
buf = [3]int{1, 2, 3} // 表示长度为3的数组
sendx = 0 // 下一个发送的 index 回到了0
recvx = 0
从 channel 中 receive 一个值, <- ch, 状态如下:



qcount = 2
dataqsiz = 3
buf = [3]int{1, 2, 3} // 表示长度为3的数组
sendx = 0 // 下一个发送的 index 回到了0
recvx = 1 // 下一个接收的 index
阻塞
我们看下,如果 receive channel 时,channel 的 buffer中没有数据是怎么处理的。逻辑在 chanrecv 这个方法中,它的大致流程如下,仅保留了阻塞操作的代码。



func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 检查 channdel 是否为 nil



// 当不阻塞时,检查buffer大小,当前大小,检查chennel是否关闭,看看是否能直接返回

// 检查发送端是否有等待的goroutine,下部分会提到

// 当前buffer中有数据,则尝试取出。

// 如果非阻塞,直接返回

// 没有sender等待,buffer中没有数据,则阻塞等待。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
//关键操作:设置 goroutine 状态为 waiting, 把 G 和 M 分离
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

// someone woke us up
// 被唤醒,清理 sudog
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed } 这里的操作就是 创建一个 当前 goroutine 的 sudog, 然后把这个 sudog 放入 channel 的接受者等待队列;设置当前 G 的状态,和 M分离,到这里当前G就阻塞了,代码不会执行下去。 当被唤醒后,执行sudog的清理操作。这里接受buffer中的值的指针是 ep 这个变量,被唤醒后好像没有向 ep 中赋值的操作。这个我们下部分会讲。


sudog
还剩最后一个疑问,当一个goroutine因为channel阻塞,另一个goroutine是如何唤醒它的。



channel 中有两个 waitq 类型的变量, 看下结构发现,就是sudog的链表,关键是 sudog。sudog中包含了goroutine的引用,注意一下 elem这个变量,注释说可能会指向stack。



type waitq struct {
first *sudog
last *sudog
}



type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this.
g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)



// The following fields are never accessed concurrently.
// waitlink is only accessed by g.

acquiretime int64
releasetime int64
ticket uint32
waitlink *sudog // g.waiting list
c *hchan // channel } 讲阻塞部分的时候,我们看到goroutine被调度之前,有一个 enqueue操作,这时,当前G的sudog已经被存入recvq中,我们看下发送者这时的操作。


这里的操作是,sender发送的值 直接被拷贝到 sudog.elem 了。然后唤醒 sudog.g ,这样对面的receiver goroutine 就被唤醒了。具体请下面的注释。



func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 检查工作



// 如果能从 chennel 的 recvq 弹出 sudog, 那么直接send
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) })
return true
}

// buffer有空余空间,返回; 阻塞操作 }


func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
// 处理 index



// 关键
if sg.elem != nil {
// 这里是根据 elemtype.size 复制内存
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}

// 一些处理

// 重新设置 goroutine 的状态,唤醒它
goready(gp, 4) }


func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src is on our stack, dst is a slot on another stack.



// Once we read sg.elem out of sg, it will no longer
// be updated if the destination's stack gets copied (shrunk).
// So make sure that no preemption points can happen between read & use.
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size) }


// memmove copies n bytes from “from” to “to”.
// in memmove_*.s
//go:noescape
func memmove(to, from unsafe.Pointer, n uintptr)
select
在看 chanrecv()方法 时,发现了一个 block 参数,代表操作是否阻塞。一般情况下,channel 都是阻塞的(不考虑buffer),那什么时候非阻塞呢?



第一个想到的就是 select, 在写了default case的时候,其他的channel是非阻塞的。



还有一个可能不常用,就是 channel 的反射 value, 可以是非阻塞的,这个方法是public的,我们先看下简单的。



func (v Value) TryRecv() (x Value, ok bool)
func (v Value) TrySend(x Value) bool
select 就复杂一点点,首先在源码中发现一段注释:



// compiler implements
//
// select {
// case c <- v:
// … foo
// default:
// … bar
// }
//
// as
//
// if selectnbsend(c, v) {
// … foo
// } else {
// … bar
// }
//
func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
}



// compiler implements
//
// select {
// case v = <-c:
// … foo
// default:
// … bar
// }
//
// as
//
// if selectnbrecv(&v, c) {
// … foo
// } else {
// … bar
// }
//
func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(t, c, elem, false)
return
}
如果是一个 case + default 的模式,那么编译器就调用以上方法来实现。



如果是多个 case + default 的模式呢?select 在runtime到底是如何执行的?写个简单的select编译一下。



package main



func main() {
var ch chan int
select {
case <-ch:
case ch <- 1:
default:
}
}
go tool compile -S -l -N test.go > test.s 结果中找一下关键字,例如:



0x008c 00140 (test.go:5) CALL runtime.newselect(SB)
0x00ad 00173 (test.go:6) CALL runtime.selectrecv(SB)
0x00ec 00236 (test.go:7) CALL runtime.selectsend(SB)
0x0107 00263 (test.go:8) CALL runtime.selectdefault(SB)
0x0122 00290 (test.go:5) CALL runtime.selectgo(SB)
这里 selectgo 是实际运行的方法,找一下,注意注释。先检查channel是否能操作,如果不能操作,就走 default 逻辑。



loop:
// pass 1 - look for something already waiting
var dfl *scase
var cas *scase
for i := 0; i < int(sel.ncase); i++ {
cas = &scases[pollorder[i]]
c = cas.c



    switch cas.kind {
// 接受数据
case caseRecv:
sg = c.sendq.dequeue()
// 如果有 sender 在等待
if sg != nil {
goto recv
}
// 当前buffer中有数据
if c.qcount > 0 {
goto bufrecv
}
// 关闭的channel
if c.closed != 0 {
goto rclose
}
case caseSend:
if raceenabled {
racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)
}
// 关闭
if c.closed != 0 {
goto sclose
}
// 有 receiver 正在等待
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
// 有空间接受
if c.qcount < c.dataqsiz {
goto bufsend
}
// 走default
case caseDefault:
dfl = cas
}
}

if dfl != nil {
selunlock(scases, lockorder)
cas = dfl
goto retc
}

Category golang