Golang中NetPoll机制的实现

 更新时间:2025年12月23日 11:13:46   作者:AI吴界  
本文主要介绍了Golang中NetPoll机制的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

Linux 网络IO

Linux 的阻塞网络 I/O (输入/输出) 是指在进行网络操作(如 read() 或 write())时,如果操作无法立即完成,调用线程将被操作系统“阻塞”,直到操作成功或失败才返回。它属于同步 I/O 模型的一种,与之相对的是非阻塞 I/O。

int listenfd = socket(AF_INET, SOCK_STREAM, 0);

struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 绑定到所有可用接口
server_addr.sin_port = htons(8080); // 绑定到 8080 端口
bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr));

listen(listenfd, 5); // 允许 5 个待处理的连接

// 阻塞等待
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int connectfd = accept(listenfd, (struct sockaddr *)&client_addr, &client_addr_len);

Linux 非阻塞IO Select、Poll、Epoll

核心机制:这些系统调用让用户进程在调用前,能预先指定一系列要监控的文件描述符。当一个或多个文件描述符准备好进行读写操作时,系统会通知用户进程。

轮询与事件驱动
select 和 poll: 它们采用轮询方式,每次都检查所有传入的文件描述符集合来判断哪些是活跃的。

(1)select==>时间复杂度O(n)

它仅仅知道了,有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长。

(2)poll==>时间复杂度O(n)

poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的.

(3)epoll==>时间复杂度O(1)

epoll: 它采用事件驱动模型,通过内核的回调机制,只在需要时通知用户进程,效率更高,尤其是在处理大量并发连接时。epoll 机制通过在内核中维护一个就绪列表,当数据到达时,将对应的节点加入就绪列表,然后唤醒等待的用户进程,从而避免了不必要的轮询

Golang 中Epoll 应用

一个简单的网络IO

// 启动 tcp server 代码示例
func main() {
	/*
		- 创建 tcp 端口监听器
		    - 创建 socket fd,bind、accept
			- 创建 epoll 事件表(epoll_create)
			- socket fd 注册到 epoll 事件表(epoll_ctl:add)

	*/
	l, _ := net.Listen("tcp", ":8080")
	// eventloop reactor 模型
	for {
		/*
			- 等待 tcp 连接到达
			    - loop + 非阻塞模式调用 accept
				- 若未就绪,则通过 gopark 进行阻塞
				- 等待 netpoller 轮询唤醒
				     - 检查是否有 io 事件就绪(epoll_wait——nonblock)
					 - 若发现事件就绪 通过 goready 唤醒 g
				- accept 获取 conn fd 后注册到 epoll 事件表(epoll_ctl:add)
				- 返回 conn
		*/
		conn, _ := l.Accept()
		// goroutine per conn
		go serve(conn)
	}
}

// 处理一笔到来的 tcp 连接
func serve(conn net.Conn) {
	/*
		- 关闭 conn
		   - 从 epoll 事件表中移除该 fd(epoll_ctl:remove)
		   - 销毁该 fd
	*/
	defer conn.Close()
	var buf []byte
	/*
		- 读取连接中的数据
		   - loop + 非阻塞模式调用 recv (read)
		   - 若未就绪,则通过 gopark 进行阻塞
		   - 等待 netpoller 轮询唤醒
			    - 检查是否有 io 事件就绪(epoll_wait——nonblock)
				- 若发现事件就绪 通过 goready 唤醒 g
	*/
	_, _ = conn.Read(buf)
	/*
		- 向连接中写入数据
		   - loop + 非阻塞模式调用 writev (write)
		   - 若未就绪,则通过 gopark 进行阻塞
		   - 等待 netpoller 轮询唤醒
			    - 检查是否有 io 事件就绪(epoll_wait:nonblock)
				- 若发现事件就绪 通过 goready 唤醒 g
	*/
	_, _ = conn.Write(buf)
}

1、调用Listen方法,通过epoll_create 初始化事件表, 创建 socket fd, 通过epoll_ctl 将socket fd注册到Epoll事件表, 监听就绪事件,等待远端连接。如果有远端连接,则会在内核空间执行回调函数,将socket fd放入就绪列表中

 // 在 epoll 事件表中注册监听 r 的读就绪事件
	ev := epollevent{
		events: _EPOLLIN,
	}

