使用golang实现一个MapReduce的示例代码

 更新时间:2023年09月21日 10:57:26   作者:写代码的lorre  
这篇文章主要给大家介绍了关于如何使用golang实现一个MapReduce,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

背景

在日常业务开发中,我们经常遇到需要并发处理的场景。例如:

  • 依据id列表查询db,获取数据。为了保证查询性能,单次查询的id列表长度最好不要超过50(依据业务来判断),当id列表长度超过50时,拆分成并发请求,减少耗时和提高性能,返回聚合后的结果
  • 外部提供的接口不支持批量写入/读取数据,当需要批量处理数据时,为了减少耗时和提高性能,并发请求外部接口

以上处理数据的场景,都可以分成两个阶段:

  • 请求阶段。基本都是IO操作,请求db,或者是调用外部接口
  • 处理阶段。对返回的数据进行转换,过滤,聚合等操作

同步调用,调用耗时增长明显

并发调用,可以减少调用耗时

分析

上面说的处理数据的场景,都可以分成两个阶段:

  • 请求阶段。IO操作,可以并发的去进行,互不干扰
  • 处理阶段。同步进行,保证聚合结果的正确性

这种是一种特殊的MapReduce

为了处理这类场景,我们需要明确以下几个部分:

  • 列表长度。代表有多少数据需要进行处理
  • map函数。并发处理的函数,互不干扰
  • reduce函数。同步处理的函数
  • 最大并发数。决定需要开多少线程/协程来处理
  • 拆分长度。列表长度 / 拆分长度 = 子任务数

由于我在日常开发中常使用golang语言,下面梳理下使用golang来解决这类问题的一个思路

函数签名

func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
   reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) 

核心逻辑:

  • 当最大并发数 <= 1 或者子任务数(列表长度 / 拆分长度) <= 1时,同步执行map函数和reduce函数即可

  • 其余情况,并发处理map函数,同步执行reduce函数

    • 获取并发处理的子任务数量:lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))
    • 通过sync.Mutex保证reduce同步执行
    • 通过sync.WaitGroup保证等待子任务全部执行完成
    • 通过chan控制最大并发数

代码实现

package test
import (
   "math"
   "sync"
)
func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
   reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
   if length < 1 {
      return
   }
   if maxConcurrent <= 1 || length <= chunkSize {
      doChunkProcessSerially(length, procedure, reduce, chunkSize)
   } else {
      doChunkProcessConcurrently(length, procedure, reduce, maxConcurrent, chunkSize)
   }
}
// 同步处理
func doChunkProcessSerially(length int, procedure func(start, end int) (interface{}, error),
   reduce func(partialResult interface{}, partialErr error, start, end int), chunkSize int) {
   // 拆分的子任务数
   chunkNums := int(math.Ceil(float64(length) / float64(chunkSize)))
   for i := 0; i < chunkNums; i++ {
      func(chunkIndex int) {
         defer func() {
            if err := recover(); err != nil {
               // 自定义错误处理
            }
         }()
         start := chunkIndex * chunkSize
         end := start + chunkSize
         if end > length {
            end = length
         }
         // 执行map
         response, err := procedure(start, end)
         // 执行reduce
         if reduce != nil {
            reduce(response, err, start, end)
         }
      }(i)
   }
}
// 并发处理
func doChunkProcessConcurrently(length int, procedure func(start, end int) (interface{}, error),
   reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
   index := 0
   chunkIndex := 0
   // 拆分的子任务数
   lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))
   // 保证reduce同步执行
   var lock sync.Mutex
   // 保证子任务全部执行完成
   var wg sync.WaitGroup
   wg.Add(lengthTask)
   // 控制并发数
   throttleChan := make(chan struct{}, maxConcurrent)
   for {
      start := index
      end := index + chunkSize
      if end > length {
         end = length
      }
      throttleChan <- struct{}{}
      go func(chunkIndex int) {
         defer func() {
            <-throttleChan
            if err := recover(); err != nil {
               // 自定义错误处理
            }
            wg.Done()
         }()
         // 执行map
         response, err := procedure(start, end)
         // 执行reduce
         if reduce != nil {
            lock.Lock()
            defer lock.Unlock()
            reduce(response, err, start, end)
         }
      }(chunkIndex)
      chunkIndex++
      index = index + chunkSize
      if index >= length {
         break
      }
   }
   wg.Wait()
   close(throttleChan)
}

