Go调度器学习之协作与抢占详解

 更新时间:2023年04月04日 08:23:47   作者:IguoChan  
如果某个G执行时间过长,其他的G如何才能被正常调度,这就引出了接下来的话题:协作与抢占。本文将通过一些示例为大家详细讲讲调度器中协作与抢占的相关知识,需要的可以参考一下

0. 简介

在上篇博客——《Golang调度器(4)—goroutine调度》中一直遗留了一个没有解答的问题:如果某个G执行时间过长,其他的G如何才能被正常调度,这就引出了接下来的话题:协作与抢占

在Go语言的v1.2版本就实现饿了基于协作的抢占式调用,这种调用的基本原理就是:

  • sysmon监控线程发现有协程的执行时间太长了,那么会友好地为这个协程设置抢占标记;
  • 当这个协程调用(call)一个函数时,会检查是否扩容栈,而这里就会检查抢占标记,如果被标记,则会让出CPU,从而实现调度。

但是这种调度方式是协程主动的,是基于协作的,但是他无法面对一些场景,比如在死循环中没有任何调用发生,那么这个协程将永远执行下去,永远不会发生调度,这显然是不可接受的。

于是,在v1.14版本,Go终于引入了基于信号的抢占式调度,下面,我们将介绍一下这两种抢占调度。

1. 用户主动让出CPU:runtime.Gosched函数

在介绍两种抢占调度之前,我们首先介绍一下runtime.Gosched函数:

// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
func Gosched() {
   checkTimeouts()
   mcall(gosched_m)
}

根据说明,runtime.Gosched函数会主动放弃当前处理器,并且允许其他协程执行,但是起并不会暂停自己,而只是让渡调度权,之后依赖调度器获得重新调度。

之后,会通过mcall函数切换到g0栈去执行gosched_m函数:

// Gosched continuation on g0.
func gosched_m(gp *g) {
   if trace.enabled {
      traceGoSched()
   }
   goschedImpl(gp)
}

gosched_m调用goschedImpl函数,其会为协程gp让渡出本M,并且将gp放到全局队列中,等待调度。

func goschedImpl(gp *g) {
   status := readgstatus(gp)
   if status&^_Gscan != _Grunning {
      dumpgstatus(gp)
      throw("bad g status")
   }
   casgstatus(gp, _Grunning, _Grunnable)
   dropg()            // 使当前m放弃gp,就是其参数 curg
   lock(&sched.lock)
   globrunqput(gp)    // 并且把gp放到全局队列中,等待调度
   unlock(&sched.lock)

   schedule()
}

虽然runtime.Gosched具有主动放弃CPU的能力,但是对用户的要求比较高,并非用户友好的。

2. 基于协作的抢占式调度

2.1 场景

package main

import (
   "fmt"
   "runtime"
   "sync"
   "time"
)

var once = sync.Once{}

func f() {
   once.Do(func() {
      fmt.Println("I am go routine 1!")
   })
}

func main() {
   defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))

   go func() {
      for {
         f()
      }
   }()

   time.Sleep(10 * time.Millisecond)
   fmt.Println("I am main goroutine!")
}

我们考虑如上代码,首先我们设置P的个数为1,然后起一个协程中进入死循环,循环调用一个函数,如果没有抢占调度,那么这个协程将一直占据P,也就是会一直占据CPU,代码就永远不可能执行到fmt.Println("I am main goroutine!")这行。下面我们看看,协作式抢占是怎么避免以上问题的。

2.2 栈扩张与抢占标记

$ go tool compile -N -l main.go
$ go tool objdump main.o >> main.i

我们通过以上指令,得到2.1中代码的汇编代码,截取f函数的汇编代码如下:

TEXT "".f(SB) gofile../home/chenyiguo/smb_share/go_routine_test/main.go
  main.go:12      0x151a       493b6610      CMPQ 0x10(R14), SP 
  main.go:12      0x151e       762b         JBE 0x154b    
  main.go:12      0x1520       4883ec18      SUBQ $0x18, SP    
  main.go:12      0x1524       48896c2410    MOVQ BP, 0x10(SP)  
  main.go:12      0x1529       488d6c2410    LEAQ 0x10(SP), BP  
  main.go:13      0x152e       488d0500000000    LEAQ 0(IP), AX    [3:7]R_PCREL:"".once      
  main.go:13      0x1535       488d1d00000000    LEAQ 0(IP), BX    [3:7]R_PCREL:"".f.func1·f  
  main.go:13      0x153c       e800000000    CALL 0x1541       [1:5]R_CALL:sync.(*Once).Do    
  main.go:16      0x1541       488b6c2410    MOVQ 0x10(SP), BP  
  main.go:16      0x1546       4883c418      ADDQ $0x18, SP    
  main.go:16      0x154a       c3       RET          
  main.go:12      0x154b       e800000000    CALL 0x1550       [1:5]R_CALL:runtime.morestack_noctxt   
  main.go:12      0x1550       ebc8         JMP "".f(SB)  