2、调用Accept方法,非阻塞调用Epoll Wait,获取当前监听的fd是否有就绪的socket fd
2.1、如果没有则将当前的goroutine阻塞,并挂载在socket fd 对应的polldesc的rg等待链表中,等待环境
2.2、如果有,则获取conn fd返回给用户程序

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
        // ...
        for {
                // 以nonblock 模式发起一次 syscall accept 尝试接收到来的 conn
		s, rsa, errcall, err := accept(fd.Sysfd)
                // 接收conn成功,直接返回结果
                if err == nil {
			return s, rsa, "", err
		}
		switch err {
                // 中断类错误直接忽略
		case syscall.EINTR:
			continue
                // 当前未有到达的conn 
		case syscall.EAGAIN:
       // 走入 poll_wait 流程,并标识关心的是 socket fd 的读就绪事件
       // (当conn 到达时,表现为 socket fd 可读)
			if fd.pd.pollable() {
         // 倘若读操作未就绪,当前g 会 park 阻塞在该方法内部,直到因超时或者事件就绪而被 netpoll ready 唤醒
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
                        // ...
		}
                // ...
	}
}

// 指定 mode 为 r 标识等待的是读就绪事件,然后走入更底层的 poll_wait 流程
func (pd *pollDesc) waitRead(isFile bool) error {
	return pd.wait('r', isFile)
}

3、调用Read、Write方法。非阻塞调用Epoll Wait,关心conn fd上是否有读就绪事件或者写就绪事件
3.1、如果 conn fd 下读操作尚未就绪(尚无数据到达),则会执行 poll wait 将当前 g 阻塞并挂载到 conn fd 对应 pollDesc 的 rg 中
3.2、如果 conn fd 下写操作尚未就绪(缓冲区空间不足),则会执行 poll wait 将当前 g 阻塞并挂载到 conn fd 对应 pollDesc 的wg中

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
        // ... 
	for {
                // 以非阻塞模式执行一次syscall read 操作 
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			n = 0
                        // 走入 poll_wait 流程,并标识关心的是该 fd 的读就绪事件
			if err == syscall.EAGAIN && fd.pd.pollable() {
                                // 倘若读操作未就绪,当前g 会 park 阻塞在该方法内部,直到因超时或者事件就绪而被 netpoll ready 唤醒
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}

4、小结
可以看到,一个连接的数据读写,如果条件不满足,goroutine都会挂起并挂载到pollDesc的rg、wg中,但此时用户程序主线程不会阻塞,而是等待底层数据到达后,再进行处理。

NetPoll的调度

由上可知,当 g 发现关心的 io 事件未就绪时,会通过 gopark 操作将自身陷入阻塞,并且将 g 挂载在 pollDesc 的 rg/wg 中, 而本小节介绍的 net_poll 流程就负责轮询获取已就绪 pollDesc 对应的 g,将其返回给上游的 gmp 调度系统,对其进行唤醒和调度.

Golang GMP模型触发netpoll时机
1、M在寻找可用的Goroutine时,在本地和全局队列上没有找到,会调用netpoll处理网络IO

// gmp 核心调度流程:g0 为当前 p 找到下一个调度的  g
    /*
        pick g 的核心逻辑:
             1)每调度 61 次,需要专门尝试处理一次全局队列(防止饥饿)
             2)尝试从本地队列中获取 g
             3)尝试从全局队列中获取 g
             4)以【非阻塞模式】调度 netpoll 流程,获取所有需要唤醒的 g 进行唤醒,并获取其中的首个g
             5)从其他 p 中窃取一半的 g 填充到本地队列
             6)仍找不到合适的 g,则协助 gc 
             7)以【阻塞或者超时】模式,调度netpoll 流程(全局仅有一个 p 能走入此分支)
             8)当前m 添加到全局队列的空闲队列中,停止当前 m
    */

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    // ..
    /*
      同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:
        - epoll事件表初始化过
        - 有 g 在等待io 就绪事件
        - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
    */ 
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
                // 以非阻塞模式发起一轮 netpoll,如果有 g 需要唤醒,一一唤醒之,并返回首个 g 给上层进行调度
		if list := netpoll(0); !list.empty() { // non-blocking
                // 获取就绪 g 队列中的首个 g
                        gp := list.pop()
                        // 将就绪 g 队列中其余 g 一一置为就绪态,并添加到全局队列
			injectglist(&list)
                        // 把首个g 也置为就绪态
			casgstatus(gp, _Gwaiting, _Grunnable)
                         // ... 
                         //返回 g 给当前 p进行调度
			return gp, false, false
		}
	}
 
    // ...
    /*
        同时满足下述三个条件,发起一次【阻塞或超时模式】的 netpoll 流程:
            - epoll事件表初始化过
            - 有 g 在等待io 就绪事件
            - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
    */	
	if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
                // 默认为阻塞模式 
		delay := int64(-1)
                // 存在定时时间,则设为超时模式
		if pollUntil != 0 {
			delay = pollUntil - now
                        // ... 
		}
                // 以【阻塞或超时模式】发起一轮 netpoll
		list := netpoll(delay) // block until new work is available 
	} 
        // ... 
}