测试:

func TestChunkProcess(t *testing.T) {
   trackIDs := []int64{123, 456, 789}
   results := make([]int64, 0)
   ChunkProcess(len(trackIDs), func(start, end int) (interface{}, error) {
      result := trackIDs[start] + 100
      return result, nil
   }, func(partialResult interface{}, partialErr error, start, end int) {
      results = append(results, partialResult.(int64))
   }, 2, 1)
   fmt.Println(results)
}

总结

多对业务场景进行抽象分析,为这一类场景提供解决方案

以上就是使用golang实现一个MapReduce的详细内容,更多关于golang实现MapReduce的资料请关注脚本之家其它相关文章!

相关文章

  • 初学Go必备的vscode插件及最常用快捷键和代码自动补全

    初学Go必备的vscode插件及最常用快捷键和代码自动补全

    这篇文章主要给大家介绍了关于初学vscode写Go必备的vscode插件及最常用快捷键和代码自动补全的相关资料,由于vscode是开源免费的,而且开发支持vscode的插件相对比较容易,更新速度也很快,需要的朋友可以参考下
    2023-07-07
  • Go语言执行系统命令行命令的方法

    Go语言执行系统命令行命令的方法

    这篇文章主要介绍了Go语言执行系统命令行命令的方法,实例分析了Go语言操作系统命令行的技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-02-02
  • Go使用fmt包输出与格式化核心库的完整指南

    Go使用fmt包输出与格式化核心库的完整指南

    在 Go 语言中,fmt 是最基础也是使用频率最高的标准库之一,几乎每一个 Go 程序都会用到它,下面小编就和大家详细介绍一下fmt包的具体使用吧
    2026-03-03
  • 详解为什么说Golang中的字符串类型不能修改

    详解为什么说Golang中的字符串类型不能修改

    在接触Go这么语言,可能你经常会听到这样一句话。对于字符串不能修改,可能你很纳闷,日常开发中我们对字符串进行修改也是很正常的,为什么又说Go中的字符串不能进行修改呢?本文就来通过实际案例给大家演示一下
    2023-03-03
  • 一文带你了解Golang中的泛型

    一文带你了解Golang中的泛型

    泛型是一种可以编写独立于使用的特定类型的代码的方法,可以通过编写函数或类型来使用一组类型中的任何一个,下面就来和大家聊聊Golang中泛型的使用吧
    2023-07-07
  • 使用Go实现健壮的内存型缓存的方法

    使用Go实现健壮的内存型缓存的方法

    这篇文章主要介绍了使用Go实现健壮的内存型缓存,本文比较了字节缓存和结构体缓存的优劣势,介绍了缓存穿透、缓存错误、缓存预热、缓存传输、故障转移、缓存淘汰等问题,并对一些常见的缓存库进行了基准测试,需要的朋友可以参考下
    2022-05-05
  • Golang中Slice 底层机制的实现

    Golang中Slice 底层机制的实现

    本文主要介绍了Golang中Slice 底层机制的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2026-03-03
  • 详解Golang 中的并发限制与超时控制

    详解Golang 中的并发限制与超时控制

    这篇文章主要介绍了详解Golang 中的并发限制与超时控制,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-02-02
  • Go-Gin Web框架的实现示例

    Go-Gin Web框架的实现示例

    本文主要介绍了Go-Gin Web框架的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-11-11
  • GoLand如何设置中文

    GoLand如何设置中文

    这篇文章主要介绍了GoLand如何设置中文,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12

最新评论