Go语言通过chan进行数据传递的方法详解

 更新时间:2023年06月13日 09:50:42   作者:242030  
这篇文章主要为大家详细介绍了Go语言如何通过chan进行数据传递的功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起了解一下

1、并发范式-管道

通道可以分为两个方向,一个是读,另一个是写,假如一个函数的输入参数和输出参数都是相同的 chan 类型,则

该函数可以调用自己,最终形成一个调用链。当然多个具有相同参数类型的函数也能组成一个调用链,这很像

UNIX系统的管道,是一个有类型的管道。下面通过具体的示例演示Go程序这种链式处理能力。

package main
import (
	"fmt"
)
// chain函数输入参数和输入参数类型相同都是chan int类型
// chain函数功能是将chan内的数据统一加1
func chain(in chan int) chan int {
	out := make(chan int)
	go func() {
		for v := range in {
			out <- 1 + v
		}
		close(out)
	}()
	return out
}
func main() {
	in := make(chan int)
	//初始化输入参数
	go func() {
		for i := 0; i < 10; i++ {
			in <- i
		}
		close(in)
	}()
	//连续调用3次chan,想当与把in中的每个元素都加3
	out := chain(chain(chain(in)))
	for v := range out {
		fmt.Println(v)
	}
}

程序输出

3
4
5
6
7
8
9
10
11
12

2、一个线程写数据一个线程读数据

一个线程往管道里写数据、另一个线程从管道里读数据示例。

package main
import (
	"fmt"
)
// writerData
func writerData(intChan chan int) {
	for i := 1; i <= 10; i++ {
		//放入数据
		intChan <- i
		fmt.Printf("writeData写到数据=%v\n", i)
	}
	// 关闭
	close(intChan)
}
// readData
func readData(intChan chan int, exitChan chan bool) {
	for {
		v, ok := <-intChan
		if !ok {
			break
		}
		fmt.Printf("readData读到数据=%v\n", v)
	}
	// readData读完数据后,即任务完成
	exitChan <- true
	close(exitChan)
}
func main() {
	// 创建两个管道
	intChan := make(chan int, 10)
	// 判断子进程是否结束
	exitChan := make(chan bool, 1)
	go writerData(intChan)
	go readData(intChan, exitChan)
	// 注意主线程退出,两个线程直接退出
	for {
		// 等待读取
		_, ok := <-exitChan
		// 没读到就退出
		if !ok {
			break
		}
	}
}

程序输出

writeData写到数据=1
writeData写到数据=2
writeData写到数据=3
writeData写到数据=4
writeData写到数据=5
writeData写到数据=6
writeData写到数据=7
writeData写到数据=8
writeData写到数据=9
writeData写到数据=10
readData读到数据=1
readData读到数据=2
readData读到数据=3
readData读到数据=4
readData读到数据=5
readData读到数据=6
readData读到数据=7
readData读到数据=8
readData读到数据=9
readData读到数据=10

3、多线程读写数据

求每个数字的累加和:

package main
import (
	"fmt"
)
// 写入2000个数据到numChan中
func inputNum(numChan chan int) {
	for i := 1; i <= 2000; i++ {
		numChan <- i
	}
	// 写完数据关闭channel
	close(numChan)
}
// 计算每个数字的累加
// 读取数据并且存入到resChan中,exitChan做协程标记
func getNum(numChan chan int, resChan chan int, exitChan chan bool) {
	for {
		res := 0
		n, ok := <-numChan
		// 值被取完
		if !ok {
			break
		}
		for i := 1; i <= n; i++ {
			res += i
		}
		// 存入到resChan
		resChan <- res
	}
	// 标记退出
	exitChan <- true
}
func main() {
	// 创建三个管道分别为读、写、退出标记
	numChan := make(chan int, 2000)
	resChan := make(chan int, 2000)
	exitChan := make(chan bool, 8)
	// 启动多线程
	go inputNum(numChan)
	for i := 1; i <= 8; i++ {
		go getNum(numChan, resChan, exitChan)
	}
	// 再启动一个线程取出exitChan
	go func() {
		for i := 0; i < 8; i++ {
			// 从exitChan管道取出即可
			<-exitChan
		}
		// 全部取出说明读取numChan数据完毕,关闭resChan
		close(resChan)
	}()
	// 读取resChan数据
	for v := range resChan {
		fmt.Println(v)
	}
}

