Golang中NetPoll机制的实现
Linux 网络IO
Linux 的阻塞网络 I/O (输入/输出) 是指在进行网络操作(如 read() 或 write())时,如果操作无法立即完成,调用线程将被操作系统“阻塞”,直到操作成功或失败才返回。它属于同步 I/O 模型的一种,与之相对的是非阻塞 I/O。
int listenfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in server_addr; server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 绑定到所有可用接口 server_addr.sin_port = htons(8080); // 绑定到 8080 端口 bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr)); listen(listenfd, 5); // 允许 5 个待处理的连接 // 阻塞等待 struct sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); int connectfd = accept(listenfd, (struct sockaddr *)&client_addr, &client_addr_len);
Linux 非阻塞IO Select、Poll、Epoll
核心机制:这些系统调用让用户进程在调用前,能预先指定一系列要监控的文件描述符。当一个或多个文件描述符准备好进行读写操作时,系统会通知用户进程。
轮询与事件驱动:
select 和 poll: 它们采用轮询方式,每次都检查所有传入的文件描述符集合来判断哪些是活跃的。
(1)select==>时间复杂度O(n)
它仅仅知道了,有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长。
(2)poll==>时间复杂度O(n)
poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的.
(3)epoll==>时间复杂度O(1)
epoll: 它采用事件驱动模型,通过内核的回调机制,只在需要时通知用户进程,效率更高,尤其是在处理大量并发连接时。epoll 机制通过在内核中维护一个就绪列表,当数据到达时,将对应的节点加入就绪列表,然后唤醒等待的用户进程,从而避免了不必要的轮询。
Golang 中Epoll 应用
一个简单的网络IO
// 启动 tcp server 代码示例
func main() {
/*
- 创建 tcp 端口监听器
- 创建 socket fd,bind、accept
- 创建 epoll 事件表(epoll_create)
- socket fd 注册到 epoll 事件表(epoll_ctl:add)
*/
l, _ := net.Listen("tcp", ":8080")
// eventloop reactor 模型
for {
/*
- 等待 tcp 连接到达
- loop + 非阻塞模式调用 accept
- 若未就绪,则通过 gopark 进行阻塞
- 等待 netpoller 轮询唤醒
- 检查是否有 io 事件就绪(epoll_wait——nonblock)
- 若发现事件就绪 通过 goready 唤醒 g
- accept 获取 conn fd 后注册到 epoll 事件表(epoll_ctl:add)
- 返回 conn
*/
conn, _ := l.Accept()
// goroutine per conn
go serve(conn)
}
}
// 处理一笔到来的 tcp 连接
func serve(conn net.Conn) {
/*
- 关闭 conn
- 从 epoll 事件表中移除该 fd(epoll_ctl:remove)
- 销毁该 fd
*/
defer conn.Close()
var buf []byte
/*
- 读取连接中的数据
- loop + 非阻塞模式调用 recv (read)
- 若未就绪,则通过 gopark 进行阻塞
- 等待 netpoller 轮询唤醒
- 检查是否有 io 事件就绪(epoll_wait——nonblock)
- 若发现事件就绪 通过 goready 唤醒 g
*/
_, _ = conn.Read(buf)
/*
- 向连接中写入数据
- loop + 非阻塞模式调用 writev (write)
- 若未就绪,则通过 gopark 进行阻塞
- 等待 netpoller 轮询唤醒
- 检查是否有 io 事件就绪(epoll_wait:nonblock)
- 若发现事件就绪 通过 goready 唤醒 g
*/
_, _ = conn.Write(buf)
}
1、调用Listen方法,通过epoll_create 初始化事件表, 创建 socket fd, 通过epoll_ctl 将socket fd注册到Epoll事件表, 监听就绪事件,等待远端连接。如果有远端连接,则会在内核空间执行回调函数,将socket fd放入就绪列表中
// 在 epoll 事件表中注册监听 r 的读就绪事件
ev := epollevent{
events: _EPOLLIN,
}
2、调用Accept方法,非阻塞调用Epoll Wait,获取当前监听的fd是否有就绪的socket fd
2.1、如果没有则将当前的goroutine阻塞,并挂载在socket fd 对应的polldesc的rg等待链表中,等待环境
2.2、如果有,则获取conn fd返回给用户程序
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
// ...
for {
// 以nonblock 模式发起一次 syscall accept 尝试接收到来的 conn
s, rsa, errcall, err := accept(fd.Sysfd)
// 接收conn成功,直接返回结果
if err == nil {
return s, rsa, "", err
}
switch err {
// 中断类错误直接忽略
case syscall.EINTR:
continue
// 当前未有到达的conn
case syscall.EAGAIN:
// 走入 poll_wait 流程,并标识关心的是 socket fd 的读就绪事件
// (当conn 到达时,表现为 socket fd 可读)
if fd.pd.pollable() {
// 倘若读操作未就绪,当前g 会 park 阻塞在该方法内部,直到因超时或者事件就绪而被 netpoll ready 唤醒
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
// ...
}
// ...
}
}
// 指定 mode 为 r 标识等待的是读就绪事件,然后走入更底层的 poll_wait 流程
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
3、调用Read、Write方法。非阻塞调用Epoll Wait,关心conn fd上是否有读就绪事件或者写就绪事件
3.1、如果 conn fd 下读操作尚未就绪(尚无数据到达),则会执行 poll wait 将当前 g 阻塞并挂载到 conn fd 对应 pollDesc 的 rg 中
3.2、如果 conn fd 下写操作尚未就绪(缓冲区空间不足),则会执行 poll wait 将当前 g 阻塞并挂载到 conn fd 对应 pollDesc 的wg中
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
// ...
for {
// 以非阻塞模式执行一次syscall read 操作
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
// 走入 poll_wait 流程,并标识关心的是该 fd 的读就绪事件
if err == syscall.EAGAIN && fd.pd.pollable() {
// 倘若读操作未就绪,当前g 会 park 阻塞在该方法内部,直到因超时或者事件就绪而被 netpoll ready 唤醒
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
4、小结
可以看到,一个连接的数据读写,如果条件不满足,goroutine都会挂起并挂载到pollDesc的rg、wg中,但此时用户程序主线程不会阻塞,而是等待底层数据到达后,再进行处理。
NetPoll的调度
由上可知,当 g 发现关心的 io 事件未就绪时,会通过 gopark 操作将自身陷入阻塞,并且将 g 挂载在 pollDesc 的 rg/wg 中, 而本小节介绍的 net_poll 流程就负责轮询获取已就绪 pollDesc 对应的 g,将其返回给上游的 gmp 调度系统,对其进行唤醒和调度.
Golang GMP模型触发netpoll时机
1、M在寻找可用的Goroutine时,在本地和全局队列上没有找到,会调用netpoll处理网络IO
// gmp 核心调度流程:g0 为当前 p 找到下一个调度的 g
/*
pick g 的核心逻辑:
1)每调度 61 次,需要专门尝试处理一次全局队列(防止饥饿)
2)尝试从本地队列中获取 g
3)尝试从全局队列中获取 g
4)以【非阻塞模式】调度 netpoll 流程,获取所有需要唤醒的 g 进行唤醒,并获取其中的首个g
5)从其他 p 中窃取一半的 g 填充到本地队列
6)仍找不到合适的 g,则协助 gc
7)以【阻塞或者超时】模式,调度netpoll 流程(全局仅有一个 p 能走入此分支)
8)当前m 添加到全局队列的空闲队列中,停止当前 m
*/
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
// ..
/*
同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:
- epoll事件表初始化过
- 有 g 在等待io 就绪事件
- 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
*/
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
// 以非阻塞模式发起一轮 netpoll,如果有 g 需要唤醒,一一唤醒之,并返回首个 g 给上层进行调度
if list := netpoll(0); !list.empty() { // non-blocking
// 获取就绪 g 队列中的首个 g
gp := list.pop()
// 将就绪 g 队列中其余 g 一一置为就绪态,并添加到全局队列
injectglist(&list)
// 把首个g 也置为就绪态
casgstatus(gp, _Gwaiting, _Grunnable)
// ...
//返回 g 给当前 p进行调度
return gp, false, false
}
}
// ...
/*
同时满足下述三个条件,发起一次【阻塞或超时模式】的 netpoll 流程:
- epoll事件表初始化过
- 有 g 在等待io 就绪事件
- 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
*/
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
// 默认为阻塞模式
delay := int64(-1)
// 存在定时时间,则设为超时模式
if pollUntil != 0 {
delay = pollUntil - now
// ...
}
// 以【阻塞或超时模式】发起一轮 netpoll
list := netpoll(delay) // block until new work is available
}
// ...
}
2、Sysmon 定时会调用netpoll处理网络IO
// The main goroutine.
func main() {
// ...
// 新建一个 m,直接运行 sysmon 函数
systemstack(func() {
newm(sysmon, nil, -1)
})
// ...
}
// 全局唯一监控线程的执行函数
func sysmon() {
// ...
for {
// ...
/*
同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:
- epoll事件表初始化过
- 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
- 距离上一次发起 netpoll 流程的时间间隔已超过 10 ms
*/
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
// 以非阻塞模式发起 netpoll
list := netpoll(0) // non-blocking - returns list of goroutines
// 获取到的 g 置为就绪态并添加到全局队列中
if !list.empty() {
// ...
injectglist(&list)
// ...
}
}
// ...
}
}
3、gc并发标记的流程
func startTheWorldWithSema(emitTraceEvent bool) int64 {
// 断言世界已停止
assertWorldStopped()
// ...
// 如果 epoll 事件表初始化过,则以非阻塞模式执行一次 netpoll
if netpollinited() {
// 所有取得的 g 置为就绪态并添加到全局队列
list := netpoll(0) // non-blocking
injectglist(&list)
}
// ...
}
当上述条件成立时,Netpoll执行 epoll_wait 操作,获取就绪的 io 事件 list. 一轮最多获取 128 个,根据就绪事件类型,将 mode 分为 w(写就绪事件)和 r(读就绪事件)。获取 pollDesc 实例中 rg或者wg中的 g 实例,一并返回GMP进行调度

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
/*
- netpoll 流程用于轮询检查是否有就绪的 io 事件
- 如果有就绪 io 事件,还需要检查是否有 pollDesc 中的 g 关心该事件
- 找到所有关心该就绪 io 事件的 g,添加到 list 中返回给上游进行 goready 唤醒
*/
func netpoll(delay int64) gList {
/*
根据传入的 delay 参数,决定调用 epoll_wait 的模式
- delay < 0:设为 -1 阻塞模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,会通过该模式,使得 thread 陷入阻塞态,但该情况全局最多仅有一例)
- delay = 0:设为 0 非阻塞模式(通常情况下为此模式,包括 gmp 常规调度流程、gc 以及全局监控线程 sysmon 都是以此模式触发的 netpoll 流程)
- delay > 0:设为超时模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,并且通过 timer 启动了定时任务时,会令 thread 以超时模式执行 epoll_wait 操作)
*/
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
// 针对 delay 时长取整
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
// 一次最多接收 128 个 io 就绪事件
var events [128]epollevent
retry:
// 以指定模式,调用 epoll_wait 指令
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
// ...
// 遍历就绪的每个 io 事件
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
// pipe 接收端的信号量处理
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
// ...
}
/*
根据 io 事件类型,标识出 mode:
- EPOLL_IN -> r;
- EPOLL_OUT -> w;
- 错误或者中断事件 -> r & w;
*/
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
// 根据 epollevent.data 获取到监听了该事件的 pollDesc 实例
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
// ...
// 尝试针对对应 pollDesc 进行唤醒操作
netpollready(&toRun, pd, mode)
}
}
return toRun
}
/*
epollwait 操作:
- epfd:epoll 事件表 fd 句柄
- ev:用于承载就绪 epoll event 的容器
- nev:ev 的容量
- timeout:
- -1:阻塞模式
- 0:非阻塞模式:
- >0:超时模式. 单位 ms
- 返回值 int32:就绪的 event 数量
*/
func epollwait(epfd int32, ev *epollevent, nev, timeout int32) int32
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
/*
根据 pd 以及 mode 标识的 io 就绪事件,获取需要进行 ready 唤醒的 g list
对应 g 会存储到 toRun 这个 list 容器当中
*/
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
// 倘若到达事件包含读就绪,尝试获取需要 ready 唤醒的 g
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
// 倘若到达事件包含写就绪,尝试获取需要 ready 唤醒的 g
wg = netpollunblock(pd, 'w', true)
}
// 找到需要唤醒的 g,添加到 glist 中返回给上层
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
/*
根据指定的就绪io 事件类型以及 pollDesc,判断是否有 g 需要被唤醒. 若返回结果非空,则为需要唤醒的 g
*/
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// 根据 io 事件类型,获取 pollDesc 中对应的状态标识器
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
// 从 gpp 中取出值,此时该值应该为调用过 park 操作的 g
old := gpp.Load()
// ...
if ioready {
new = pdReady
}
// 通过 cas 操作,将 gpp 值由 g 置换成 pdReady
if gpp.CompareAndSwap(old, new) {
// 返回需要唤醒的 g
return (*g)(unsafe.Pointer(old))
}
}
}
到此这篇关于Golang中NetPoll机制的实现 的文章就介绍到这了,更多相关Golang NetPoll 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!


最新评论