Go通用的 MapReduce 工具函数详解

 更新时间:2024年09月12日 10:20:23   作者:fishjam  
本文介绍了使用Go语言实现的MapReduce框架,特别是在AWSS3 SDK的MultiPartUpload功能中的应用,包括并发上传和错误处理策略,详细解释了如何通过并发goroutines提高上传效率,并通过MapReduce模型优化代码结构和处理流程,感兴趣的朋友跟随小编一起看看吧

前言

最近在测试学习 aws s3 sdk 中的 Multi Part Upload 功能,其基本步骤就是 CreateMultipartUpload 后, 串行或并行地 UploadPart ,最后 CompleteMultipartUploadAbortMultipartUpload 收尾。为了最高效率地完成整个传输,中间的 UploadPart 部分使用多个 goroutine 并发地上传是最快地。因此尝试着写了一下,并完美地实现。

扩展

虽然已经完成对应功能的开发和测试,但仔细分析一下,发现有大量的模式代码,比如:

  • 创建指定个数的 goroutine, 并使用 sync.WaitGroup 管理和同步.
  • 使用 chan 提供待处理数据,并接受处理结果
  • 看起来整个处理流程就是典型的 map-reduce 结构 或者 说是 Java Stream/ParallelStream 中的 Map, Reduce.

网上搜索一下, 发现很多人也有这个需求,也写了不少库,但实测了一下,发现根本不好用。于是决定自己再造一个“自己觉得比较好的”轮子,因此有了 mapreduce 和本篇文章。

主要功能函数

  • func Map[T any, R any](ctx context.Context, inputs []T, mapper MapperFunc[T, R]) ResultsMap[T, R]
  • 这是最简单的同步 Map, 通过泛型的 T 和 R 支持任意类型的数据转换
  • func ParallelMap[T any, R any](ctx context.Context, inputs []T, concurrency int, name string, mapper MapperFunc[T, R]) ResultsMap[T, R]
  • 这是并发的Map,内部回启动最多 2+concurrency 个 goroutine, 并发的处理完 inputs 中的所有数据. 并且结果可以按照输入的顺序排序。
  • func StreamMap[T any, R any](ctx context.Context, concurrency int, name string, startIndex int64, chInput chan T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]

类似纤程池的形态, 可以无限地处理 chInput 中的数据,并将结果写入 OutputItem.

额外说明

错误处理

作为一个并发处理框架,对于错误情况也应该能很好的支持,有的时候, 一项元素处理失败了不影响整体的流程处理, 但有的时候其中一项失败, 就不需要继续进行(比如 S3 的 multi part upload, 如果其中一部分失败, 那其他的部分再上传也没有意义了)。因此代码中定义了 OperationType 类型, 其值分为
ContinueStop , 框架只根据这个值确认是否继续处理, 而不是根据 mapper 函数是否返回 error.

结果返回

并发处理时, 每个 Item 的处理时长/顺序等是不同的,而且有可能因为错误造成部分输入元素尚未处理即结束,因此返回的结果默认情况下不一定能和输入顺序一一对应,因此采用了 Map 的方式保存输入序号 => 结果。

排序

s3 的 multi part upload 在调用 CompleteMultipartUpload 时参数 Parts 需要是排好序的,因此通过 ConvertResult 函数对结果进行排序。

测试代码

注意: 并发处理带错误数据的时候, 由于错误项的处理顺序比较随机, 因此我使用了 concurrency: 1 的方式保证 UT 能顺利判断。如果将 concurrency 更改为大于1的情况, 其 want 不一定能满足. 比如: “error with stop” 时, 如果 concurrency > 1, 结果有可能就不是 [1 2 3 0] 而是 [1 2 3 0 4 5] 了, 这种属于正常现象(自己更改测试一下即可理解 )