程序输出

1
3
6
10
15
21
28
......

多线程求素数:

package main
import (
	"fmt"
	"math"
)
// 放入数据
func putNum(intChan chan int) {
	for i := 2; i < 8000; i++ {
		intChan <- i
	}
	close(intChan)
}
// 判断素数并放入到primeChan中
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
	for {
		// 从管道取出并判断是不是素数
		num, ok := <-intChan
		flag := true
		if !ok {
			// 取出失败
			break
		}
		for i := 2; i < int(math.Sqrt(float64(num))); i++ {
			// 不是素数,退出
			if num%i == 0 {
				flag = false
				break
			}
		}
		if flag {
			// 是素数,加入到管道中
			primeChan <- num
		}
	}
	// 标记管道退出
	exitChan <- true
}
func main() {
	intChan := make(chan int, 1000)
	// 放入结果
	primeChan := make(chan int, 1000)
	// 标记退出
	exitChan := make(chan bool, 4)
	// 开启协程,放入1-8000
	go putNum(intChan)
	// 开启四个协程,并判断是否为素数
	for i := 0; i < 4; i++ {
		go primeNum(intChan, primeChan, exitChan)
	}
	go func() {
		for i := 0; i < 4; i++ {
			// 只从管道里把内容取出来
			<-exitChan
		}
		close(primeChan)
	}()
	for v := range primeChan {
		fmt.Println("素数有", v)
	}
}

程序输出

素数有 2
素数有 3
素数有 4
素数有 5
.....

4、并发范式-生成器

本节通过具体的程序示例来演示Go语言强大的并发处理能力,每个示例代表一个并发处理范式,这些范式具有典

型的特征,在真实的程序中稍加改造就能使用。

在应用系统编程中,常见的应用场景就是调用一个统一的全局的生成器服务,用于生成全局事务号、订单号、序列

号和随机数等。Go对这种场景的支持非常简单,下面以一个随机数生成器为例来说明。

4.1 最简单的带缓冲的生成器

package main
import (
	"fmt"
	"math/rand"
)
func GenerateIntA() chan int {
	ch := make(chan int, 5)
	// 启动一个goroutine用于生成随机数,函数返回一个通道用于获取随机数
	go func() {
		for {
			ch <- rand.Int()
		}
	}()
	return ch
}
func main() {
	// 启动生成器
	ch := GenerateIntA()
	fmt.Println(<-ch)
	fmt.Println(<-ch)
}

程序输出

5577006791947779410
8674665223082153551

4.2 多个goroutine增强型生成器

package main
import (
	"fmt"
	"math/rand"
)
func GenerateIntA() chan int {
	ch := make(chan int, 5)
	// 启动一个goroutine用于生成随机数,函数返回一个通道用于获取随机数
	go func() {
		for {
			ch <- rand.Int()
		}
	}()
	return ch
}
func GenerateIntB() chan int {
	ch := make(chan int, 5)
	// 启动一个goroutine用于生成随机数,函数返回一个通道用于获取随机数
	go func() {
		for {
			ch <- rand.Int()
		}
	}()
	return ch
}
func GenerateInt() chan int {
	ch := make(chan int, 10)
	go func() {
		for {
			// 使用select的扇入技术增加生成的随机源
			select {
			case ch <- <-GenerateIntA():
			case ch <- <-GenerateIntB():
			}
		}
	}()
	return ch
}
func main() {
	// 启动生成器
	ch := GenerateInt()
	// 获取生成器资源
	for i := 0; i < 10; i++ {
		fmt.Println(<-ch)
	}
}

程序输出

5577006791947779410
2933568871211445515
7981306761429961588
3337066551442961397
2050257992909156333
837825985403119657
8267293389953062911
7660323324116104765
273669266008440571
2282476590775666788