其中第一行,CMPQ 0x10(R14), SP就是比较SP0x10(R14)(其实就是stackguard0)的大小(注意AT&T格式下CMP系列指令的顺序),当SP小于等于0x10(R14)时,就会调转到0x154b地址调用runtime.morestack_noctxt,触发栈扩张操作。其实如果你仔细观察就会发现,所有的函数的序言(函数调用的最前方)都被插入了检测指令,除非在函数上标记//go:nosplit

接下来,我们将关注于两点来打通整个链路,即:

  • 栈扩张怎么重新调度,让出CPU的执行权?
  • 何时会设置栈扩张标记?

2.3 栈扩张怎么触发重新调度

// morestack but not preserving ctxt.
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
   MOVL   $0, DX
   JMP    runtime·morestack(SB)

TEXT runtime·morestack(SB),NOSPLIT,$0-0
   ...

   // Set g->sched to context in f.
   MOVQ   0(SP), AX // f's PC
   MOVQ   AX, (g_sched+gobuf_pc)(SI)
   LEAQ   8(SP), AX // f's SP
   MOVQ   AX, (g_sched+gobuf_sp)(SI)
   MOVQ   BP, (g_sched+gobuf_bp)(SI)
   MOVQ   DX, (g_sched+gobuf_ctxt)(SI)

   ...
   CALL   runtime·newstack(SB)
   CALL   runtime·abort(SB)  // crash if newstack returns
   RET

以上代码中,runtime·morestack_noctxt调用runtime·morestack,在runtime·morestack中,会首先记录协程的PC和SP,然后调用runtime.newstack

func newstack() {
   ...

   gp := thisg.m.curg
   
   ...
   stackguard0 := atomic.Loaduintptr(&gp.stackguard0)

   ...
   preempt := stackguard0 == stackPreempt
   ...

   if preempt {
      if gp == thisg.m.g0 {
         throw("runtime: preempt g0")
      }
      if thisg.m.p == 0 && thisg.m.locks == 0 {
         throw("runtime: g is running but p is not")
      }

      if gp.preemptShrink {
         // We're at a synchronous safe point now, so
         // do the pending stack shrink.
         gp.preemptShrink = false
         shrinkstack(gp)
      }

      if gp.preemptStop {
         preemptPark(gp) // never returns
      }

      // Act like goroutine called runtime.Gosched.
      gopreempt_m(gp) // never return
   }

   ...
}

我们简化runtime.newstack函数,总结起来就是通过现有工作协程的stackguard0字段,来判断是不是应该发生抢占,如果需要的话,则调用gopreempt_m(gp)函数:

func gopreempt_m(gp *g) {
   if trace.enabled {
      traceGoPreempt()
   }
   goschedImpl(gp)
}

可以看到,gopreempt_m函数和前面讲到Gosched函数时说到的gosched_m函数一样,都将调用goschedImpl函数,为协程gp让渡出本M,并且将gp放到全局队列中,等待调度。

这里我们就明白了,一旦发生栈扩张,就有可能会发生让渡出执行权,进行重新调度的可能性,那什么时候会发生栈扩张呢?

2.4 何时设置栈扩张标记

在代码中,将stackguard0字段置为stackPreempt的地方有不少,但是和我们以上场景相符的还是在后台监护线程sysmon循环中,对于陷入系统调用和长时间运行的goroutine的运行权进行夺取的retake函数:

