go-zero数据的流处理利器fx使用详解

 更新时间:2023年05月29日 14:02:44   作者:Keson  
这篇文章主要为大家介绍了go-zero数据的流处理利器fx使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

数据的流处理利器

go-zero微服务库地址https://github.com/tal-tech/go-zero

流处理(Stream processing)是一种计算机编程范式,其允许给定一个数据序列(流处理数据源),一系列数据操作(函数)被应用到流中的每个元素。同时流处理工具可以显著提高程序员的开发效率,允许他们编写有效、干净和简洁的代码。

流数据处理在我们的日常工作中非常常见,举个例子,我们在业务开发中往往会记录许多业务日志,这些日志一般是先发送到Kafka,然后再由Job消费Kafaka写到elasticsearch,在进行日志流处理的过程中,往往还会对日志做一些处理,比如过滤无效的日志,做一些计算以及重新组合日志等等,示意图如下:

流处理工具fx

gozero是一个功能完备的微服务框架,框架中内置了很多非常实用的工具,其中就包含流数据处理工具fx,下面我们通过一个简单的例子来认识下该工具:

package main
import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
    "github.com/tal-tech/go-zero/core/fx"
)
func main() {
    ch := make(chan int)
    go inputStream(ch)
    go outputStream(ch)
    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
    <-c
}
func inputStream(ch chan int) {
    count := 0
    for {
        ch <- count
        time.Sleep(time.Millisecond * 500)
        count++
    }
}
func outputStream(ch chan int) {
    fx.From(func(source chan<- interface{}) {
        for c := range ch {
            source <- c
        }
    }).Walk(func(item interface{}, pipe chan<- interface{}) {
        count := item.(int)
        pipe <- count
    }).Filter(func(item interface{}) bool {
        itemInt := item.(int)
        if itemInt%2 == 0 {
            return true
        }
        return false
    }).ForEach(func(item interface{}) {
        fmt.Println(item)
    })
}

inputStream函数模拟了流数据的产生,outputStream函数模拟了流数据的处理过程,其中From函数为流的输入,Walk函数并发的作用在每一个item上,Filter函数对item进行过滤为true保留为false不保留,ForEach函数遍历输出每一个item元素。

流数据处理中间操作

一个流的数据处理可能存在许多的中间操作,每个中间操作都可以作用在流上。就像流水线上的工人一样,每个工人操作完零件后都会返回处理完成的新零件,同理流处理中间操作完成后也会返回一个新的流。

fx的流处理中间操作:

操作函数功能输入
Distinct去除重复的itemKeyFunc,返回需要去重的key
Filter过滤不满足条件的itemFilterFunc,Option控制并发量
Group对item进行分组KeyFunc,以key进行分组
Head取出前n个item,返回新streamint64保留数量
Map对象转换MapFunc,Option控制并发量
Merge合并item到slice并生成新stream
Reverse反转item
Sort对item进行排序LessFunc实现排序算法
Tail与Head功能类似,取出后n个item组成新streamint64保留数量
Walk作用在每个item上WalkFunc,Option控制并发量

下图展示了每个步骤和每个步骤的结果:

用法与原理分析

From

通过From函数构建流并返回Stream,流数据通过channel进行存储:

// 例子
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {
    source <- v
  }
})
// 源码
func From(generate GenerateFunc) Stream {
    source := make(chan interface{})
    go func() {
        defer close(source)
    // 构造流数据写入channel
        generate(source)
    }()
    return Range(source)
}

Filter

Filter函数提供过滤item的功能,FilterFunc定义过滤逻辑true保留item,false则不保留:

// 例子 保留偶数
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {
    source <- v
  }
}).Filter(func(item interface{}) bool {
  if item.(int)%2 == 0 {
    return true
  }
  return false
})
// 源码
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
    return p.Walk(func(item interface{}, pipe chan<- interface{}) {
    // 执行过滤函数true保留,false丢弃
        if fn(item) {
            pipe <- item
        }
    }, opts...)
}

Group

Group对流数据进行分组,需定义分组的key,数据分组后以slice存入channel:

// 例子 按照首字符"g"或者"p"分组,没有则分到另一组
    ss := []string{"golang", "google", "php", "python", "java", "c++"}
    fx.From(func(source chan<- interface{}) {
        for _, s := range ss {
            source <- s
        }
    }).Group(func(item interface{}) interface{} {
        if strings.HasPrefix(item.(string), "g") {
            return "g"
        } else if strings.HasPrefix(item.(string), "p") {
            return "p"
        }
        return ""
    }).ForEach(func(item interface{}) {
        fmt.Println(item)
    })
}
// 源码
func (p Stream) Group(fn KeyFunc) Stream {
  // 定义分组存储map
    groups := make(map[interface{}][]interface{})
    for item := range p.source {
    // 用户自定义分组key
        key := fn(item)
    // key相同分到一组
        groups[key] = append(groups[key], item)
    }
    source := make(chan interface{})
    go func() {
        for _, group := range groups {
      // 相同key的一组数据写入到channel
            source <- group
        }
        close(source)
    }()
    return Range(source)
}

Reverse

reverse可以对流中元素进行反转处理:

// 例子
fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {
  fmt.Println(item)
})
// 源码
func (p Stream) Reverse() Stream {
    var items []interface{}
  // 获取流中数据
    for item := range p.source {
        items = append(items, item)
    }
    // 反转算法
    for i := len(items)/2 - 1; i >= 0; i-- {
        opp := len(items) - 1 - i
        items[i], items[opp] = items[opp], items[i]
    }
  // 写入流
    return Just(items...)
}

