Go实现后台任务调度系统的实例代码

 更新时间:2023年06月12日 10:10:30   作者:堆栈future  
平常我们在开发API的时候,前端传递过来的大批数据需要经过后端处理,如果后端处理的速度快,前端响应就快,反之则很慢,影响用户体验,为了解决这一问题,需要我们自己实现后台任务调度系统,本文将介绍如何用Go语言实现后台任务调度系统,需要的朋友可以参考下

一、背景

平常我们在开发API的时候,前端传递过来的大批数据需要经过后端处理,如果后端处理的速度快,前端响应就快,反之则很慢,影响用户体验。针对这种场景我们一般都是后台异步处理,不需要前端等待所有的都执行完才返回。为了解决这一问题,需要我们自己实现后台任务调度系统。

二、任务调度器实现

poll.go

package poller
import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)
type Poller struct {
	routineGroup *goroutineGroup // 并发控制
	workerNum    int // 记录同时在运行的最大goroutine数
	sync.Mutex
	ready  chan struct{} // 某个goroutine已经准备好了
	metric *metric // 统计当前在运行中的goroutine数量
}
func NewPoller(workerNum int) *Poller {
	return &Poller{
		routineGroup: newRoutineGroup(),
		workerNum:    workerNum,
		ready:        make(chan struct{}, 1),
		metric:       newMetric(),
	}
}
// 调度器
func (p *Poller) schedule() {
	p.Lock()
	defer p.Unlock()
	if int(p.metric.BusyWorkers()) >= p.workerNum {
		return
	}
	select {
	case p.ready <- struct{}{}: // 只要满足当前goroutine数量小于最大goroutine数量 那么就通知poll去调度goroutine执行任务
	default:
	}
}
func (p *Poller) Poll(ctx context.Context) error {
	for {
		// step01
		p.schedule() // 调度
		select {
		case <-p.ready: // goroutine准备好之后 这里就会有消息
		case <-ctx.Done():
			return nil
		}
	LOOP:
		for {
			select {
			case <-ctx.Done():
				break LOOP
			default:
				// step02
				task, err := p.fetch(ctx) // 获取任务
				if err != nil {
					log.Println("fetch task error:", err.Error())
					break
				}
				fmt.Println(task)
				p.metric.IncBusyWorker() // 当前正在运行的goroutine+1
				// step03
				p.routineGroup.Run(func() { // 执行任务
					if err := p.execute(ctx, task); err != nil {
						log.Println("execute task error:", err.Error())
					}
				})
				break LOOP
			}
		}
	}
}
func (p *Poller) fetch(ctx context.Context) (string, error) {
	time.Sleep(1000 * time.Millisecond)
	return "task", nil
}
func (p *Poller) execute(ctx context.Context, task string) error {
	defer func() {
		p.metric.DecBusyWorker() // 执行完成之后 goroutine数量-1
		p.schedule() // 重新调度下一个goroutine去执行任务 这一步是必须的
	}()
	return nil
}

metric.go

package poller
import "sync/atomic"
type metric struct {
	busyWorkers uint64
}
func newMetric() *metric {
	return &metric{}
}
func (m *metric) IncBusyWorker() uint64 {
	return atomic.AddUint64(&m.busyWorkers, 1)
}
func (m *metric) DecBusyWorker() uint64 {
	return atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}
func (m *metric) BusyWorkers() uint64 {
	return atomic.LoadUint64(&m.busyWorkers)
}

goroutine_group.go

package poller
import "sync"
type goroutineGroup struct {
	waitGroup sync.WaitGroup
}
func newRoutineGroup() *goroutineGroup {
	return new(goroutineGroup)
}
func (g *goroutineGroup) Run(fn func()) {
	g.waitGroup.Add(1)
	go func() {
		defer g.waitGroup.Done()
		fn()
	}()
}
func (g *goroutineGroup) Wait() {
	g.waitGroup.Wait()
}

三、测试