4.3 自动退出

有时希望生成器能够自动退出,可以借助Go通道的退出通知机制(close channel to broadcast)实现。

package main
import (
	"fmt"
	"math/rand"
)
// done接收通知推出信号
func GenerateIntA(done chan struct{}) chan int {
	ch := make(chan int)
	go func() {
	Lable:
		for {
			select {
			case ch <- rand.Int():
			case <-done:
				break Lable
			}
		}
		close(ch)
	}()
	return ch
}
func main() {
	// 创建一个作为接收推出信号的chan
	done := make(chan struct{})
	// 启动生成器
	ch := GenerateIntA(done)
	fmt.Println(<-ch)
	fmt.Println(<-ch)
	// 不在需要生成器,通过close chan发送一个通知给生成器
	close(done)
	// 获取生成器资源
	for v := range ch {
		fmt.Println(v)
	}
}

程序结果

5577006791947779410
8674665223082153551

4.4 多重特性的生成器

一个融合了并发、缓冲、退出通知等多重特性的生成器。

package main
import (
	"fmt"
	"math/rand"
)
//done 接收通知推出信号
func GenerateIntA(done chan struct{}) chan int {
	ch := make(chan int, 5)
	go func() {
	Lable:
		for {
			select {
			case ch <- rand.Int():
			case <-done:
				break Lable
			}
		}
		close(ch)
	}()
	return ch
}
//done 接收通知推出信号
func GenerateIntB(done chan struct{}) chan int {
	ch := make(chan int, 10)
	go func() {
	Lable:
		for {
			select {
			case ch <- rand.Int():
			case <-done:
				break Lable
			}
		}
		close(ch)
	}()
	return ch
}
// 通过select做了扇入(Fan In)
func GenerateInt(done chan struct{}) chan int {
	ch := make(chan int)
	send := make(chan struct{})
	go func() {
	Lable:
		for {
			select {
			case ch <- <-GenerateIntA(send):
			case ch <- <-GenerateIntB(send):
			case <-done:
				send <- struct{}{}
				send <- struct{}{}
				break Lable
			}
		}
		close(ch)
	}()
	return ch
}
func main() {
	//创建一个作为接收推出信号的chan
	done := make(chan struct{})
	//启动生成器
	ch := GenerateInt(done)
	//获取生成器资源
	for i := 0; i < 10; i++ {
		fmt.Println(<-ch)
	}
	//通知生产者停止生产
	done <- struct{}{}
	fmt.Println("stop gernarate")
}

程序输出

5577006791947779410
2015796113853353331
4893789450120281907
1687184559264975024
9093919513921919021
2202916659517317514
26222426471854123
8999011805617471788
4534277910591376951
6607332037155172840
stop gernarate

5、固定worker工作池

服务器编程中使用最多的就是通过线程池来提升服务的并发处理能力。在Go语言编程中,一样可以轻松地构建固

定数目的 goroutines 作为工作线程池。下面还是以计算多个整数的和为例来说明这种并发范式。

程序中除了主要的 main goroutine ,还开启了如下几类 goroutine:

(1)、初始化任务的 goroutine。

(2)、分发任务的 goroutine。

(3)、等待所有 worker 结束通知,然后关闭结果通道的 goroutine。

main 函数负责拉起上述 goroutine,并从结果通道获取最终的结果。

程序采用三个通道,分别是:

(1)、传递 task 任务的通道。

(2)、传递 task 结果的通道。

(3)、接收 worker 处理完任务后所发送通知的通道。