func sysmon() {
   ...

   for {
      ...
      // retake P's blocked in syscalls
      // and preempt long running G's
      if retake(now) != 0 {
         idle = 0
      } else {
         idle++
      }
      ...
   }
}
func retake(now int64) uint32 {
   ...
   for i := 0; i < len(allp); i++ {
      ...
      s := _p_.status
      sysretake := false
      if s == _Prunning || s == _Psyscall {
         // Preempt G if it's running for too long.
         t := int64(_p_.schedtick)
         if int64(pd.schedtick) != t {
            pd.schedtick = uint32(t)
            pd.schedwhen = now
         } else if pd.schedwhen+forcePreemptNS <= now { // forcePreemptNS=10ms
            preemptone(_p_) // 在这里设置栈扩张标记
            // In case of syscall, preemptone() doesn't
            // work, because there is no M wired to P.
            sysretake = true
         }
      }
      ...
   }
   unlock(&allpLock)
   return uint32(n)
}

其中,在preemptone函数中进行栈扩张标记的设置:

func preemptone(_p_ *p) bool {
   mp := _p_.m.ptr()
   if mp == nil || mp == getg().m {
      return false
   }
   gp := mp.curg
   if gp == nil || gp == mp.g0 {
      return false
   }

   gp.preempt = true

   // Every call in a goroutine checks for stack overflow by
   // comparing the current stack pointer to gp->stackguard0.
   // Setting gp->stackguard0 to StackPreempt folds
   // preemption into the normal stack overflow check.
   gp.stackguard0 = stackPreempt // 设置栈扩张标记

   // Request an async preemption of this P.
   if preemptMSupported && debug.asyncpreemptoff == 0 {
      _p_.preempt = true
      preemptM(mp)
   }

   return true
}

通过以上,我们串通起了goroutine协作式抢占的逻辑:

  • 首先,后台监控线程会对运行时间过长(≥10ms)的协程设置栈扩张标记;
  • 协程运行到任何一个函数的序言的时候,都会首先检查栈扩张标记;
  • 如果需要进行栈扩张,在进行栈扩张的时候,会夺取这个协程的运行权,从而实现抢占式调度。

3. 基于信号的抢占式调度

分析以上结论我们可以知道,上述抢占触发逻辑有一个致命的缺点,那就是必须要运行到函数栈的序言部分,而这根本无法读取以下协程的运行权,在Go的1.14版本之前,一下代码不会打印最后一句"I am main goroutine!"

package main

import (
   "fmt"
   "runtime"
   "sync"
   "time"
)

var once = sync.Once{}

func main() {
   defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))

   go func() {
      for {
         once.Do(func() {
            fmt.Println("I am go routine 1!")
         })
      }
   }()

   time.Sleep(10 * time.Millisecond)
   fmt.Println("I am main goroutine!")
}

因为以上协程中的for循环是个死循环,且并不会包含栈扩张逻辑,所以不会让渡出自身的执行权。

3.1 发送抢占信号

为此,Go SDK引入了基于信号的抢占式调度。我们注意分析上一节preemptone函数代码中有以下部分:

if preemptMSupported && debug.asyncpreemptoff == 0 {
   _p_.preempt = true
   preemptM(mp)
}

其中preemptM函数会发送_SIGURG信号给需要抢占的线程:

const sigPreempt = _SIGURG


func preemptM(mp *m) {
   // On Darwin, don't try to preempt threads during exec.
   // Issue #41702.
   if GOOS == "darwin" || GOOS == "ios" {
      execLock.rlock()
   }

   if atomic.Cas(&mp.signalPending, 0, 1) {
      if GOOS == "darwin" || GOOS == "ios" {
         atomic.Xadd(&pendingPreemptSignals, 1)
      }

      // If multiple threads are preempting the same M, it may send many
      // signals to the same M such that it hardly make progress, causing
      // live-lock problem. Apparently this could happen on darwin. See
      // issue #37741.
      // Only send a signal if there isn't already one pending.
      signalM(mp, sigPreempt)
   }

   if GOOS == "darwin" || GOOS == "ios" {
      execLock.runlock()
   }
}

3.2 抢占调用的注入

说到这里,我们就需要回到最开始,在第一个协程m0开启mstart的调用链路上,会调用mstartm0函数,在这里会调用initsig

func initsig(preinit bool) {
  ...

   for i := uint32(0); i < _NSIG; i++ {
      ...

      handlingSig[i] = 1
      setsig(i, abi.FuncPCABIInternal(sighandler))
   }
}

在以上,注册了sighandler函数:

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
   ...

   if sig == sigPreempt && debug.asyncpreemptoff == 0 {
      // Might be a preemption signal.
      doSigPreempt(gp, c)
      // Even if this was definitely a preemption signal, it
      // may have been coalesced with another signal, so we
      // still let it through to the application.
   }

   ...
}