func TestMap(t *testing.T) {
	type args struct {
		ctx         context.Context
		inputs      []string
		concurrency int
		mapper      MapperFunc[string, int]
	}
	tests := []struct {
		name     string
		args     args
		want     []int
		wantErrs []error
		opType   OperationType
	}{
		{
			name: "all successful",
			args: args{ctx: context.Background(),
				inputs: []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}, concurrency: runtime.NumCPU(), mapper: convertStopFunc,
			},
			want:     []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
			wantErrs: []error{nil, nil, nil, nil, nil, nil, nil, nil, nil, nil},
			opType:   Continue,
		},
		{
			name:     "error with continue",
			args:     args{ctx: context.Background(), inputs: []string{"1", "not", "3"}, concurrency: 1, mapper: convertContinueFunc},
			want:     []int{1, 0, 3},
			wantErrs: []error{nil, numberErrHelper("not"), nil},
			opType:   Continue, // 出现过错误,但忽略了. 如果采用 Continue 的方式来处理错误, 则只能自己遍历 ResultsMap 的结果集才知道是否有错误
		},
		{
			// 注意: 如果并发度 concurrency > 1, 则结果个数不确定, 但肯定至少有一个错误的
			name: "error with stop",
			args: args{ctx: context.Background(), inputs: []string{"1", "2", "3", "not_exist", "4", "5", "6"},
				concurrency: 1, mapper: convertStopFunc},
			want:     []int{1, 2, 3, 0},
			wantErrs: []error{nil, nil, nil, numberErrHelper("not_exist")},
			opType:   Stop,
		},
		{
			name: "error last", // 最后一个数据出错时,其返回的结果数组长度和输入数组的长度相同. 因此不能依靠数组长度来判断是否有问题.
			args: args{ctx: context.Background(), inputs: []string{"1", "2", "3", "not_exist"},
				concurrency: 1, mapper: convertStopFunc},
			want:     []int{1, 2, 3, 0},
			wantErrs: []error{nil, nil, nil, numberErrHelper("not_exist")},
			opType:   Stop,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if true {
				//使用 Map 串行转换
				got := Map(tt.args.ctx, tt.args.inputs, tt.args.mapper)
				//flog.Infof("Map name=%s, got=%+v", tt.name, got)
				realResult, errs, opType := got.ConvertResult()
				du.GoAssertEqual(t, tt.want, realResult, "want")
				du.GoAssertEqual(t, tt.wantErrs, errs, "wantErrs")
				du.GoAssertEqual(t, tt.opType, opType, "opType")
			}
			if true {
				//使用 ParallelMap 并行转换
				got := ParallelMap(tt.args.ctx, tt.args.inputs, tt.args.concurrency, tt.name, tt.args.mapper)
				//flog.Infof("ParallelMap name=%s, got=%+v", tt.name, got)
				realResult, errs, opType := got.ConvertResult()
				du.GoAssertEqual(t, tt.want, realResult, "want")
				du.GoAssertEqual(t, tt.wantErrs, errs, "wantErrs")
				du.GoAssertEqual(t, tt.opType, opType, "opType")
			}
		})
	}
}
func TestStreamMap(t *testing.T) {
	ctx := context.Background()
	inItemCount := 10000
	chInput := make(chan string)
	go func() {
		for i := 0; i < inItemCount; i++ {
			idx := rand.Intn(100)
			chInput <- fmt.Sprintf("%d", idx)
		}
		close(chInput)
	}()
	//启动 100 个 纤程并行处理 inItemCount(10000) 个数据的转换
	chOutput := StreamMap(ctx, 100, "testStreamMap", 100, chInput, convertContinueFunc)
	mapResultCount := 0
	for outItem := range chOutput {
		mapResultCount++
		flog.Debugf("outItem=%v", outItem)
	}
	du.GoAssertEqual(t, inItemCount, mapResultCount, "inItemCount")
}

##补充信息

  • 因为众所周知的原因, 以后 go-library 的代码将只更新 https://gitee.com/fishjam/go-library, 不再更新 github 上的版本.
  • S3 的 multi upload 不需要大家自己写, manager.NewUploader 已经提供了完整的实现, 比大多数人实现得更好。

到此这篇关于Go通用的 MapReduce 工具函数的文章就介绍到这了,更多相关Go MapReduce 工具函数内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 深入理解Go中defer的机制

    深入理解Go中defer的机制

    本文主要介绍了Go中defer的机制,包括执行顺序、参数预计算、闭包和与返回值的交互,具有一定的参考价值,感兴趣的可以了解一下
    2025-02-02
  • Go 文件读取和写入操作全面讲解

    Go 文件读取和写入操作全面讲解

    这篇文章主要为大家介绍了Go文件的读取和写入操作示例的全面详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-11-11
  • 一文带你深入理解Golang中的泛型

    一文带你深入理解Golang中的泛型

    Go 在泛型方面一直被诟病,因为它在这方面相对比较落后。但是,在 Go 1.18 版本中,泛型已经被正式引入,成为了 Go 语言中一个重要的特性。本文将会详细介绍 Go 泛型的相关概念,语法和用法,希望能够帮助大家更好地理解和应用这一特性
    2023-05-05
  • Go语言使用sqlx操作数据库的示例详解

    Go语言使用sqlx操作数据库的示例详解

    sqlx 是 Go 语言中一个流行的第三方包,它提供了对 Go 标准库 database/sql 的扩展,本文重点讲解 sqlx 在 database/sql 基础上扩展的功能,希望对大家有所帮助
    2023-06-06
  • 一篇文章带你轻松搞懂Golang的error处理

    一篇文章带你轻松搞懂Golang的error处理

    在进行后台开发的时候,错误处理是每个程序员都会遇到的问题,下面这篇文章主要给大家介绍了关于Golang中error处理的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-07-07
  • 浅析如何利用Go的plugin机制实现热更新

    浅析如何利用Go的plugin机制实现热更新

    热更新,或称热重载或动态更新,是一种软件更新技术,允许程序在运行时,不停机更新代码或资源,本文主要来讨论下GO语言是否可以利用plugin机制实现热更新,感兴趣的可以了解下
    2024-04-04
  • golang并发编程中Goroutine 协程的实现

    golang并发编程中Goroutine 协程的实现

    Go语言中的协程是一种轻量级线程,通过在函数前加go关键字来并发执行,具有动态栈、快速启动和低内存使用等特点,本文就来详细的介绍一下,感兴趣的可以了解一下
    2024-10-10
  • Go语言中websocket的使用demo分享

    Go语言中websocket的使用demo分享

    WebSocket是一种在单个TCP连接上进行全双工通信的协议。这篇文章主要和大家分享了一个Go语言中websocket的使用demo,需要的可以参考一下
    2022-12-12
  • Golang迭代如何在Go中循环数据结构使用详解

    Golang迭代如何在Go中循环数据结构使用详解

    这篇文章主要为大家介绍了Golang迭代之如何在Go中循环数据结构使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-10-10
  • Go语言题解LeetCode35搜索插入位置示例详解

    Go语言题解LeetCode35搜索插入位置示例详解

    这篇文章主要为大家介绍了Go语言题解LeetCode35搜索插入位置示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-12-12

最新评论