golang调用原生epoll引起event loop阻塞问题

http://xiaorui.cc/archives/6758
https://github.com/panjf2000/gnet

golang标准库net很优秀,可以让开发者轻易构建非阻塞网络服务,但开发爽快带来的问题协程数加大,比如在net/http里一个连接两个协程,grpc算是业务和keepalive心跳是四个协程,数据的进出是通过channel传输。



golang netpoll抽象了epoll事件的调用,借助runtime的gopark&goready实现就绪协程的调度,让应用层用同步方法构建io异步的网络应用。



该文章原文地址 http://xiaorui.cc/archives/6758



golang gnet
问题:



那么如何规避netpoll的协程太多的问题? 业界通用的方案是通过原syscall epoll实现网络应用,比如evio、gnet库。我先前使用过evio构建过不少服务,但当你的业务调用含有阻塞逻辑时会使event loop陷入同步阻塞。当然在evio里可以开多个event loop来加大并发,且减少同步的可能,但问题要开多少个event loop?



假设同一个event loop中有两个fd被唤醒,第一个fd完成了协议解析开始执行业务逻辑,业务逻辑是访问另一个三方的http api,但对端的api处理很慢,这就意味着你需要等待api的返回,而不能处理第二个fd事件。



那么为什么不把阻塞逻辑用go func异步出去?同一个fd先来后来好几个事件,都异步?那么又要考虑socket并发安全问题。



如不能解决阻塞问题,那么自定义epoll会特别的低效。业务逻辑的阻塞行为压根是没法避免的,除非像redis服务那样,它的业务逻辑就是访问内存的数据结构,无阻塞逻辑。



evio的另外几个问题



evio封装的epoll各类事件看起来很完美,但依旧发现了他几个问题。



第一,多个event loop引起epoll listen fd惊群问题。



虽然2.6早已解决accept惊群,但对于错误性的同时绑定listen fd依然带来惊群。nginx对于epoll listen fd是通过accept mutex实现的,简单说同一时间只有一个进程在监听listen fd,谁拿锁谁去监听。高内核4.5后的EPOLLEXCLUSIVE和内核3.6 reuseport可在内核层面实现负载均衡。



除了内核方法外,还可通过reactor的架构模型也是可以解决listen fd惊群问题。



第二, 用来唤醒epoll的eventfd写入数据没有读出。



第三, loopWrite在内核缓冲区已满无法一次写入时出现写入数据丢失。



为了规避event loop的阻塞问题,我曾经在2019年时跟evio的作者沟通过方案,可以为他提交一个协程池的方案来规避阻塞问题,他的反应很冷淡…



😅 最后还是没给他提交pr。跟同事一起在公司内部基于evio封装了一个叫goepoll网络库,重要的是加入了协程池的支持。可惜的是公司没考虑开源。好在2019年有个gnet库横空出世,不仅解决了evio的种种问题,还设计了ractor网络模型,且加入了协程池的支持。



gnet的协程池设计?



项目地址:https://github.com/panjf2000/gnet ,通过benchmark得出gnet要比我们自己封装的goepoll要更稳定,接口更友好 😅。



下面是gnet官方给出协程池的用法样例。react业务逻辑中把阻塞逻辑扔到ants构建的协程池里,最后通过asyncWrite()来唤醒event loop写入返回。



Go
1
// xiaorui.cc
2

3
func (es *echoServer) React(c gnet.Conn) (out []byte, action gnet.Action) {
4
data := append([]byte{}, c.Read()…)
5
c.ResetBuffer()
6

7
// Use ants pool to unblock the event-loop.
8
_ = es.pool.Submit(func() {
9
time.Sleep(1 * time.Second)
10
c.AsyncWrite(data)
11
})
12

13
return
14
}
对于阻塞逻辑的处理可以直接go func,也可以使用协程池,主要的关键点在于调用AsyncWrite方法。



Go
1
// xiaorui.cc
2

3
func (c *conn) AsyncWrite(buf []byte) {
4
if encodedBuf, err := c.codec.Encode(c, buf); err == nil {
5
_ = c.loop.poller.Trigger(func() error { // 尝试唤醒eventfd
6
if c.opened {
7
c.write(encodedBuf) // 写到ringbuffer里并加入事件
8
pool.PutBytes(buf)
9
}
10
return nil
11
})
12
}
13
}
AsyncWrite会唤醒fd所在的eventLoop.Poller,唤醒的方法是激活eventloop里的event fd。不仅是唤醒,而且会把异步执行的func放在一个队列里,这个函数队列是slice实现的。AsyncWrite中的write把resp raw写入到ringbuffer数据结构里,然后进行socket写入,当出现eagain时加入读写事件,直到ringbuffer为空。