然后接收到sigPreempt信号时,会通过doSigPreempt函数处理如下:

func doSigPreempt(gp *g, ctxt *sigctxt) {
   // Check if this G wants to be preempted and is safe to
   // preempt.
   if wantAsyncPreempt(gp) {
      if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
         // Adjust the PC and inject a call to asyncPreempt.
         ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc) // 插入抢占调用
      }
   }

   // Acknowledge the preemption.
   atomic.Xadd(&gp.m.preemptGen, 1)
   atomic.Store(&gp.m.signalPending, 0)

   if GOOS == "darwin" || GOOS == "ios" {
      atomic.Xadd(&pendingPreemptSignals, -1)
   }
}

最终,doSigPreempt—>asyncPreempt->asyncPreempt2

func asyncPreempt2() {
   gp := getg()
   gp.asyncSafePoint = true
   if gp.preemptStop {
      mcall(preemptPark)
   } else {
      mcall(gopreempt_m)
   }
   gp.asyncSafePoint = false
}

然后,又回到了我们熟悉的gopreempt_m函数,这里就不赘述了。

所以对于基于信号的抢占调度,总结如下:

  • M1发送信号_SIGURG
  • M2接收到信号,并通过信号处理函数进行处理;
  • M2修改执行的上下文,并恢复到修改后的位置;
  • 重新进入调度循环,进而调度其他goroutine

4. 小结

总的来说,Go的调度策略的发展,也是随着需求的丰富而逐步发展的,协作式调度能够保证具备函数调用的用户 Goroutine 正常停止; 抢占式调度则能避免由于死循环导致的任意时间的垃圾回收延迟。

到此这篇关于Go调度器学习之协作与抢占详解的文章就介绍到这了,更多相关Go调度器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • go Gin安装及初始化操作示例

    go Gin安装及初始化操作示例

    这篇文章主要介绍了gin安装及初始化,修改启动端口,get/post 请求参数,模型绑定shouldbind,自定义验证器/表单验证,等操作步骤,有需要的朋友可以借鉴参考下
    2022-04-04
  • go gin 正确读取http response body内容并多次使用详解

    go gin 正确读取http response body内容并多次使用详解

    这篇文章主要为大家介绍了go gin 正确读取http response body内容并多次使用解决思路,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • Go语言中错误处理的方式总结

    Go语言中错误处理的方式总结

    这篇文章会结合 errors 中的函数,来讨论一下 Go 中常见的 error 使用方式,文中的示例代码讲解详细,具有一定的学习价值,需要的可以了解一下
    2023-07-07
  • 详解如何在Go服务中做链路追踪

    详解如何在Go服务中做链路追踪

    使用 Go 语言开发微服务的时候,需要追踪每一个请求的访问链路,本文主要介绍了如何在Go 服务中做链路追踪,感兴趣的可以了解一下
    2021-09-09
  • 秒懂Golang匿名函数

    秒懂Golang匿名函数

    所谓匿名函数,就是没有名字的函数,本文重点给大家介绍Golang匿名函数的相关知识,通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-02-02
  • golang 设置web请求状态码操作

    golang 设置web请求状态码操作

    这篇文章主要介绍了golang 设置web请求状态码操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • 详解Go语言中的逃逸分析

    详解Go语言中的逃逸分析

    逃逸分析是编译器用于决定将变量分配到栈上还是堆上的一种行为,下面小编就来为大家详细讲讲go语言中是如何进行逃逸分析的,需要的小伙伴可以参考下
    2023-09-09
  • Go语言对JSON进行编码和解码的方法

    Go语言对JSON进行编码和解码的方法

    这篇文章主要介绍了Go语言对JSON进行编码和解码的方法,涉及Go语言操作json的技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-02-02
  • Go语言使用Timeout Context取消任务的实现

    Go语言使用Timeout Context取消任务的实现

    本文主要介绍了Go语言使用Timeout Context取消任务的实现,包括基本的任务取消和控制HTTP客户端请求的超时,具有一定的参考价值,感兴趣的可以了解一下
    2024-01-01
  • Go语言CSP并发模型goroutine及channel底层实现原理

    Go语言CSP并发模型goroutine及channel底层实现原理

    这篇文章主要为大家介绍了Go语言CSP并发模型goroutine channel底层实现原理,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05

最新评论