Go channel发送方和接收方如何相互阻塞等待源码解读

 更新时间:2023年12月18日 10:42:08   作者:菜皮日记  
这篇文章主要为大家介绍了Go channel发送方和接收方如何相互阻塞等待源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

并发编程的可见性

在 Go 官网上的内存模型一文中,介绍了在 Go 并发编程下数据可见性问题,可见性是并发编程中一个重要概念,指的是在哪些条件下,可以保证一个线程中读取某个变量时,可以观察到另一个线程对该变量的写入后的值,Go 语言中的 goroutine 也适用。

一般来说可见性属于偏硬件和底层,因为涉及到多核 CPU 的 cache 读写和同步问题,开发者不需要关心细节,高级编程语言要么屏蔽掉了这些细节,要么会给出一些保证,承诺在确定的条件下就会得到确定的结果。

Go channel 有一个特性是在一个无缓冲的 channel 上发送和接收必须等待对方准备好,才可以执行,否则会被阻塞。实际上这就是一个同步保证,那么这个同步保证是如何实现的?下面看看官方文章中是如何解释的。

先 send 后 receive

文中对 channel 的描述有几个原则,第一个是

A send on a channel is synchronized before the completion of the corresponding receive from that channel.

意思是:在一个 channel 上的发送操作应该发生在对应的接收操作完成之前。说人话就是:要先发送数据,然后才能接收数据,否则就会阻塞。这也比较符合一般的认知。

并用下面一段代码举例说明,这段代码确保一定会输出 "hello, world”。

var c = make(chan int, 10)
var a string

func f() {
    a = "hello, world"
    c <- 0
}

func main() {
    go f()
    <-c
    print(a)
}

f 函数负责给变量 a 赋值,main 函数负责打印变量 amain 函数阻塞等待在 <- c 处,直到 f 函数对 a 赋值之后并写入数据到 c 中,main 函数才被唤醒继续执行,所以此时打印 a 必然会得到结果。

先 receive 后 send?

而下面这段描述有点反直觉

A receive from an unbuffered channel is synchronized before the completion of the corresponding send on that channel.

意思是在无缓冲 channel 上的接收操作发生在对应的发送操作完成之前,说人话就是:要先接收数据,之后才可以发送数据,否则就会阻塞。这句话看上去与第一条相悖,因为第一条强调发送操作要在接收完成之前发生,而这一条强调接收操作要在发送完成之前发生,这样相互等待对方的情况,不会陷入死锁状态吗?

下面的示例代码与前一个类似,区别是将 c 换成了无缓冲 channel,并把 c 的写入和读取调换了位置,这段代码同样可以保证输出 "hello, world”。

var c = make(chan int)
var a string

func f() {
    a = "hello, world"
    <-c
}

func main() {
    go f()
    c <- 0
    print(a)
}

这两段话到底是什么意思?为什么要相互等待但又不会死锁?

接下来看看 runtime/chan.go 中是怎么实现 channel 的发送和接收的。

channel 的结构

首先看看 channel 的数据结构

type hchan struct {
    qcount   uint           // 缓冲区元素数量
    dataqsiz uint           // 缓冲区大小
    buf      unsafe.Pointer // 缓冲区起始指针
    elemsize uint16
    closed   uint32
    elemtype *_type
    sendx    uint   // 下一次发送的元素在队列中的索引
    recvx    uint   // 下一个接收的元素在队列中的索引
    recvq    waitq  // 当队列无数据时,receiver 阻塞等待的队列
    sendq    waitq  // 当队列无空间时,sender 阻塞等待的队列

    lock mutex
}

channel 内部实现了一个环形队列,通过 qcount dataqsiz buf sendx recvx 几个部分组成。

另外 channel 还维护了两个等待队列,如果在执行 <-c receive 操作时,此时 channel 不满足接收条件,receiver 会进入 recvq 等待队列;同样的如果执行 c<- send 操作时,此时 channel 不满足发送条件,sender 会进入 sendq 等待队列。