package main
import (
	"context"
	"fmt"
	"ta/poller"
	"go.uber.org/goleak"
	"testing"
)
func TestMain(m *testing.M)  {
	fmt.Println("start")
	goleak.VerifyTestMain(m)
}
func TestPoller(t *testing.T) {
	producer := poller.NewPoller(5)
	producer.Poll(context.Background())
}

结果:

四、总结

大家用别的方式也可以实现,核心要点就是控制并发节奏,防止大量请求打到task service,在这里起到核心作用的就是schedule,它控制着整个任务系统的调度。同时还封装了WaitGroup,这在大多数开源代码中都比较常见,大家可以去尝试。另外就是test case一定得跟上,防止goroutine泄漏。

以上就是Go实现后台任务调度系统的实例代码的详细内容,更多关于Go后台任务调度系统的资料请关注脚本之家其它相关文章!

相关文章

  • Go高级特性之并发处理http详解

    Go高级特性之并发处理http详解

    Golang 作为一种高效的编程语言,提供了多种方法来实现并发发送 HTTP 请求,本文将深入探讨 Golang 中并发发送 HTTP 请求的最佳技术和实践,希望对大家有所帮助
    2024-02-02
  • Go设计模式之享元模式讲解和代码示例

    Go设计模式之享元模式讲解和代码示例

    享元是一种结构型设计模式,它允许你在消耗少量内存的情况下支持大量对象,模式通过共享多个对象的部分状态来实现上述功能,换句话来说,享元会将不同对象的相同数据进行缓存以节省内存,本文就将通过代码示例给大家详细介绍一下享元模式
    2023-06-06
  • Go语言实现生成样式美观的PDF文件

    Go语言实现生成样式美观的PDF文件

    使用 Go 语言生成样式美观的 PDF 文件是一个常见的需求,尤其是在报告生成、发票、合同等场景中,下面就跟随小编一起来学习一下具体实现方法吧
    2025-01-01
  • Go Gin框架优雅重启和停止实现方法示例

    Go Gin框架优雅重启和停止实现方法示例

    Web应用程序中,有时需要重启或停止服务器,无论是因为更新代码还是进行例行维护,这时需要保证应用程序的可用性和数据的一致性,就需要优雅地关闭和重启应用程序,即不丢失正在处理的请求和不拒绝新的请求,本文将详解如何在Go语言中使用Gin这个框架实现优雅的重启停止
    2024-01-01
  • GO中的条件变量sync.Cond详解

    GO中的条件变量sync.Cond详解

    条件变量是基于互斥锁的,它必须基于互斥锁才能发挥作用,条件变量的初始化离不开互斥锁,并且它的方法有点也是基于互斥锁的,这篇文章主要介绍了GO的条件变量sync.Cond,需要的朋友可以参考下
    2023-01-01
  • go第三方库sqlx操作MySQL及ORM原理

    go第三方库sqlx操作MySQL及ORM原理

    这篇文章主要为大家介绍了go第三方库sqlx操作MySQL及ORM实现原理,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • go mod更新指定的tag的包后,go vendor内容未更新问题

    go mod更新指定的tag的包后,go vendor内容未更新问题

    这篇文章主要介绍了go mod更新指定的tag的包后,go vendor内容未更新问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • Golang 中反射的应用实例详解

    Golang 中反射的应用实例详解

    这篇文章主要为大家介绍了Golang 中反射的应用实例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • Golang IOT中的数据序列化与解析过程

    Golang IOT中的数据序列化与解析过程

    这篇文章主要介绍了Golang IOT中的数据序列化与解析,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-05-05
  • Go设计模式之观察者模式图解

    Go设计模式之观察者模式图解

    观察者模式是一种行为设计模式, 允许你定义一种订阅机制, 可在对象事件发生时通知多个 “观察” 该对象的其他对象,下面这篇文章主要给大家介绍了关于图解Go观察者模式的相关资料,需要的朋友可以参考下
    2023-07-07

最新评论