https://github.com/tidwall/evio
https://github.com/Allenxuxu/eviop/
https://github.com/Allenxuxu/gev
最近翻了 evio 的源码,发现一些问题,主要集中在 linux 平台 epoll 上和读写的处理。
用来唤醒 epoll 的 eventfd 写入数据没有读出
listen 的 fd 注册到所有事件循环,epoll 的惊群问题
loopWrite 在内核缓冲区满,无法一次写入时,出现写入数据丢失
eventfd 的使用问题
在 internal/internal_linux.go 中封装了 epoll 的使用 API 。
// Poll …
type Poll struct {
fd int // epoll fd
wfd int // wake fd
notes noteQueue
}
1
2
3
4
5
6
在 OpenPoll 时,会创建一个 eventfd 并将 fd 赋值给 Poll 的 wfd 成员, 并且注册到 epoll 监听可读事件。
当需要唤醒当前 epoll 时,提供了 Trigger 方法
// Trigger …
func (p *Poll) Trigger(note interface{}) error {
p.notes.Add(note)
_, err := syscall.Write(p.wfd, []byte{0, 0, 0, 0, 0, 0, 0, 1})
return err
}
1
2
3
4
5
6
这是往刚刚提到的 eventfd 中写入八字节数据,此时 epol l会被唤醒 epoll_wait 函数返回。 但是,evio 并没有去把 8 个字节的数据读取出来,内核缓冲区会不断积压,并且 evio 使用的是 epoll 的LT模式(默认模式),只要缓冲区中有数据,epoll 就会不断唤醒。这应该算是一个 bug 吧。
listen 的 fd 注册到所有事件循环,epoll 的惊群问题
evio 可以指定启动多个事件循环。evio 将 listen fd 注册到每一个事件循环中(epoll)监听可读事件,所以当一个连接到来时,所有的事件循环都会唤醒。
// create loops locally and bind the listeners.
for i := 0; i < numLoops; i++ {
l := &loop{
idx: i,
poll: internal.OpenPoll(),
packet: make([]byte, 0xFFFF),
fdconns: make(map[int]*conn),
}
for _, ln := range listeners {
l.poll.AddRead(ln.fd)
}
s.loops = append(s.loops, l)
}
// start loops in background
s.wg.Add(len(s.loops))
for _, l := range s.loops {
go loopRun(s, l)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
这并不是一个 bug ,因为最终只有一个线程可以accept调用返回成功,其他线程(协程)的accept函数调用返回EAGAIN错误 ,作者也做出了处理。
nfd, sa, err := syscall.Accept(fd)
if err != nil {
if err == syscall.EAGAIN {
return nil
}
return err
} 1 2 3 4 5 6 7 并且作者还利用每个事件循环都会被唤醒,来做客户端连接的负载均衡策略。
LeastConnections : 当存在其他事件循环的注册的客户端连接数比当前事件循环的连接数少的时候,直接 return nil 。当有两个最下连接数相同的时候,也没关系,因为 accept 会保证只有一个可以成功。
RoundRobin: 原理也是一样,每个事件循环都会去判断 int(atomic.LoadUintptr(&s.accepted)) % len(s.loops) ,轮到自己了,才继续执行,否则 return nil 。
if ln.fd == fd {
if len(s.loops) > 1 {
switch s.balance {
case LeastConnections:
n := atomic.LoadInt32(&l.count)
for _, lp := range s.loops {
if lp.idx != l.idx {
if atomic.LoadInt32(&lp.count) < n {
return nil // do not accept
}
}
}
case RoundRobin:
idx := int(atomic.LoadUintptr(&s.accepted)) % len(s.loops)
if idx != l.idx {
return nil // do not accept
}
atomic.AddUintptr(&s.accepted, 1)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
这样的做法没有问题,但是个人觉得 muduo 的做法似乎更好。
loopWrite 在内核缓冲区满,无法一次写入时,出现写入数据丢失
func loopWrite(s *server, l *loop, c *conn) error {
if s.events.PreWrite != nil {
s.events.PreWrite()
}
n, err := syscall.Write(c.fd, c.out)
if err != nil {
if err == syscall.EAGAIN {
return nil
}
return loopCloseConn(s, l, c, err)
}
if n == len(c.out) {
c.out = nil
} else {
c.out = c.out[n:]
}
if len(c.out) == 0 && c.action == None {
l.poll.ModRead(c.fd)
}
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
当内核缓冲区满时, syscall.Write(c.fd, c.out) 会无法全部写入,可以通过返回值得到已经写入的个数。
if n == len(c.out) {
c.out = nil
} else {
c.out = c.out[n:]
}
1
2
3
4
5
作者也做出了处理,判断了 返回值 n 不是 c.out 长度的情况,c.out = c.out[n:] 。
但是作者并没有去注册 epoll 可写事件啊。
在当前文件搜索 ModReadWrite ,注册可读可写的事件,共有两处。一次是 loopWake 函数,一次是在 loopRead 函数。会不会作者在 loopRead 方法中了做了处理,规避了没有注册可写事件这种情况呢?
我们看下 loopRead
func loopRead(s *server, l *loop, c *conn) error {
var in []byte
n, err := syscall.Read(c.fd, l.packet)
if n == 0 || err != nil {
if err == syscall.EAGAIN {
return nil
}
return loopCloseConn(s, l, c, err)
}
in = l.packet[:n]
if !c.reuse {
in = append([]byte{}, in…)
}
if s.events.Data != nil {
out, action := s.events.Data(c, in)
c.action = action
if len(out) > 0 {
c.out = append([]byte{}, out…)
}
}
if len(c.out) != 0 || c.action != None {
l.poll.ModReadWrite(c.fd)
}
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
果然,作者做了处理! 当 s.events.Data(c, in) 函数返回,如果 c.out 有数据,就注册可读可写事件。
所以,执行的流程是:
客户端有数据到来,loopRead 函数执行
调用客户注册的回调函数 events.Data 函数,客户将需要的写入给客户端的数据返回,evio 将需要写给客户端数据存到 c.out , 然后监听可读可写事件
eopll 可写事件唤醒,执行 loopWrite 直接 write 数据。 如果写完就重新注册,只注册可读事件;如果没写完,就不重新注册,还是可读可写事件都监听
当缓冲区有空间了时,epoll 又会唤醒继续 loopWrite 。
似乎没问题,但是仔细想一想,会不会有这种情况呢:
内核的缓冲区满了,第一次没写完,等待缓冲区可写。此时客户端又来了数据,继续执行 loopRead 。调用用户回调函数,又有要写入的数据。这是来看看处理逻辑
if s.events.Data != nil {
out, action := s.events.Data(c, in)
c.action = action
if len(out) > 0 {
c.out = append([]byte{}, out…)
}
}
1
2
3
4
5
6
7
c.out = append([]byte{}, out…) 这里,之前没写完存在 c.out 里的数据直接被清空了啊。这样要写入的数据就丢失了一部分啊。
思考
evio 速度非常快,但是翻了源码,发现 evio 并没有刻意去减少 epoll 的唤醒次数,相反 evio 利用 epoll 的多次唤醒去做操作。
比如,调用客户回调后,并没有直接处理 action 的状态,反而是先把 action 存起来,增加注册 fd 的可写事件,让epoll 再唤醒,在 loopAction 中再来处理 action。先不说这样会不会有问题,这样让 epoll 频繁唤醒似乎不妥。
evio 的处理 read 和 write 的方式,也导致多次的内存拷贝,换种方式,性能还可以再次提升。evio 在 linux 环境(epoll)下,单元测试因为 用来唤醒 epoll 的 eventfd 写入数据没有读出 这个bug ,单元测试并不能通过。
evio 非常轻量,这也说明它非常简单,使用起来还是非常不方便,并且对于 epoll 的处理还有很多可以优化的地方。而且,作者似乎很忙。。。 PR也不理,Issues 也不理。所以决定自己撸一个了,更好用,更快速:eviop 。
推荐库
gev 一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库。
原理
evio 是 Reactor 模式的简单实现。Reactor 本质就是“non-blocking IO + IO multiplexing”,通过非阻塞 IO+ IO 多路复用来处理并发。程序运行一个或者多个事件循环,通过在事件循环中注册回调的方式实现业务逻辑。
evio 将所有文件描述符设为非阻塞,并注册到事件循环( epoll / kqueue )中。相较于传统的 per thread per connection 的处理方法,线程使用更少,线程资源利用率更高。
evio 需要在服务启动前,注册回调函数,当事件循环中有事件到来时,会调用回调函数处理。
使用示例
先从一个简单的 echo server 的例子来了解 evio 。
package main
import (
“flag”
“fmt”
“log”
“strings”
"github.com/tidwall/evio" )
func main() {
var port int
var loops int
var udp bool
var trace bool
var reuseport bool
var stdlib bool
flag.IntVar(&port, "port", 5000, "server port")
flag.BoolVar(&udp, "udp", false, "listen on udp")
flag.BoolVar(&reuseport, "reuseport", false, "reuseport (SO_REUSEPORT)")
flag.BoolVar(&trace, "trace", false, "print packets to console")
flag.IntVar(&loops, "loops", 0, "num loops")
flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")
flag.Parse()
var events evio.Events
events.NumLoops = loops
events.Serving = func(srv evio.Server) (action evio.Action) {
log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops)
if reuseport {
log.Printf("reuseport")
}
if stdlib {
log.Printf("stdlib")
}
return
}
events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
if trace {
log.Printf("%s", strings.TrimSpace(string(in)))
}
out = in
return
}
scheme := "tcp"
if udp {
scheme = "udp"
}
if stdlib {
scheme += "-net"
}
log.Fatal(evio.Serve(events, fmt.Sprintf("%s://:%d?reuseport=%t", scheme, port, reuseport))) }
上面的例子主要就是注册了两个回调函数: events.Serving 和 events.Data 。
当 server 启动时,会来执行注册的 events.Serving 回调函数;
当有数据到来时,执行 events.Data 回调函数。
程序最后调用 evio.Serve 方法开启事件循环,程序在此处不断循环检测是否有事件发生并处理(有数据到来,有数据要发送…)。
evio 都是通过回调函数来执行业务逻辑的。 当客户端有数据发送过来时,调用用户注册的 events.Data 函数。
需要发送数据给客户端时,只可以通过注册的回调函数的返回值来返回,evio 框架来负责发送(有 bug)。
回调函数的返回值主要有两个 out []byte, action evio.Action , out 就是需要发送给客户端的, Action 就是返回一些状态,用来关闭连接,或者服务器退出啥的操作。主要状态如下:
const (
// None indicates that no action should occur following an event.
None Action = iota
// Detach detaches a connection. Not available for UDP connections.
Detach
// Close closes the connection.
Close
// Shutdown shutdowns the server.
Shutdown
)
evio 的事件循环
evio.Serve
我们先来看下 evio.Serve 方法的实现
func Serve(events Events, addr …string) error {
var lns []*listener
defer func() {
// 这个函数如果推出,需要关闭所有 listener
for _, ln := range lns {
ln.close()
}
}()
var stdlib bool
// 可以选择使用 stdlib(stdlib 主要是为了支持 非 *unix 平台)
for _, addr := range addr {
// 生成 listener
var ln listener
var stdlibt bool
ln.network, ln.addr, ln.opts, stdlibt = parseAddr(addr)
if stdlibt {
stdlib = true
}
if ln.network == "unix" {
os.RemoveAll(ln.addr)
}
var err error
if ln.network == "udp" {
if ln.opts.reusePort {
ln.pconn, err = reuseportListenPacket(ln.network, ln.addr)
} else {
ln.pconn, err = net.ListenPacket(ln.network, ln.addr)
}
} else {
if ln.opts.reusePort {
ln.ln, err = reuseportListen(ln.network, ln.addr)
} else {
ln.ln, err = net.Listen(ln.network, ln.addr)
}
}
if err != nil {
return err
}
if ln.pconn != nil {
ln.lnaddr = ln.pconn.LocalAddr()
} else {
ln.lnaddr = ln.ln.Addr()
}
if !stdlib {
if err := ln.system(); err != nil {
return err
}
}
lns = append(lns, &ln)
}
if stdlib {
return stdserve(events, lns)
// 使用 std net 库 启动server
}
return serve(events, lns)
// 使用 epoll or kqueue 启动server }
从 Serve 函数签名中可以看出 evio 是支持绑定多地址监听的
func Serve(events Events, addr …string) error
使用方式如下:
evio.Serve(events, “tcp://localhost:5000”, “tcp://192.168.0.10:5001”)
现在我们看看 evio 的核心部分: serve(events, lns)
,这里会启动 evio 的 sever 。
func serve(events Events, listeners []*listener) error {
numLoops := events.NumLoops // 确定启动的事件循环数量
if numLoops <= 0 {
if numLoops == 0 {
numLoops = 1
} else {
numLoops = runtime.NumCPU()
}
}
s := &server{}
s.events = events
s.lns = listeners
s.cond = sync.NewCond(&sync.Mutex{})
s.balance = events.LoadBalance
s.tch = make(chan time.Duration)
//println("-- server starting")
if s.events.Serving != nil { // 如果注册了回调函数,就执行
var svr Server
svr.NumLoops = numLoops
svr.Addrs = make([]net.Addr, len(listeners))
for i, ln := range listeners {
svr.Addrs[i] = ln.lnaddr
}
action := s.events.Serving(svr)
switch action {
case None:
case Shutdown:
return nil
}
}
defer func() { // server 退出后的清理工作
// wait on a signal for shutdown
s.waitForShutdown()
// notify all loops to close by closing all listeners
for _, l := range s.loops {
l.poll.Trigger(errClosing)
}
// wait on all loops to complete reading events
s.wg.Wait()
// close loops and all outstanding connections
for _, l := range s.loops {
for _, c := range l.fdconns {
loopCloseConn(s, l, c, nil)
}
l.poll.Close()
}
//println("-- server stopped")
}()
// create loops locally and bind the listeners.
for i := 0; i < numLoops; i++ {
l := &loop{
idx: i,
poll: internal.OpenPoll(),
packet: make([]byte, 0xFFFF), // event loop 的 read 缓冲区
fdconns: make(map[int]*conn),
}
for _, ln := range listeners {
l.poll.AddRead(ln.fd) // 将 fd 注册到 epoll 中并监听可读事件
}
s.loops = append(s.loops, l)
}
// start loops in background
s.wg.Add(len(s.loops))
for _, l := range s.loops { // 启动所有的 event loop
go loopRun(s, l)
}
return nil }
serve 主要做这些事:
根据配置启动指定数量的 event loop,如果传入配置的 loop 数量为 0 则设置启动一个事件循环,如果传入配置小于 0 则设置为运行平台的 CPU 核心数量
如果设置了回调函数 events.Serving ,运行它
按照指定 event loop 数量,创建 epoll 句柄生成 loop ,并将所有的 listener 注册到 epoll 监听可读事件(有客户端连接)
启动所有事件循环(一个事件循环一个 goroutine)
需要注意的是,evio 将所有的 listener 的 fd 在每一个事件循环的 epoll 中都注册了。也就是说,如果有三个事件循环,一个 listener ,那么这个 listener 的 fd 会注册到三个 epoll 中。这就会出现 epoll 的惊群现象,感兴趣的可以自己搜索了解下。
evio 当一个新连接到来时,所有的事件循环都会唤醒,但是最终只有一个线程可以 accept 调用返回成功,其他线程(协程)的 accept 函数调用返回 EAGAIN 错误 。
loopRun
下面我们看看 loopRun 的内部实现
func loopRun(s *server, l *loop) {
defer func() {
//fmt.Println(“– loop stopped –”, l.idx)
s.signalShutdown()
s.wg.Done()
}()
if l.idx == 0 && s.events.Tick != nil {
go loopTicker(s, l)
}
//fmt.Println("-- loop started --", l.idx)
l.poll.Wait(func(fd int, note interface{}) error {
if fd == 0 {
return loopNote(s, l, note)
}
c := l.fdconns[fd]
switch {
case c == nil:
return loopAccept(s, l, fd)
case !c.opened:
return loopOpened(s, l, c)
case len(c.out) > 0:
return loopWrite(s, l, c)
case c.action != None:
return loopAction(s, l, c)
default:
return loopRead(s, l, c)
}
}) }
l.poll.Wait 传入一个回调函数作为参数,当 epoll 收到事件通知时,会执行这个回调函数。
在这个函数中接受客户端连接,读取客户端数据,调用客户回调函数处理业务逻辑…
我们先来看下 poll.Wait 的内部实现,再看看 loopAccept,loopOpened,loopWrite 等函数。
loopRun 方法中最重要的就是 poll.Wait ,我们看看 Linux 下 epoll 的实现
func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {
events := make([]syscall.EpollEvent, 64)
for {
n, err := syscall.EpollWait(p.fd, events, -1)
if err != nil && err != syscall.EINTR {
return err
}
if err := p.notes.ForEach(func(note interface{}) error {
return iter(0, note)
}); err != nil {
return err
}
for i := 0; i < n; i++ {
if fd := int(events[i].Fd); fd != p.wfd {
if err := iter(fd, nil); err != nil {
return err
}
} else {
}
}
} } 这个函数中是一个死循环,程序会阻塞在此处等待 epoll 的”通知“,然后处理就绪的 fd (读取/发送数据、执行用户注册的回调函数)。
当有 fd 就绪的时候,syscall.EpollWait 函数返回,并且将就绪的 fd 通过 events 传出,返回值 n 为就绪 fd 的个数。
然后循环逐个遍历就绪的 fd,调用回调函数处理。
for i := 0; i < n; i++ {
if fd := int(events[i].Fd); fd != p.wfd {
if err := iter(fd, nil); err != nil {
return err
}
} else {
} } evio 的事件处理
l.poll.Wait(func(fd int, note interface{}) error {
if fd == 0 {
return loopNote(s, l, note)
}
c := l.fdconns[fd]
switch {
case c == nil:
return loopAccept(s, l, fd)
case !c.opened:
return loopOpened(s, l, c)
case len(c.out) > 0:
return loopWrite(s, l, c)
case c.action != None:
return loopAction(s, l, c)
default:
return loopRead(s, l, c)
}
})
当 epoll 检测到有就绪的 fd 时,会逐个调用上面的回调函数,evio 的主要逻辑也在这里。
当 fd == 0 时,会执行 loopNote 函数。loopNote 主要是用来处理一些非 fd 就绪的事件,比如定时任务、强制退出等。当然,我们都知道 fd 为 0 是标准输入,所以此处并不是真的去处理 fd 为 0 的文件描述符(注册到 epoll 的文件描述 >= 3)。作者知道 epoll 返回的就绪 fd 中不会有为 0 的情况,所以此处 fd 为 0,是作者调用时传入,用来表示一种特殊的唤醒场景。
func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {
…
p.changes = p.changes[:0]
if err := p.notes.ForEach(func(note interface{}) error {
return iter(0, note)
…
我们跳到调用它的地方,可以看到只有在 p.notes.ForEach 这个函数中注册的回调函数中才会传入 fd 为 0 来执行 iter 回调函数。
notes noteQueue
noteQueue 的实现在 internal 目录中的 notequeue.go , 是一个无锁队列。我们不详细分析,只看下 ForEach 这个方法:
func (q *noteQueue) ForEach(iter func(note interface{}) error) error {
q.mu.Lock()
if len(q.notes) == 0 {
q.mu.Unlock()
return nil
}
notes := q.notes
q.notes = nil
q.mu.Unlock()
for _, note := range notes {
if err := iter(note); err != nil { // 执行回调函数
return err
}
}
return nil
}
当队列中有数据时, 会执行回调函数,即
func(note interface{}) error {
return iter(0, note)
}
从上面的分析中可以我们已经知道为什么会有 fd 为 0 ,下面我们看下 loopNote 做什么。
loopNote
func loopNote(s *server, l *loop, note interface{}) error {
var err error
switch v := note.(type) {
case time.Duration:
delay, action := s.events.Tick()
switch action {
case None:
case Shutdown:
err = errClosing
}
s.tch <- delay
case error: // shutdown
err = v
case *conn:
// Wake called for connection
if l.fdconns[v.fd] != v {
return nil // ignore stale wakes
}
return loopWake(s, l, v)
}
return err
}
传入的 note 是 interface{} ,首先对 note 进行类型判断。
当 note 是 time.Duration 时,调用回调函数 events.Tick() ,这是 evio 提供的定时任务接口。
在 loopRun 函数中,如果设置了定时回调函数,会启动一个协程来来运行 loopTicker
if l.idx == 0 && s.events.Tick != nil {
go loopTicker(s, l)
}
loopTicker 实现如下,可以看出会定时去触发 l.poll.Trigger,并且传入 time.Duration(0)
func loopTicker(s *server, l *loop) {
for {
if err := l.poll.Trigger(time.Duration(0)); err != nil {
break
}
time.Sleep(<-s.tch)
}
}
我们跳到 poll.Trigger 的 Linux 下的实现,可以发现 evio 在此处 p.notes.Add(note) ,也就是 time.Duration(0)
func (p *Poll) Trigger(note interface{}) error {
p.notes.Add(note)
_, err := syscall.Write(p.wfd, []byte{0, 0, 0, 0, 0, 0, 0, 1})
return err
}
poll.Trigger 这个函数不仅仅是在 p.notes 里增加了一个 note,还唤醒了事件循环。
当 epoll 中注册 fd 都没有就绪事件时,线程会挂起,epoll 的 wait 方法会处于阻塞状态。evio 使用
Linux 提供的 eventfd 来实现事件循环的唤醒,也就是代码上中的 syscall.Write(p.wfd, []byte{0, 0, 0, 0, 0, 0, 0, 1}) ,往 p.wfd 这个文件描述符中写入了 8 个字节的数据。
p.wfd 是一个 eventfd , 是 Poll 结构体的成员,在 OpenPoll 时赋值,即打开一个 eventfd 代码如下:
type Poll struct {
fd int // epoll fd
wfd int // wake fd
notes noteQueue
}
func OpenPoll() *Poll {
l := new(Poll)
p, err := syscall.EpollCreate1(0)
if err != nil {
panic(err)
}
l.fd = p
r0, _, e0 := syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
if e0 != 0 {
syscall.Close(p)
panic(err)
}
l.wfd = int(r0)
l.AddRead(l.wfd)
return l
}
syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0) 创建了一个 eventfd ,然后将这个 eventfd 注册到了 epoll 监听可读事件。当 syscall.Write(p.wfd, []byte{0, 0, 0, 0, 0, 0, 0, 1}) 时候,epoll 就会唤醒。
但是,我翻了好久,也没有找到 evio 在哪里读取 eventfd 写入的 8 个字节(epoll)。这是一个 bug,所以在 Linux 机器上,这是不能用的。
这个 bug 会造成 epoll 不断唤醒,CPU 被长期占用
当我们注册了 evio 的定时任务 Tick 回调函数,程序启动后会往 eventfd 里写入 8 个字节数据,但是 evio 并没有读取,并且 evio 使用的是 epoll 的默认模式 LT,即只要可读缓冲区里还有数据,epoll 会一直不断唤醒,这是一个严重的 bug,作者应该没有在 Linux 环境下严格测试过。
我们抛开这个 bug, 继续来看 note 为 error 类型的情况。在 serve 函数中,当函数退出时,通过 l.poll.Trigger(errClosing) 来通知每个事件循环退出。
func serve(events Events, listeners []*listener) error {
…
defer func() {
// wait on a signal for shutdown
s.waitForShutdown()
// notify all loops to close by closing all listeners
for _, l := range s.loops {
l.poll.Trigger(errClosing)
}
// wait on all loops to complete reading events
s.wg.Wait()
// close loops and all outstanding connections
for _, l := range s.loops {
for _, c := range l.fdconns {
loopCloseConn(s, l, c, nil)
}
l.poll.Close()
}
//println("-- server stopped") }()
…
当 note 为 *conn 这种情况,是用来提供给使用者主动唤醒当前事件循环
func (c conn) Wake() {
if c.loop != nil {
c.loop.poll.Trigger(c)
}
}
loopAccept
c := l.fdconns[fd]
switch {
case c == nil:
return loopAccept(s, l, fd)
type loop struct {
idx int // loop index in the server loops list
poll *internal.Poll // epoll or kqueue
packet []byte // read packet buffer
fdconns map[int]conn // loop connections fd -> conn
count int32 // connection count
}
fdconns 是用来存储已连接的 TCP connection 信息,key 为 fd, value 为 *conn 。
当 epoll 唤醒时,如果 fd 不在当前事件循环的连接,那就说明它是新连接,则执行 loopAccept 。
func loopAccept(s *server, l *loop, fd int) error {
for i, ln := range s.lns {
if ln.fd == fd {
if len(s.loops) > 1 {
switch s.balance {
case LeastConnections:
n := atomic.LoadInt32(&l.count)
for _, lp := range s.loops {
if lp.idx != l.idx {
if atomic.LoadInt32(&lp.count) < n {
return nil // do not accept
}
}
}
case RoundRobin:
idx := int(atomic.LoadUintptr(&s.accepted)) % len(s.loops)
if idx != l.idx {
return nil // do not accept
}
atomic.AddUintptr(&s.accepted, 1)
}
}
if ln.pconn != nil {
return loopUDPRead(s, l, i, fd)
}
nfd, sa, err := syscall.Accept(fd)
if err != nil {
if err == syscall.EAGAIN {
return nil
}
return err
}
if err := syscall.SetNonblock(nfd, true); err != nil {
return err
}
c := &conn{fd: nfd, sa: sa, lnidx: i, loop: l}
l.fdconns[c.fd] = c
l.poll.AddReadWrite(c.fd)
atomic.AddInt32(&l.count, 1)
break
}
}
return nil
}
因为 evio 支持多地址监听,所以会存在多个 listener ,也就是 s.lns 。
第一步,先遍历所有的 listener 看看当前 epoll 中就绪的 fd 是哪一个 listener ,然后执行客户端的负载策略,决定新的客户端连接放在哪一个事件循环中。
这里关于客户端的负载策略,evio 利用了 epoll 的惊群效果,所有的事件循环都会唤醒进入 loopAccept,不符合负载策略直接 return nil。 关于这边的更多细节,可以看我的另一篇文章 【Golang 网络库 evio 一些问题/bug 和思考】。
接下来就是常规操作了, syscall.Accept(fd) 接受连接,然后 syscall.SetNonblock(nfd, true) 设置成非阻塞模式, l.poll.AddReadWrite(c.fd) 最后加入事件循环,注册可读可写事件。
loopOpened
func loopOpened(s server, l *loop, c *conn) error {
c.opened = true
c.addrIndex = c.lnidx
c.localAddr = s.lns[c.lnidx].lnaddr
c.remoteAddr = internal.SockaddrToAddr(c.sa)
if s.events.Opened != nil {
out, opts, action := s.events.Opened(c)
if len(out) > 0 {
c.out = append([]byte{}, out…)
}
c.action = action
c.reuse = opts.ReuseInputBuffer
if opts.TCPKeepAlive > 0 {
if _, ok := s.lns[c.lnidx].ln.(net.TCPListener); ok {
internal.SetKeepAlive(c.fd, int(opts.TCPKeepAlive/time.Second))
}
}
}
if len(c.out) == 0 && c.action == None {
l.poll.ModRead(c.fd)
}
return nil
}
loopOpened 是在 loopAccept 执行完成后,epoll 会立马再次唤醒然后执行的。
因为在 loopAccept 中最后将新的客户端连接加入 epoll 管理时注册的是可读可写事件,当前的内核写缓冲区肯定是为空的,所以 epoll 会再次唤醒。
…
case !c.opened:
return loopOpened(s, l, c)
…
唤醒后会执行到这个 case case !c.opened:,因为在 loopAccept 中并没有去设置这个值。
loopOpened 内部的操作,主要就是设置一下 conn 的属性,然后调用客户注册的回调函数 events.Opened 。
如果在回调函数中,没有给客户端发送数据,则需要重新注册,只注册可读事件,不然 epoll 会一直唤醒(可写事件)。
loopAction
func loopAction(s *server, l *loop, c *conn) error {
switch c.action {
default:
c.action = None
case Close:
return loopCloseConn(s, l, c, nil)
case Shutdown:
return errClosing
case Detach:
return loopDetachConn(s, l, c, nil)
}
if len(c.out) == 0 && c.action == None {
l.poll.ModRead(c.fd)
}
return nil
}
case c.action != None:
return loopAction(s, l, c)
loopAction 会在 case c.action != None: 的情况下执行, c.action 是执行完用户回调函数后会被赋值的状态。
在会有 action 的 loopXXX 中都会有如下类似操作。
if len(c.out) != 0 || c.action != None {
l.poll.ModReadWrite(c.fd)
}
也就是说 loopAction 依赖于 epoll 被可写事件再次唤醒来执行,这样会不会有问题呢? 内核缓冲区满了??
loopAction 内部的主要操作就是根据 action 做一些处理,关闭连接等等。
loopRead 和 loopWrite
loopRead 和 loopWrite 主要就是调用系统调用读取和发送数据,并且调用用户回调函数,根据回调函数返回值来重新注册 epoll 的可读可写事件。
func loopRead(s *server, l *loop, c *conn) error {
var in []byte
n, err := syscall.Read(c.fd, l.packet)
if n == 0 || err != nil {
if err == syscall.EAGAIN {
return nil
}
return loopCloseConn(s, l, c, err)
}
in = l.packet[:n]
if !c.reuse {
in = append([]byte{}, in…)
}
if s.events.Data != nil {
out, action := s.events.Data(c, in)
c.action = action
if len(out) > 0 {
c.out = append([]byte{}, out…)
}
}
if len(c.out) != 0 || c.action != None {
l.poll.ModReadWrite(c.fd)
}
return nil
}
调用 n, err := syscall.Read(c.fd, l.packet) 读取内核缓冲区的数据,如果返回出错 err == syscall.EAGAIN 意思是再试一次,直接返回。
如果 n == 0 或者 err 错误不为 syscall.EAGAIN ,则说明对方关闭了连接或是其他错误,直接 loopCloseConn 。
然后调用用户回调函数 s.events.Data ,根据返回值做相应操作。c.action = action
如果 out 里有数据,则赋给 c.out , 并且注册可读可写事件。
如果 c.action != None ,同样需要注册可读可写事件,原因上面已经说过了。
loopWrite 操作也大同小异,就不细说了。
但是其实关于 loopWrite 和 loopRead 的处理是会有 bug 的