package main
import (
	"fmt"
)
// 工作池的goroutine数目
const (
	NUMBER = 10
)
// 工作任务
type task struct {
	begin  int
	end    int
	result chan<- int
}
// 初始化待处理task chan
func InitTask(taskChan chan<- task, r chan int, p int) {
	qu := p / 10
	mod := p % 10
	high := qu * 10
	for j := 0; j < qu; j++ {
		b := 10*j + 1
		e := 10 * (j + 1)
		// 1-10
		// 11-20
		// ......
		tsk := task{
			begin:  b,
			end:    e,
			result: r,
		}
		taskChan <- tsk
	}
	if mod != 0 {
		tsk := task{
			begin:  high + 1,
			end:    p,
			result: r,
		}
		taskChan <- tsk
	}
	close(taskChan)
}
//读取task chan分发到worker goroutine处理,workers的总的数量是workers
func DistributeTask(taskChan <-chan task, workers int, done chan struct{}) {
	for i := 0; i < workers; i++ {
		go ProcessTask(taskChan, done)
	}
}
//工作goroutine处理具体工作,并将处理结构发送到结果chan
func ProcessTask(taskChan <-chan task, done chan struct{}) {
	for t := range taskChan {
		t.do()
	}
	done <- struct{}{}
}
// 任务处理:计算begin到end的和
// 执行结果写入到结果chan result中
func (t *task) do() {
	sum := 0
	for i := t.begin; i <= t.end; i++ {
		sum += i
	}
	t.result <- sum
}
// 通过done channel来同步等待所有工作goroutine的结束,然后关闭结果chan
func CloseResult(done chan struct{}, resultChan chan int, workers int) {
	for i := 0; i < workers; i++ {
		<-done
	}
	close(done)
	close(resultChan)
}
// 读取结果通道,汇总结果
func ProcessResult(resultChan chan int) int {
	sum := 0
	for r := range resultChan {
		sum += r
	}
	return sum
}
func main() {
	// 多线程数目
	workers := NUMBER
	// 工作通道
	taskChan := make(chan task, 10)
	// 结果通道
	resultChan := make(chan int, 10)
	// worker信号通道
	done := make(chan struct{}, 10)
	// 初始化task的goroutine,计算1000个自然数之和
	go InitTask(taskChan, resultChan, 1000)
	//分发任务在NUMBER个goroutine池
	DistributeTask(taskChan, workers, done)
	//获取各个goroutine处理完任务的通知,并关闭结果通道
	go CloseResult(done, resultChan, workers)
	//通过结果通道处理结果
	sum := ProcessResult(resultChan)
	fmt.Println("sum=", sum)
}

程序输出

sum= 500500

程序的逻辑分析:

(1)、构建 task 并发送到 task 通道中。

(2)、分别启动 n 个工作线程,不停地从 task 通道中获取任务,然后将结果写入结果通道。如果任务通道被关闭,

则负责向收敛结果的 goroutine 发送通知,告诉其当前 worker 已经完成工作。

(3)、收敛结果的 goroutine 接收到所有 task 已经处理完毕的信号后,主动关闭结果通道。

(4)、 main 中的函数 ProcessResult 读取并统计所有的结果。

6、使用 WaitGroup实现固工作池

package main
import (
	"fmt"
	"sync"
)
// 工作任务
type task struct {
	begin  int
	end    int
	result chan<- int
}
// 构建task并写入task通道
func InitTask(taskChan chan<- task, r chan int, p int) {
	qu := p / 10
	mod := p % 10
	high := qu * 10
	for j := 0; j < qu; j++ {
		b := 10*j + 1
		e := 10 * (j + 1)
		tsk := task{
			begin:  b,
			end:    e,
			result: r,
		}
		taskChan <- tsk
	}
	if mod != 0 {
		tsk := task{
			begin:  high + 1,
			end:    p,
			result: r,
		}
		taskChan <- tsk
	}
	close(taskChan)
}
//读取task chan,每个task启动一个worker goroutine进行处理,并等待每个task运行完,关闭结果通道
func DistributeTask(taskChan <-chan task, wait *sync.WaitGroup, result chan int) {
	for v := range taskChan {
		wait.Add(1)
		go ProcessTask(v, wait)
	}
	wait.Wait()
	close(result)
}
// goroutine处理具体工作,并将结果发送到结果通道
func ProcessTask(t task, wait *sync.WaitGroup) {
	t.do()
	wait.Done()
}
//任务执行: 计算begin到end的和,执行结果写入结果chan result
func (t *task) do() {
	sum := 0
	for i := t.begin; i <= t.end; i++ {
		sum += i
	}
	t.result <- sum
}
//读取结果通道,汇总结果
func ProcessResult(resultChan chan int) int {
	sum := 0
	for r := range resultChan {
		sum += r
	}
	return sum
}
func main() {
	// 创建任务通道
	taskChan := make(chan task, 10)
	// 创建结果通道
	resultChan := make(chan int, 10)
	// wait用于同步等待任务的执行
	wait := &sync.WaitGroup{}
	// 初始化task的goroutine,计算100个自然数之和
	go InitTask(taskChan, resultChan, 100)
	// 每个task启动一个goroutine进行处理
	go DistributeTask(taskChan, wait, resultChan)
	//通过结果通道获取结果并汇总
	sum := ProcessResult(resultChan)
	fmt.Println("sum=", sum)
}