Go
1
// xiaorui.cc
2

3
func (c *conn) write(buf []byte) {
4
if !c.outboundBuffer.IsEmpty() {
5
_, _ = c.outboundBuffer.Write(buf)
6
return
7
}
8
n, err := unix.Write(c.fd, buf)
9
if err != nil {
10
if err == unix.EAGAIN {
11
_, _ = c.outboundBuffer.Write(buf)
12
_ = c.loop.poller.ModReadWrite(c.fd)
13
return
14
}
15
_ = c.loop.loopCloseConn(c, err)
16
return
17
}
18
if n < len(buf) {
19
_, _ = c.outboundBuffer.Write(buf[n:])
20
_ = c.loop.poller.ModReadWrite(c.fd)
21
}
22
}
当event loop的poller在epoll_wait拿到wakeFD时,遍历执行函数队列里的函数。为什么不在异步协程里直接write fd,主要是为了保证fd写入安全,所有的write fd都统一由于epoll poller来操作。



Go
1
// xiaorui.cc
2

3
type Poller struct {
4
fd int // epoll fd
5
wfd int // wake fd
6
wfdBuf []byte // wfd buffer to read packet
7
asyncJobQueue internal.AsyncJobQueue
8
}
9

10
// 存放异步协程最后的resp写入func
11
type AsyncJobQueue struct {
12
lock sync.Locker
13
jobs []func() error
14
}
15

16
// 把func放到队列中,并且通过写wfd事件来唤醒event loop的poller
17
func (p *Poller) Trigger(job internal.Job) error {
18
if p.asyncJobQueue.Push(job) == 1 {
19
_, err := unix.Write(p.wfd, b)
20
return err
21
}
22
return nil
23
}
24

25
// 阻塞调用epoll wait,直到有新的事件
26
func (p *Poller) Polling(callback func(fd int, ev uint32) error) (err error) {
27
el := newEventList(InitEvents)
28
var wakenUp bool
29
for {
30
n, err0 := unix.EpollWait(p.fd, el.events, -1)
31

32
// 遍历执行所有的事件
33
for i := 0; i < n; i++ {
34
if fd := int(el.events[i].Fd); fd != p.wfd {
35

36
} else {
37
// 唤醒的fd是wfd
38
wakenUp = true
39
_, _ = unix.Read(p.wfd, p.wfdBuf)
40
}
41
}
42
if wakenUp {
43
// 执行asyncWrite的的收尾动作
44
if err = p.asyncJobQueue.ForEach(); err != nil {
45
return
46
}
47
}
48

49
}
50
}
51

52
func (q *AsyncJobQueue) ForEach() (err error) {
53
for i := range jobs {
54
// 遍历执行该poller里所有异步协程收尾工作
55
if err = jobsi; err != nil {
56
return err
57
}
58
}
59
}
为什么要使用协程池?



我们在二次开发evio时加入了协程池,gnet同样也使用了协程池。原因在于一方面规避了more stack的发生,另一方面避免系统抖动时协程大量的产生,退出的协程依旧在allg结构中造成gc的扫描问题。



协程调度延迟加大问题?



event loop通过epoll_wait发现无事件时会syscall阻塞。如果event loop数目超过gomaxprocs,那么其他业务协程很大程度是拿不到调度,依赖sysmon来抢占并handoffp解绑pmg才能拿到调度。



当然epoll_wait可以非阻塞的调用,就是timeout = 0,这样其他协程虽然拿到调度,但对于poller来说无脑调用epoll_wait会增加系统消耗。所以event loop的数目要好好斟酌。我们的经验值是3/5。😁



总结:



使用自定义epoll封装网络服务,性能相当的友好。我们已经在多个项目中实践了自定义epoll方案,qps普遍能提高最少20%左右。蚂蚁金服出品的高性能sidecar mosn同样实现了自定义epoll。



前公司的领导是广发证券的架构师,他设计的推送服务就是通过原生epoll来构建的,但没有使用gnet/evio那种syscall epoll方法,而是使用c封装epoll系统调用常驻监听,使用mmap开辟一个无锁队列,这样c和go共享这个队列地址来通信。


Category golang