Go进阶并发控制channel和WaitGroup的实现
1.Channel
channel一般用于协程之间的通信.不过channel也可以用于并发控制.比如主协程启动N个子协程.主协程等待所有子协程退出后再继续后续流程.这种场景下channel也可轻易实现并发控制.
场景示例:
package main
import (
"fmt"
"gomodule/data"
_ "gomodule/pubsub"
"time"
)
func main() {
//创建一个有10个元素的channel
channel := make([]chan int, 10)
for i := 0; i < 10; i++ {
//切片中放入一个channel
channel[i] = make(chan int)
//启动协程
go Process(channel[i])
}
for i, ch := range channel {
<-ch
fmt.Println("Routine ", i, "quit!")
}
}
func Process(ch chan int) {
time.Sleep(1 * time.Second)
//管道写入一个元素代表协程结束.
ch <- 1
}上面程序通过创建N个channel管理N个协程.每个协程都有一个channel用于与父协程通信.父协程创建完所有协程等待所有协程结束.
优点:
实现简单.
缺点:
需要大量创建协程就需要相同数量的channel.对于子协程继续派生出来的协程不方便控制.
2.WaitGroup:
WaitGroup可理解为Wait-Goroutine-Group.等待一组goroutine结束.示例如下.
package main
import (
"fmt"
"gomodule/data"
_ "gomodule/pubsub"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
//计数器.数值是goroutine的个数.
wg.Add(2)
go func() {
time.Sleep(1 * time.Second)
fmt.Println("goroutine 1 finish")
//计数器减一.
wg.Done()
}()
go func() {
time.Sleep(2 * time.Second)
fmt.Println("goroutine 2 finish")
wg.Done()
}()
//主goroutine阻塞等待计数器变为0.
wg.Wait()
fmt.Println("all goroutine finish")
}执行结果如下:

1).启动goroutine前通过Add(2)方法将计数器设置为待启动goroutine个数.
2).启动goroutine通过wait方法阻塞自己.等待计数器变为0.
3).每个goroutine执行结束后通过Done方法将计数器减1.
4).计数器变为0后.阻塞的goroutine被唤醒.
2.1信号量:
信号量是UNIX系统提供的一种保护共享资源的机制.用于防止多个线程同时访问某个资源.当信号量>0时.表示资源可用.获取信号量时系统自动将信号量减一.当信号量=0时.表示资源暂时不可用.获取信号量时.当前线程会进入睡眠.当信号量为正时被唤醒.
2.2WaitGroup数据结构:
源码位于src/sync/waitgroup.go:WaitGroup中.结构如下.
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}state代表信号量.高32位代表信号量数量.低32位代表等待数量.
2.3WaitGroup对外方法:
Add(delta int):
func (wg *WaitGroup) Add(delta int) {
if race.Enabled {
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(&wg.sema))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}

Wait():
func (wg *WaitGroup) Wait() {
if race.Enabled {
race.Disable()
}
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(&wg.sema))
}
runtime_SemacquireWaitGroup(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}

Done():
func (wg *WaitGroup) Done() {
wg.Add(-1)
}done方法就是调用add方法进行加-1.
小结:
1).Add方法用于增加工作协程计数.通常在启动新的工作协程之前调用.
2).Done方法用于减少工作协程计数.每次调用递减1.通常在工作协程内部且临近返回之前调用.
3).Wait方法用于增加坐等协程计数.通常在所有协程全部启动之后调用.
注:
Add方法累加的工作协程计数要和实际需要等待的工作协程数要一致.否则也会出发panic.
当工作协程计数多于实际需要等待的工作协程数量时.可能会因为无法唤醒死锁.Go运行检测到死锁就会发生panic.当工作协程计数小于实际等待的工作协程数量时.Done方法会在工作协程计数变为负时出发panic.
到此这篇关于Go进阶并发控制channel和WaitGroup的文章就介绍到这了,更多相关Go并发控制channel和WaitGroup内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!


最新评论