具体看代码:

var c = make(chan int)
var a string

func f() {
    a = "hello, world"

    x := <-c    // 3
    fmt.Println("\nx:", x)
}

func main() {
    go f()      // 1
    c <- 123456 // 2

    print(a)
}

send 具体干了什么

当 main 函数执行到 c<-123456 是,会执行 runtime/chan.go 中的 chansend 函数,该函数首先会判断当前 channel c 的等待接收队列是否有阻塞的 receiver

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // ...省略部分代码...

  // 是否有等待的 receiver 存在
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

  // ...省略部分代码...
}

如果有等待的 receiver 则弹出队列,调用 send 函数,其中 sg 就表示 receiversg.elem 表示将数据接收到哪里去,这个地址也就对应示例代码中的变量 x 的地址。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  // ...省略部分代码...

    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }

  // ...省略部分代码...
  // 将 goroutine 置为可执行状态
}

sendDirect 函数就是直接从 src 里面将数据复制到 dst 中。

// 直接拷贝数据
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

回到 chansend 函数,如果没有等待的 receiver,那么会查看当前 buf 中是否有空间,如果有空间,则数据缓存到 buf 中。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // ...省略部分代码...

  // 将数据缓存到 buf 中
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

  // ...省略部分代码...
}

如果也没有 buf 空间,那么就将 sender 本身放入到 sendq 等待队列中。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // ...省略部分代码...

  // 进入 sendq 等待队列
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)

  // ...省略部分代码...
}

总结起来 send 操作分三部分:

  • 如果当前 channel 上有等待的 receiver,则直接 copy 数据过去
  • 否则如果当前 buf 有空闲空间,则将数据存在 buf 中
  • 否则将 sender 本身加入到 sendq 等待队列中

receive 具体干了什么

相应的与发送类似,执行到示例代码中第 (3) 步接收数据时,会调用 runtime/chan.go 中的 chanrecv 函数来处理接收,同样是先看 sender 等待队列是否有阻塞的 sender

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // ...省略部分代码...

  // 从等待的 sender 取一个出来
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
  // ...省略部分代码...
}

如果有的等待的 sender,那么将 sender 取出来,并复制数据。

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // ...省略部分代码...
    if ep != nil {
        // copy data from sender
        recvDirect(c.elemtype, sg, ep)
    }
  // ...省略部分代码...
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

如果没有等待的 sender,那么看 buf 中有没有缓存的数据

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // ...省略部分代码...
    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
  // ...省略部分代码...
}

最后如果也没有 buf 数据,那么久把自己加入到 receiver 等待队列中 recvq

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // ...省略部分代码...

    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)

  // ...省略部分代码...
}

总结起来 receive 操作分三部分:

  • 如果当前 channel 上有等待的 sender,则直接 copy 数据过去
  • 否则如果当前 buf 有缓存的数据,则将读取该数据
  • 否则将 receiver 本身加入到 recvq 等待队列中

小结

这样一来就能够理解前面的两个原则了,在一个无缓冲的 channel 中,无论是 sender 先执行,还是 receiver 先执行,都会因为找不到对方,并且没有 buf 空间的情况下,将自己加入到等待队列;当对方开始执行时就会检查到已经有对端正在阻塞,进而拷贝数据,并唤醒阻塞的对象最终走完整个流程。

有一种说法是:sender 必须在 receiver 准备好才能执行,否则就会阻塞;而 receiver 必须在 sender 准备好才能执行,否则就会阻塞;这个说法没错,但是太笼统了,什么叫准备好?怎么算是准备好?这是比较模糊的。而看过 send 和 receive 的流程之后,就更能理解整个过程了。

为什么要有无缓冲 channel