Distinct

distinct对流中元素进行去重,去重在业务开发中比较常用,经常需要对用户id等做去重操作:

// 例子
fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {
  return item
}).ForEach(func(item interface{}) {
  fmt.Println(item)
})
// 结果为 1,2,3,4,5,6
// 源码
func (p Stream) Distinct(fn KeyFunc) Stream {
    source := make(chan interface{})
    threading.GoSafe(func() {
        defer close(source)
        // 通过key进行去重,相同key只保留一个
        keys := make(map[interface{}]lang.PlaceholderType)
        for item := range p.source {
            key := fn(item)
      // key存在则不保留
            if _, ok := keys[key]; !ok {
                source <- item
                keys[key] = lang.Placeholder
            }
        }
    })
    return Range(source)
}

Walk

Walk函数并发的作用在流中每一个item上,可以通过WithWorkers设置并发数,默认并发数为16,最小并发数为1,如设置unlimitedWorkers为true则并发数无限制,但并发写入流中的数据由defaultWorkers限制,WalkFunc中用户可以自定义后续写入流中的元素,可以不写入也可以写入多个元素:

// 例子
fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {
  newItem := strings.ToUpper(item.(string))
  pipe <- newItem
}).ForEach(func(item interface{}) {
  fmt.Println(item)
})
// 源码
func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
    pipe := make(chan interface{}, option.workers)
    go func() {
        var wg sync.WaitGroup
        pool := make(chan lang.PlaceholderType, option.workers)
        for {
      // 控制并发数量
            pool <- lang.Placeholder
            item, ok := <-p.source
            if !ok {
                <-pool
                break
            }
            wg.Add(1)
            go func() {
                defer func() {
                    wg.Done()
                    <-pool
                }()
                // 作用在每个元素上
                fn(item, pipe)
            }()
        }
    // 等待处理完成
        wg.Wait()
        close(pipe)
    }()
    return Range(pipe)
}

并发处理

fx工具除了进行流数据处理以外还提供了函数并发功能,在微服务中实现某个功能往往需要依赖多个服务,并发的处理依赖可以有效的降低依赖耗时,提升服务的性能。

fx.Parallel(func() {
  userRPC() // 依赖1
}, func() {
  accountRPC() // 依赖2
}, func() {
  orderRPC() // 依赖3
})

注意fx.Parallel进行依赖并行处理的时候不会有error返回,如需有error返回或者有一个依赖报错需要立马结束依赖请求请使用MapReduce工具进行处理。

总结

本篇文章介绍了流处理的基本概念和gozero中的流处理工具fx,在实际的生产中流处理场景应用也非常多,希望本篇文章能给大家带来一定的启发,更好的应对工作中的流处理场景。

以上就是go-zero数据的流处理利器fx使用详解的详细内容,更多关于go-zero数据流处理fx的资料请关注脚本之家其它相关文章!

相关文章

  • Golang 内存模型详解(一)

    Golang 内存模型详解(一)

    这篇文章主要介绍了Golang 内存模型详解(一),本文讲解了Go内存模型interface、,需要的朋友可以参考下
    2014-10-10
  • 从零封装Gin框架及项目初始化教程

    从零封装Gin框架及项目初始化教程

    这篇文章主要为大家介绍了从零封装Gin框架及项目的初始化教程详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2024-01-01
  • Go高级特性探究之协程池详解

    Go高级特性探究之协程池详解

    在并发编程中,协程是 Go 语言的核心特性之一,本文将介绍如何使用 Go 协程池构造一个协程池,并解决函数传参问题、优雅关闭协程池和保证协程安全的问题,感兴趣的可以了解一下
    2023-06-06
  • Go语言使用buffer读取文件的实现示例

    Go语言使用buffer读取文件的实现示例

    本文主要介绍了Go语言使用buffer读取文件的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-04-04
  • Golang算法之田忌赛马问题实现方法分析

    Golang算法之田忌赛马问题实现方法分析

    这篇文章主要介绍了Golang算法之田忌赛马问题实现方法,结合具体实例形式分析了基于Go语言的田忌赛马问题原理与算法实现技巧,需要的朋友可以参考下
    2017-02-02
  • golang的time包:秒、毫秒、纳秒时间戳输出方式

    golang的time包:秒、毫秒、纳秒时间戳输出方式

    这篇文章主要介绍了golang的time包:秒、毫秒、纳秒时间戳输出方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • Golang unsafe包中的类型和函数详解

    Golang unsafe包中的类型和函数详解

    Golang中的unsafe包用于在运行时进行低级别的操作,这些操作通常是不安全的,因为可以打破Golang的类型安全性和内存安全性,使用 unsafe包的程序可能会影响可移植性和兼容性,接下来看下unsafe包中的类型和函数
    2023-08-08
  • Go 并发读写 sync.map 详细

    Go 并发读写 sync.map 详细

    阅读本文你将会明确 sync.Map 和原生 map +互斥锁/读写锁之间的性能情况。标准库 sync.Map 虽说支持并发读写 map,但更适用于读多写少的场景,因为他写入的性能比较差,使用时要考虑清楚这一点。
    2021-10-10
  • Golang共享变量如何解决问题

    Golang共享变量如何解决问题

    协程之间的通信只能够通过通道。但是我们习惯于共享变量,而且很多时候使用共享变量能让代码更简洁。那么Golang共享变量如何解决问题,感兴趣的可以了解一下
    2021-12-12
  • golang 后台进程的启动和停止操作

    golang 后台进程的启动和停止操作

    这篇文章主要介绍了golang 后台进程的启动和停止操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-04-04

最新评论