程序输出

sum= 5050

程序的逻辑分析:

(1)、InitTask 函数构建 task 并发送到 task 通道中。

(2)、分发任务函数 DistributeTask 为每个 task 启动一个 goroutine 处理任务,等待其处理完成,然后关闭结果通道。

(3)、ProcessResult 函数读取并统计所有的结果。

这几个函数分别在不同的 goroutine 中运行,它们通过通道和 sync.WaitGroup 进行通信和同步。

以上就是Go语言通过chan进行数据传递的方法详解的详细内容,更多关于Go chan数据传递的资料请关注脚本之家其它相关文章!

相关文章

  • Golang 中的json.Marshal问题总结(推荐)

    Golang 中的json.Marshal问题总结(推荐)

    这篇文章主要介绍了Golang中的json.Marshal问题总结,本文通过一个例子给大家详细讲解,本次提出的问题中,我们不难注意到其中的time.Time是一个匿名(Anonymous)字段,而这个就是答案的由来,需要的朋友可以参考下
    2022-06-06
  • golang package time的用法具体详解

    golang package time的用法具体详解

    本篇文章主要介绍了golang package time的用法具体详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-05-05
  • Golang干货分享之利用AST实现AOP功能

    Golang干货分享之利用AST实现AOP功能

    本文主要是一个纯干货分享,主要介绍了Golang如何利用AST实现AOP功能,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-04-04
  • 基于Go编写一个可视化Navicat本地密码解析器

    基于Go编写一个可视化Navicat本地密码解析器

    这篇文章主要给大家介绍了基于Go编写一个可视化Navicat本地密码解析器的方法,文中有详细的代码示例和图文介绍,有需要的朋友可以参考阅读本文
    2023-08-08
  • Go语言入门学习之Channel通道详解

    Go语言入门学习之Channel通道详解

    go routine可以使用channel来进行通信,使用通信的手段来共享内存,下面这篇文章主要给大家介绍了关于Go语言入门学习之Channel通道的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2022-07-07
  • golang gorm 操作mysql及gorm基本用法

    golang gorm 操作mysql及gorm基本用法

    golang 官方的那个操作mysql的有点麻烦所以就使用了gorm,下面就gorm的使用做下简单介绍,感兴趣的朋友跟随小编一起看看吧
    2018-11-11
  • Golang 变量申明的三种方式

    Golang 变量申明的三种方式

    这篇文章主要介绍了Golang 变量申明的三种方式,帮助大家更好的理解和学习golang,感兴趣的朋友可以了解下
    2020-08-08
  • Go语言字符串拼接方式与性能比较分析

    Go语言字符串拼接方式与性能比较分析

    这篇文章主要为大家介绍了Go语言字符串拼接方式与性能比较示例分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-12-12
  • go中如何获取本机ip地址

    go中如何获取本机ip地址

    这篇文章主要介绍了go中如何获取本机ip地址问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • Go语言中的错误处理最佳实践详解

    Go语言中的错误处理最佳实践详解

    这篇文章主要为大家详细介绍了Go语言中的错误处理的相关知识,文中的示例代码讲解详细,对我们深入了解Go语言有一定的帮助,需要的可以参考下
    2023-08-08

最新评论