2、Sysmon 定时会调用netpoll处理网络IO

// The main goroutine.
func main() {
        // ...
        // 新建一个 m,直接运行 sysmon 函数
	systemstack(func() {
		newm(sysmon, nil, -1)
	})

         // ...
}

// 全局唯一监控线程的执行函数
func sysmon() {
        // ...
	for {
              // ...
        /*
        同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:
            - epoll事件表初始化过
            - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
            - 距离上一次发起 netpoll 流程的时间间隔已超过 10 ms
        */ 
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
                        // 以非阻塞模式发起 netpoll
			list := netpoll(0) // non-blocking - returns list of goroutines
                        // 获取到的  g 置为就绪态并添加到全局队列中
			if !list.empty() {
                                // ...
				injectglist(&list)
                                // ...
			}
		}
                // ... 
	}
}

3、gc并发标记的流程

func startTheWorldWithSema(emitTraceEvent bool) int64 {
        // 断言世界已停止
	assertWorldStopped()
        // ...
        // 如果 epoll 事件表初始化过,则以非阻塞模式执行一次 netpoll
	if netpollinited() {
                // 所有取得的 g 置为就绪态并添加到全局队列
		list := netpoll(0) // non-blocking
		injectglist(&list)
	}
        // ...
}

当上述条件成立时,Netpoll执行 epoll_wait 操作,获取就绪的 io 事件 list. 一轮最多获取 128 个,根据就绪事件类型,将 mode 分为 w(写就绪事件)和 r(读就绪事件)。获取 pollDesc 实例中 rg或者wg中的 g 实例,一并返回GMP进行调度

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
/*
    - netpoll 流程用于轮询检查是否有就绪的 io 事件
    - 如果有就绪 io 事件,还需要检查是否有 pollDesc 中的 g 关心该事件
    - 找到所有关心该就绪 io 事件的 g,添加到 list 中返回给上游进行 goready 唤醒
*/
func netpoll(delay int64) gList {
     /*
        根据传入的 delay 参数,决定调用 epoll_wait 的模式
            - delay < 0:设为 -1 阻塞模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,会通过该模式,使得 thread 陷入阻塞态,但该情况全局最多仅有一例)
            - delay = 0:设为 0 非阻塞模式(通常情况下为此模式,包括 gmp 常规调度流程、gc 以及全局监控线程 sysmon 都是以此模式触发的 netpoll 流程)
            - delay > 0:设为超时模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,并且通过 timer 启动了定时任务时,会令 thread 以超时模式执行 epoll_wait 操作)
    */
     var waitms int32
     if delay < 0 {
		waitms = -1
     } else if delay == 0 {
		waitms = 0
     // 针对 delay 时长取整
     } else if delay < 1e6 {
		waitms = 1
     } else if delay < 1e15 {
		waitms = int32(delay / 1e6)
     } else {
     // 1e9 ms == ~11.5 days.
		waitms = 1e9
     }
     // 一次最多接收 128 个 io 就绪事件	
     var events [128]epollevent
retry:
     // 以指定模式,调用 epoll_wait 指令
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
     // ...
 
     // 遍历就绪的每个 io 事件	
     var toRun gList
     for i := int32(0); i < n; i++ {
	   ev := &events[i]
           if ev.events == 0 {
                continue
           }

           // pipe 接收端的信号量处理
           if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
           // ...
            }
 
            /*
             根据 io 事件类型,标识出 mode:
                 - EPOLL_IN -> r;
                 - EPOLL_OUT -> w;
                 - 错误或者中断事件 -> r & w;
             */
            var mode int32
            if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
            }
            if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
            }
            // 根据 epollevent.data 获取到监听了该事件的 pollDesc 实例
            if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
                        // ...			
                        // 尝试针对对应 pollDesc 进行唤醒操作
                        netpollready(&toRun, pd, mode)
            }
      }
      return toRun
}




/*
    epollwait 操作:
        - epfd:epoll 事件表 fd 句柄
        - ev:用于承载就绪 epoll event 的容器
        - nev:ev 的容量
        - timeout:
            - -1:阻塞模式
            - 0:非阻塞模式:
            - >0:超时模式. 单位 ms
        - 返回值 int32:就绪的 event 数量
*/
func epollwait(epfd int32, ev *epollevent, nev, timeout int32) int32





// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
/*
 根据 pd 以及 mode 标识的 io 就绪事件,获取需要进行 ready 唤醒的 g list
 对应 g 会存储到 toRun 这个 list 容器当中
*/
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
        // 倘若到达事件包含读就绪,尝试获取需要 ready 唤醒的 g
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
 // 倘若到达事件包含写就绪,尝试获取需要 ready 唤醒的 g
		wg = netpollunblock(pd, 'w', true)
	}
 // 找到需要唤醒的 g,添加到 glist 中返回给上层
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}




/*
 根据指定的就绪io 事件类型以及 pollDesc,判断是否有 g 需要被唤醒. 若返回结果非空,则为需要唤醒的 g
*/
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
        // 根据 io 事件类型,获取 pollDesc 中对应的状态标识器
        gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
                // 从 gpp 中取出值,此时该值应该为调用过 park 操作的 g
		old := gpp.Load()
                 // ... 
		if ioready {
			new = pdReady
		}
                // 通过 cas 操作,将 gpp 值由 g 置换成 pdReady
		if gpp.CompareAndSwap(old, new) {
                        // 返回需要唤醒的 g 
			return (*g)(unsafe.Pointer(old))
		}
	}
}

到此这篇关于Golang中NetPoll机制的实现 的文章就介绍到这了,更多相关Golang NetPoll 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 一文掌握Go语言并发编程必备的Mutex互斥锁

    一文掌握Go语言并发编程必备的Mutex互斥锁

    Go 语言提供了 sync 包,其中包括 Mutex 互斥锁、RWMutex 读写锁等同步机制,本篇博客将着重介绍 Mutex 互斥锁的基本原理,需要的可以参考一下
    2023-04-04
  • Go语言使用组合的思想实现继承

    Go语言使用组合的思想实现继承

    这篇文章主要为大家详细介绍了在 Go 里面如何使用组合的思想实现“继承”,文中的示例代码讲解详细,对我们学习Go语言有一定的帮助,需要的可以了解一下
    2022-12-12
  • Goland项目使用gomod配置的详细步骤

    Goland项目使用gomod配置的详细步骤

    Goland是一个用于Go语言开发的IDE,Goland的项目结构与Go语言的项目结构相似,下面这篇文章主要给大家介绍了关于Goland项目使用gomod配置的详细步骤,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2023-04-04
  • 一文带你掌握Go语言运算符的使用

    一文带你掌握Go语言运算符的使用

    运算符用于在程序运行时执行数学或逻辑运算。Go 语言内置的运算符有:算术运算符、关系运算符、逻辑运算符、位运算符、赋值运算符、其他运算符。本文将带大家详细了解一下这些运算符的使用,感兴趣的可以了解一下
    2022-04-04
  • Go语言利用ffmpeg转hls实现简单视频直播

    Go语言利用ffmpeg转hls实现简单视频直播

    这篇文章主要为大家介绍了Go语言利用ffmpeg转hls实现简单视频直播,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-04-04
  • Golang判断两个链表是否相交的方法详解

    Golang判断两个链表是否相交的方法详解

    这篇文章主要为大家详细介绍了如何通过Golang判断两个链表是否相交,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-03-03
  • Go-RESTful实现下载功能思路详解

    Go-RESTful实现下载功能思路详解

    这篇文章主要介绍了Go-RESTful实现下载功能,文件下载包括文件系统IO和网络IO,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-10-10
  • 详解Go语言设计模式之单例模式

    详解Go语言设计模式之单例模式

    单例模式很容易记住。就像名称一样,它只能提供对象的单一实例,保证一个类只有一个实例,并提供一个全局访问该实例的方法。本文就来聊聊Go语言中的单例模式,感兴趣的小伙伴可以了解一下
    2022-10-10
  • 轻松入门:使用Golang开发跨平台GUI应用

    轻松入门:使用Golang开发跨平台GUI应用

    Golang是一种强大的编程语言,它的并发性和高性能使其成为开发GUI桌面应用的理想选择,Golang提供了丰富的标准库和第三方库,可以轻松地创建跨平台的GUI应用程序,通过使用Golang的GUI库,开发人员可以快速构建具有丰富用户界面和交互功能的应用程序,需要的朋友可以参考下
    2023-10-10
  • Go设计模式之访问者模式讲解和代码示例

    Go设计模式之访问者模式讲解和代码示例

    访问者是一种行为设计模式, 允许你在不修改已有代码的情况下向已有类层次结构中增加新的行为,本文将通过代码示例给大家详细的介绍一下Go设计模式之访问者模式,需要的朋友可以参考下
    2023-08-08

最新评论