golang waitgroup辅助并发控制使用场景和方法解析

 更新时间:2023年09月27日 10:16:14   作者:lincoln_hlf1  
Golang 提供了简洁的 go 关键字来让开发者更容易的进行并发编程,同时也提供了 WaitGroup 对象来辅助并发控制,今天我们就来分析下 WaitGroup 的使用方法,顺便瞧一瞧它的底层源码

WaitGroup 的使用场景和方法

当我们有很多任务要同时进行时,如果并不需要关心各个任务的执行进度,那直接使用 go 关键字即可。

如果我们需要关心所有任务完成后才能往下运行时,则需要 WaitGroup 来阻塞等待这些并发任务了。

WaitGroup 如同它的字面意思,就是等待一组 goroutine 运行完成,主要有三个方法组成:

  • Add(delta int) :添加任务数
  • Wait():阻塞等待所有任务的完成
  • Done():完成任务

下面是它们的具体用法,具体的作用都在注释上:

package main
import (
    "fmt"
    "sync"
    "time"
)
func worker(wg *sync.WaitGroup) {
    doSomething()
    wg.Done() // 2.1、完成任务
}
func main() {
    var wg sync.WaitGroup
    wg.Add(5) // 1、添加 5 个任务
    for i := 1; i <= 5; i++ {
        go worker(&wg) // 2、每个任务并发执行
    }
    wg.Wait() // 3、阻塞等待所有任务完成
}

WaitGroup 源码分析

上面 WaitGroup 的使用很简单,接下来我们到 src/sync/waitgroup.go 里分析下它的源码。首先,是 WaitGroup 的结构体:

type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}

noCopy

其中,noCopy 表示 WaitGroup 是不可复制的。那么什么叫不可复制呢?

举个例子,当我们对函数参数定义了这个不可复制的类型时,开发者只能通过指针来传递函数参数。而规定使用指针传递又有什么好处呢?

好处在于如果有多个函数都定义了这个不可复制的参数时,那么这多个函数参数就可以共用同一个指针变量,来同步执行结果。而 WaitGroup 就是需要这样的约束规定。

state1 字段

接下来我们来看看 WaitGroup 的 state1 字段。state1 是一个包含了 counter 总数、 waiter 等待数、sema 信号量的 uint32 数组。

每当有 goroutine 调用了 Wait() 方法阻塞等待时,就会对 waiter 数量 + 1,然后等待信号量的唤起通知。

当我们调用 Add() 方法时,就会对 state1 的 counter 数量 + 1。

当调用 Done() 方法时就会对 counter 数量 -1。

直到 counter == 0 时就可以通过信号量唤起对应 waiter 数量的 goroutine 了,也就是唤起刚刚阻塞等待的 goroutine 们。

关于信号量的解释,可以参考下 golang 重要知识:mutex 里的相关介绍:

PV 原语解释:
通过操作信号量 S 来处理进程间的同步与互斥的问题。
S>0:表示有 S 个资源可用;S=0 表示无资源可用;S<0 绝对值表示等待队列或链表中的进程个数。信号量 S 的初值应大于等于 0。
P 原语:表示申请一个资源,对 S 原子性的减 1,若 减 1 后仍 S>=0,则该进程继续执行;若 减 1 后 S<0,表示已无资源可用,需要将自己阻塞起来,放到等待队列上。
V 原语:表示释放一个资源,对 S 原子性的加 1;若 加 1 后 S>0,则该进程继续执行;若 加 1 后 S<=0,表示等待队列上有等待进程,需要将第一个等待的进程唤醒。

此处操作系统可以理解为 Go 的运行时 runtime进程可以理解为协程

方法解释

最后,我们来深入 WaitGroup 的三个方法,进行源码分析。大家感兴趣的可以继续往下看,主要是对源码的分析注释。