实际上两个 goroutine 相互等待对方到达某个状态的效果,非常类似操作系统中的一种同步机制:屏障 barrier,同步屏障要求只有当所有进程都到达屏障后,才能一起执行下一状态,否则就阻塞在屏障处。

回到 channel 操作,即 sender 和 receiver 无论谁先执行,都必须等待对方也已经执行,两者才可以继续执行。就像一块电路板串联有两个开关,要想电路联通,必须两个开关都被打开才可以,而不管哪一个先打开,都必须等待另一个开关也打开,之后电流才可以接通电路也才联通。

可以将无缓冲 channel 看做是一种同步屏障,同步屏障能够让多个 goroutine 都达到某种状态之后才可以继续执行,这是带缓冲 channel 无法做到的。另外在无缓冲 channel 数据的交换更加简单快速,因为不需要维护缓存 buf,实现逻辑也更简单,运行更可靠。

以上就是Go channel发送方和接收方如何相互阻塞等待源码解读的详细内容,更多关于Go channel相互阻塞等待的资料请关注脚本之家其它相关文章!

相关文章

  • 记一次go语言使用time.Duration类型踩过的坑

    记一次go语言使用time.Duration类型踩过的坑

    本文主要介绍了记一次go语言使用time.Duration类型踩过的坑,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-01-01
  • Golang语言实现gRPC的具体使用

    Golang语言实现gRPC的具体使用

    本文主要介绍了Golang语言实现gRPC的具体使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-08-08
  • win7下配置GO语言环境 + eclipse配置GO开发

    win7下配置GO语言环境 + eclipse配置GO开发

    这篇文章主要介绍了win7下配置GO语言环境 + eclipse配置GO开发,需要的朋友可以参考下
    2014-10-10
  • Golang爬虫框架colly使用浅析

    Golang爬虫框架colly使用浅析

    这篇文章主要介绍了Golang爬虫框架colly的使用,colly是Go实现的比较有名的一款爬虫框架,而且Go在高并发和分布式场景的优势也正是爬虫技术所需要的,感兴趣想要详细了解可以参考下文
    2023-05-05
  • Go语言map不支持并发写操作的原因

    Go语言map不支持并发写操作的原因

    Go语言为什么不支持并发读写map​,Go官方的说法是在多数情况下map只存在并发读操作,如果原生支持并发读写,即降低了并发读操作的性能,在使用 map 时,要特别注意是否存在对 map 的并发写操作,如果存在,要结合 sync 包的互斥锁一起使用,
    2024-01-01
  • golang程序使用alpine编译出最小arm镜像实现

    golang程序使用alpine编译出最小arm镜像实现

    这篇文章主要为大家介绍了golang程序使用alpine编译出最小arm镜像,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-12-12
  • golang逐行读取文件的操作

    golang逐行读取文件的操作

    这篇文章主要介绍了golang逐行读取文件的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • Golang基于Vault实现敏感数据加解密

    Golang基于Vault实现敏感数据加解密

    数据加密是主要的数据安全防护技术之一,敏感数据应该加密存储在数据库中,降低泄露风险,本文将介绍一下利用Vault实现敏感数据加解密的方法,需要的可以参考一下
    2023-07-07
  • Go语言中的各类运算操作符详解

    Go语言中的各类运算操作符详解

    本文全面探讨了Go语言中的各类运算操作符,从基础的数学和位运算到逻辑和特殊运算符,文章旨在深入解析每一种运算操作符的工作原理、应用场景和注意事项,以帮助开发者编写更高效、健壮和可读的Go代码,</P><P>
    2023-09-09
  • 深入探究Go语言的错误策略与异常机制

    深入探究Go语言的错误策略与异常机制

    本文深入探讨了Go语言的错误策略与异常机制,主要介绍了错误处理的重要性,以及Go语言中的错误类型和处理函数,此外还讨论了Go语言的异常机制,包括panic和recover函数的使用,需要的朋友可以参考下
    2024-02-02

最新评论