Channel 是 Go 语言的主要同步和通信原语,它们必须速度快且可扩展。
目标:
令单线程(无竞争)的 Channel 操作更快
令有竞争带缓存(生产者消费者)的 Channel 操作更快
令无阻塞失败操作(如检查 Channel 是否已关闭)更快
令信号量 Channel(chan struct{})更快
令 select 语句更快
非目标:
令 Channel 完全无锁(这会导致实现的复杂度大幅提升且在普通使用场景下变得泵满)
令有竞争的同步 Channel 操作更快。
Channel 的类型
Go 语言有 3 种不同类型的 Channel:
同步 Channel。它们不需要任何缓冲以及缓冲控制代码。而且它们实现了直接传递的语义(一个 Goroutine 会直接选择它的接收方并与其完成通信)
异步 Channel。这实际上就是基于环状缓冲的传统生产者消费者队列。它们没有实现传递语义:一个被解锁的消费者会和其他消费者一起竞争,如果它没能胜利就会重新被阻塞
带 0 体积元素的异步 Channel(chan struct{})。这实际上就是信号量。它们不需要缓冲区(只占用 O(1) 的内存),也不实现传递语义
同步发送/接收
在我们深入到 select 之前,我们先来想想一般的发送/接收是怎么工作的。
同步 Channel 在多数时候都是由互斥锁保护的,除非是在执行无须阻塞的快速失败代码路径(如从一个空的 Channel 中进行无阻塞接收)。同步 Channal 包含如下信息:
struct Hchan {
Lock;
bool closed;
SudoG* sendq; // waiting senders
SudoG* recvq; // waiting receivers
};
发送操作会占用互斥锁,并检查它是否需要阻塞或满足一个反向操作:
bool syncchansend(Hchan *c, T val, bool block) {
if(c->closed) // 快速失败路径
panic(“closed”);
if(!block && c->recvq == nil) // 快速失败路径
return false;
lock(c);
if(c->closed) {
unlock(c);
panic(“closed”);
}
if(sg = removewaiter(&c->recvq)) {
// 找到一个正在阻塞的接收方,与之通信
unlock(c);
sg->val = val;
sg->completed = true;
unblock(sg->g);
return true;
}
if(!block) {
unlock(c);
return false;
}
// 阻塞并等待接收方
sg->g = g;
sg->val = val;
addwaiter(&c->sendq, sg);
unlock(c);
block();
if(!sg->completed)
panic(“closed”); // 被 close 操作解锁
// 由一个接收方解锁
return true;
}
异步发送/接收
异步发送/接收在不需要操作等待队列时是无锁的,而等待队列由一个互斥锁保护。非阻塞失败操作同样是短路的。
我们首先来看看非阻塞操作时怎么进行的。
一个异步 Channel 包含以下数据:
struct Hchan {
uint32 cap; // Channel 容量
Elem* buf; // 大小为 cap 的环状缓冲
// 发送和接收位置
// 低 32 位代表在 buf 中的位置
// 高 32 位代表环状缓冲的当前圈数
uint64 sendx;
uint64 recvx;
};
struct Elem {
// 当前圈数
// 当前圈数为 0, 2, 4, … 时,元素可读
// 当前圈数为 1, 3, 5, … 时,元素可写
uint32 lap;
T val; // 用户数据
};
发送操作通过使用 CAS 递增 sendx 来实现同步,成功递增 sendx 的 Goroutine 得以写入元素。发送与接收间的同步通过元素的 lap 变量实现,基本而言,lap 值表示该元素在当前圈数(sendx/recvx 的高 32 位)是否可读/可写。
如下即为发送操作:
bool asyncchansend_nonblock(Hchan* c, T val) {
uint32 pos, lap, elap;
uint64 x, newx;
Elem *e;
for(;;) {
x = atomicload64(&c->sendx);
pos = (uint32)x;
lap = (uint32)(x » 32);
e = &c->buf[pos];
elap = atomicload32(&e->lap);
if(lap == elap) {
// 该元素已可在该圈可写
// 尝试获得写入该元素的权利
if(pos + 1 < c->cap) // 获取下一个 pos
newx = x + 1; // 直接加
else
newx = (uint64)(lap + 2) « 32; // 下一圈
if(!cas64(&c->sendx, x, newx))
continue; // 输掉了,重试
// 获得了元素的所有权,可以非原子地写入
e->val = val;
// 使元素可读
atomicstore32(&e->lap, elap + 1);
return true;
} else if((int32)(lap - elap) > 0) {
// 该元素还未被上一圈读出,
// Channel 满
return false;
} else {
// 该元素已在该圈上被写入,
// 这意味着 c->sendx 也已经改变了
// 重试
}
}
}
接收操作则是完全对称的,除了 recvs 由第 1 圈开始而且是读元素而不是写元素。
现在我们来看看阻塞操作是怎么实现的。Channel 结构体还包含一个互斥锁和发送方/接收方等待队列:
struct Hchan {
// …
Lock;
SudoG* sendq;
SudoG* recvq;
};
要实现阻塞发送,一个 Goroutine 首先尝试进行非阻塞发送。如果它成功了,那么它会查看是否有接收方等待,如果有的话就解锁其中一个接收方。
如果非阻塞发送失败了(Channel 已满),它会锁定互斥锁,将自己添加到发送方等待队列,然后重新检查 Channel 是否仍满。若果 Channel 仍满,那么 Goroutine 阻塞;否则,Goroutine 将自己从等待队列中移除,解锁互斥锁并重试。
阻塞接收的过程完全一致,除了 s/send/recv/ 、 s/recv/send/(笑)。
要实现这样一个阻塞算法最巧妙的地方在于确保不会发生死锁(一个发送方被无限期地阻塞在一个未满 Channel 上,或者一个接收方被无限期地阻塞在一个非空 Channel 上)。通过这样检查、保存、再检查,我们确保(1)发送方看到一个接收方等待者并解锁它,或(2)接收方看到缓冲中的元素并消费它,或(3)情形 1 和 2 同时存在(在这种情况下我们通过使用互斥锁来解决竞争);但不会发生(4)发送方看不到接收方等待者或接收方但不到缓冲里的元素并无限期阻塞。
以下是阻塞发送的算法:
void asyncchansend(Hchan* c, T val) {
for(;;) {
if(asyncchansend_nonblock(c, val)) {
// 发送成功,看看我们要不要解锁一个接收方
if(c->recvq != nil) {
lock(c);
sg = removewaiter(&c->recvq);
unlock(c);
if(sg != nil)
unblock(sg->g);
}
return;
} else {
// 队列已满
lock(c);
sg->g = g;
addwaiter(&c->sendq, sg);
if(notfull(c)) {
removewaiter(&c->sendq, sg);
unlock(c);
continue;
}
unlock(c);
block();
// 重新尝试发送
}
}
}
struct{} 发送/接收
0 体积异步 Channel 大体上与非 0 体积异步 Channel 相同:
在非阻塞情形下,操作是无锁的
等待队列仍由互斥锁保护
非阻塞失败操作是短路的
区别在于
Hchan 只包含一个计数器而不是发送/接收位置和环状缓冲,该计数器代表 Channel 中的元素数量
非阻塞发送/接受会使用 CAS 循环来更新计数器
满/空判断只需要检查计数器的值
其他的部分,包括阻塞算法,则是一样的。
close
关闭操作会锁定互斥锁,将设置 closed 标志位并解锁所有等待者。异步发送/接收操作在阻塞前会检查 closed 标志位。
这实现了与异步发送/接收阻塞相同的保证,即(1)关闭操作看到一个等待者,或(2)一个等待者看到 closed 标志位被设置,或(3)情形 1 与 2 同时发生(此时通过互斥锁来避免竞争)
select
现在我们可以来学习 select 了。
Select 操作不会立刻锁定所有相关 Channel 的互斥锁,而是会对每个 Channel 进行细粒度的操作。
Select 包含 4 个阶段:
对所有相关的 Channel 进行乱序以提供一个伪随机次序保证(接下来的每一步都会在这个乱序 Channel 列表的基础上工作)
一个一个地检查每个 Channel,看看它们之中是否有人已经准备好通信了,如果是的话就进行通信并退出。这使得 select 语句不需要更早地阻塞且有更好的可扩展性,因为它们不需要排序并锁定这些互斥锁。除此之外,这样的 select 如果发现第一个 Channel 已经准备好的话甚至不需要接触所有的 Channel
准备阻塞所有 Channel
阻塞。返回第 1 步
对于第 2 步我们需要再解释一下。
本质上来讲,它的工作原理和异步发送/接受的阻塞操作时相同的。也就是,锁定 Channel 的互斥锁,将 Goroutine 放入到发送方/接收方等待队列,然后重新检查 Channel 是否已经准备好通信。如果 Channel 还没有准备好,那就继续下一个 Channel;否则,就将自己从等待队列中移除并回到第 1 步。
还有另一个有趣的地方,select 让我们成为了多个 Channel 的等待者,但我们不希望多个同步 Channel 操作都能利用该 select 完成通信(对于同步 Channel 来说,解锁即完成通信)。为了避免这样的情况发生,由 select 放入到等待队列中的实体包含一个指向 select 全局状态字的指针。在解锁这样的一个等待者时,其他 Goroutine 会首先尝试 CAS(statep 、 nil 、 sg),以获得解锁等待者或与等待者通信的权利。如果 CAS 失败了,Goroutine 会无视这个等待者(它已经被其他人唤醒了)。
这个算法要求所有类型的 Channel 实现 isready(c) 函数,但这不是什么很大的问题。一次 select 操作的高层算法如下:
Scase *select(Select *sel) {
randomize channel order;
for(;;) {
// 第 1 步
foreach(Scase *cas in sel) {
if(chansend/recv_nonblock(cas->c, …))
return cas;
}
// 第 2 步
selectstate = nil;
foreach(Scase *cas in sel) {
lock(cas->c);
cas->sg->g = g;
cas->sg->selectstatep = &selectstate;
addwaiter(&cas->c->sendq/recvq, cas->sg);
if(isready(cas->c)) {
unlock(c);
goto ready;
}
unlock(cas->c);
}
// 第 3 步
block();
ready:
CAS(&selectstate, nil, 1);
foreach(Scase *cas in sel) {
lock(cas->c);
removewaiter(&cas->c->sendq/recvq, cas->sg);
unlock(cas->c);
}
// 如果我们是被同步 Channel 操作解锁的,
// 那么通信已经完成
if(selectstate > 1)
return selectstate; // 代表完成了的 case
}
}