Add(delta int) 方法

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    if race.Enabled { // 此处是 go 的竞争检测,可以不用关心
        _ = *statep
        if delta < 0 {
            race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()
    }
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32) // 获取 counter
    w := uint32(state) // 获取 waiter
    if race.Enabled && delta > 0 && v == int32(delta) { // go 的竞争检测,可以不用关心
        race.Read(unsafe.Pointer(semap))
    }
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 { // counter > 0:还有任务在执行;waiter == 0 表示没有在阻塞等待的 goroutine
        return
    }
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 执行到此处相当于 countr = 0,即所有的任务都已执行完,需要唤起等待的 goroutine了
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

Done 方法

func (wg *WaitGroup) Done() {
    wg.Add(-1) // 直接调用 Add 方法 对 counter -1
}

Wait 方法

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    if race.Enabled { // go 的竞争检测,可以不用关心
        _ = *statep
        race.Disable()
    }
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
        if v == 0 {
            // counter 为 0, 不需要再等待了。
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // waiters 数目 +1.
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                race.Write(unsafe.Pointer(semap)) // go 的竞争检测,可以不用关心
            }
            runtime_Semacquire(semap) // 阻塞等待唤起
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
    }
}

从这几个方法的源码,我们可以看出,Go 并没有使用 mutex 等锁去做字段值修改,而是采用了 atomic 原子操作来进行修改的。这是在底层硬件上支持的,所以性能更好。

总结

WaitGroup 比较简单,就是一些计数值的维护和 goroutine 的阻塞唤起。它的运用也简单,Add、Done、Wait 这三个方法经常是同时出现的。相信大伙深入到源码也能瞧出个大概,更多关于golang waitgroup并发控制的资料请关注脚本之家其它相关文章!

相关文章

  • Go简单实现协程池的实现示例

    Go简单实现协程池的实现示例

    本文主要介绍了Go简单实现协程池的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-06-06
  • go实现脚本解释器gscript

    go实现脚本解释器gscript

    这篇文章主要为大家介绍了go实现脚本解释器gscript示例代码,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-07-07
  • Go语言获取本机逻辑CPU数量的方法

    Go语言获取本机逻辑CPU数量的方法

    这篇文章主要介绍了Go语言获取本机逻辑CPU数量的方法,实例分析了runtime库的操作技巧,需要的朋友可以参考下
    2015-03-03
  • golang grpc 负载均衡的方法

    golang grpc 负载均衡的方法

    这篇文章主要介绍了golang grpc 负载均衡的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-07-07
  • Go模板template用法详解

    Go模板template用法详解

    这篇文章主要介绍了Go标准库template模板用法详解;包括GO模板注释,作用域,语法,函数等知识,需要的朋友可以参考下
    2022-04-04
  • Go语言题解LeetCode599两个列表的最小索引总和

    Go语言题解LeetCode599两个列表的最小索引总和

    这篇文章主要为大家介绍了Go语言题解LeetCode599两个列表的最小索引总和示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-12-12
  • golang使用go test输出单元测试覆盖率的方式

    golang使用go test输出单元测试覆盖率的方式

    单元测试覆盖率是衡量代码质量的一个重要指标,重要的代码文件覆盖率应该至少达到80%以上,Java 可以通过JaCoCo 统计覆盖率,那么go 项目如何进行代码覆盖率测试呢,本文将给大家详细的介绍一下golang使用go test输出单元测试覆盖率的方式,需要的朋友可以参考下
    2024-02-02
  • Golang利用自定义模板发送邮件的方法详解

    Golang利用自定义模板发送邮件的方法详解

    这篇文章主要给大家介绍了关于Golang利用自定义模板发送邮件的方法,文中通过示例代码将实现的方法介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2017-10-10
  • 从零封装Gin框架配置初始化全局变量

    从零封装Gin框架配置初始化全局变量

    这篇文章主要为大家介绍了从零封装Gin框架配置初始化全局变量,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2024-01-01
  • golang架构设计开闭原则手写实现

    golang架构设计开闭原则手写实现

    这篇文章主要为大家介绍了golang架构设计开闭原则手写实例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-07-07

最新评论