go Worker Pool Pattern的实现

 更新时间:2026年06月25日 08:58:01   作者:geovindu  
本文展示了一个用Go语言实现的WorkerPool工作池模式,通过协程池处理珠宝订单的完整生产流程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

项目结构:

/*
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Worker Pool Pattern 工作池模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : goLang 2024.3.6 go 26.2
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/21 17:56
# User      :  geovindu
# Product   : GoLand
# Project   : godesginpattern
# File      : logger.go
*/
package utils
 
import (
    "godesginpattern/workerpool/config"
    "io"
    "log"
    "os"
)
 
var Logger *log.Logger
 
func InitLogger() {
    // 打开日志文件
    file, err := os.OpenFile(config.LogFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
    if err != nil {
        log.Fatal("日志文件创建失败:", err)
    }
 
    // 同时输出到:控制台 + 文件
    multiWriter := io.MultiWriter(os.Stdout, file)
 
    // 初始化日志
    Logger = log.New(multiWriter, "", config.LogFlag)
}
 
 
 
 
/*
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Worker Pool Pattern 工作池模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : goLang 2024.3.6 go 26.2
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/21 18:12
# User      :  geovindu
# Product   : GoLand
# Project   : godesginpattern
# File      : retry.go
*/
package utils
 
import (
    "godesginpattern/workerpool/config"
    "time"
)
 
// Retry 执行函数并自动重试
func Retry(task func() error) error {
    var err error
    for i := 0; i < config.MaxRetryTimes; i++ {
        err = task()
        if err == nil {
            return nil
        }
        Logger.Printf("任务失败,第 %d 次重试,错误: %v", i+1, err)
        time.Sleep(500 * time.Millisecond)
    }
    return err
}
 
 
 
/*
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Worker Pool Pattern 工作池模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : goLang 2024.3.6 go 26.2
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/21 18:12
# User      :  geovindu
# Product   : GoLand
# Project   : godesginpattern
# File      : monitor.go
*/
package utils
 
import "sync/atomic"
 
type Monitor struct {
    Running  int64
    Waiting  int64
    Finished int64
    Failed   int64
}
 
func (m *Monitor) IncRunning()  { atomic.AddInt64(&m.Running, 1) }
func (m *Monitor) DecRunning()  { atomic.AddInt64(&m.Running, -1) }
func (m *Monitor) IncWaiting()  { atomic.AddInt64(&m.Waiting, 1) }
func (m *Monitor) DecWaiting()  { atomic.AddInt64(&m.Waiting, -1) }
func (m *Monitor) IncFinished() { atomic.AddInt64(&m.Finished, 1) }
func (m *Monitor) IncFailed()   { atomic.AddInt64(&m.Failed, 1) }
 
func (m *Monitor) Get() (running, waiting, finished, failed int64) {
    return atomic.LoadInt64(&m.Running),
        atomic.LoadInt64(&m.Waiting),
        atomic.LoadInt64(&m.Finished),
        atomic.LoadInt64(&m.Failed)
}
 
 
/*
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Worker Pool Pattern 工作池模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : goLang 2024.3.6 go 26.2
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/21 17:58
# User      :  geovindu
# Product   : GoLand
# Project   : godesginpattern
# File      : settings.go
*/
package config
 
import "log"
 
const (
    WorkerCount     = 3
    QueueMaxSize    = 100
    MinTaskDelay    = 0.3
    MaxTaskDelay    = 0.8
    MaxRetryTimes   = 3
    LogFileName     = "jewelry.log"
    DBPath          = "jewelry.db"
    HTTPPort        = ":8080"
    MonitorInterval = 2
)
 
const LogFlag = log.LstdFlags | log.Lmicroseconds
 
 
 
/*
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Worker Pool Pattern 工作池模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : goLang 2024.3.6 go 26.2
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/21 18:01
# User      :  geovindu
# Product   : GoLand
# Project   : godesginpattern
# File      : task.go
*/
package tasks
 
import (
    "godesginpattern/workerpool/config"
    "math/rand"
    "time"
)
 
func RawMaterialCheck(orderID string) error {
    simulateDelay()
    return nil
}
 
func JewelryProcess(orderID string) error {
    simulateDelay()
    return nil
}
 
func FinishedGoodsCheck(orderID string) error {
    simulateDelay()
    return nil
}
 
func InventoryRecord(orderID string) error {
    simulateDelay()
    return nil
}
 
func OrderDelivery(orderID string) error {
    simulateDelay()
    return nil
}
 
var FullProcessTasks = []func(string) error{
    RawMaterialCheck,
    JewelryProcess,
    FinishedGoodsCheck,
    InventoryRecord,
    OrderDelivery,
}
 
func simulateDelay() {
    rand.Seed(time.Now().UnixNano())
    delay := config.MinTaskDelay + rand.Float64()*(config.MaxTaskDelay-config.MinTaskDelay)
    time.Sleep(time.Duration(delay*1000) * time.Millisecond)
}
 
 
 
/*
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Worker Pool Pattern 工作池模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : goLang 2024.3.6 go 26.2
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/21 18:01
# User      :  geovindu
# Product   : GoLand
# Project   : godesginpattern
# File      : worker_pool.go
*/
package core
 
import (
    "godesginpattern/workerpool/config"
    "godesginpattern/workerpool/utils"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)
 
type Task struct {
    OrderID string
    Func    func(string) error
}
 
type WorkerPool struct {
    workerCount int
    taskChan    chan *Task
    wg          sync.WaitGroup
    monitor     *utils.Monitor
    quit        chan os.Signal
}
 
func NewWorkerPool(workerCnt int, queueSize int) *WorkerPool {
    wp := &WorkerPool{
        workerCount: workerCnt,
        taskChan:    make(chan *Task, queueSize),
        monitor:     &utils.Monitor{},
        quit:        make(chan os.Signal, 1),
    }
    signal.Notify(wp.quit, syscall.SIGINT, syscall.SIGTERM)
    return wp
}
 
func (wp *WorkerPool) worker(id int) {
    utils.Logger.Printf("Worker %d 已启动", id)
    for {
        select {
        case task, ok := <-wp.taskChan:
            if !ok {
                utils.Logger.Printf("Worker %d 安全退出", id)
                return
            }
 
            wp.monitor.DecWaiting()
            wp.monitor.IncRunning()
 
            utils.Logger.Printf("Worker %d 开始任务: %s", id, task.OrderID)
            err := utils.Retry(func() error {
                return task.Func(task.OrderID)
            })
 
            if err != nil {
                utils.Logger.Printf("Worker %d 任务失败: %s, 错误: %v", id, task.OrderID, err)
            } else {
                utils.Logger.Printf("Worker %d 完成任务: %s", id, task.OrderID)
            }
 
            wp.monitor.DecRunning()
            wp.monitor.IncFinished()
            wp.wg.Done()
 
        case <-wp.quit:
            utils.Logger.Printf("Worker %d 收到关闭信号,退出", id)
            return
        }
    }
}
 
func (wp *WorkerPool) Start() {
    utils.Logger.Println("工作池启动")
    for i := 1; i <= wp.workerCount; i++ {
        go wp.worker(i)
    }
    go wp.monitorLoop()
}
 
func (wp *WorkerPool) Submit(task *Task) {
    wp.wg.Add(1)
    wp.monitor.IncWaiting()
    wp.taskChan <- task
}
 
func (wp *WorkerPool) monitorLoop() {
    ticker := time.NewTicker(time.Duration(config.MonitorInterval) * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            r, w, f, failed := wp.monitor.Get()
            utils.Logger.Printf("[监控] 运行:%d | 等待:%d | 完成:%d | 失败:%d", r, w, f, failed)
            // 运行、等待都为0,代表所有任务全部完成
            if r == 0 && w == 0 {
                utils.Logger.Println("所有任务处理完成,自动停止监控输出")
                return
            }
        case <-wp.quit:
            utils.Logger.Println("监控循环收到关闭信号,停止监控输出")
            return
        }
    }
}
 
func (wp *WorkerPool) Wait() {
    // 阻塞等待 Ctrl+C / kill 信号
    <-wp.quit
    utils.Logger.Println("优雅关闭中...")
    // 关闭任务通道,worker 不再接收新任务
    close(wp.taskChan)
    // 等待正在执行的任务全部处理完毕
    wp.wg.Wait()
    utils.Logger.Println("✅ 所有任务执行完毕,服务安全退出")
}
  

调用:

/*
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Worker Pool Pattern 工作池模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : goLang 2024.3.6 go 26.2
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/21 18:02
# User      :  geovindu
# Product   : GoLand
# Project   : godesginpattern
# File      : workerpoolbll.go
*/
package bll
 
import (
    "fmt"
    "godesginpattern/workerpool/config"
    "godesginpattern/workerpool/core"
    "godesginpattern/workerpool/tasks"
    "godesginpattern/workerpool/utils"
)
 
func WorkerPoolMain() {
    utils.InitLogger()
    logger := utils.Logger
 
    logger.Println("================================================")
    logger.Println("   珠宝企业级生产系统(Go 企业级 Worker Pool)")
    logger.Println("================================================")
 
    pool := core.NewWorkerPool(config.WorkerCount, config.QueueMaxSize)
    pool.Start()
 
    totalOrder := 10
    logger.Printf("开始提交 %d 个珠宝订单\n", totalOrder)
 
    for i := 1; i <= totalOrder; i++ {
        orderID := fmt.Sprintf("订单-%03d", i)
        for _, fn := range tasks.FullProcessTasks {
            pool.Submit(&core.Task{
                OrderID: orderID,
                Func:    fn,
            })
        }
    }
 
    logger.Println("✅ 所有订单已提交,按 Ctrl+C 优雅关闭")
    pool.Wait()
}
  


输出:

到此这篇关于go Worker Pool Pattern的实现的文章就介绍到这了,更多相关go Worker Pool Pattern内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • golang实现简单工厂、方法工厂、抽象工厂三种设计模式

    golang实现简单工厂、方法工厂、抽象工厂三种设计模式

    这篇文章介绍了golang实现简单工厂、方法工厂、抽象工厂三种设计模式的方法,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-04-04
  • 详解Go是如何优雅的进行内存管理

    详解Go是如何优雅的进行内存管理

    Go语言抛弃C/C++中的开发者管理内存的方式,实现了主动申请与主动释放管理,增加了逃逸分析和垃圾回收,将开发者从内存管理中释放出来,作为进阶的Go开发,了解掌握Go的内存管理还是很有必要的
    2023-09-09
  • Golang中Json的序列化和反序列化的使用

    Golang中Json的序列化和反序列化的使用

    本文主要介绍了Golang中Json的序列化和反序列化的使用,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-04-04
  • GO语言类型转换和类型断言实例分析

    GO语言类型转换和类型断言实例分析

    这篇文章主要介绍了GO语言类型转换和类型断言,以实例形式详细分析了类型转换和类型断言的概念与使用技巧,需要的朋友可以参考下
    2015-01-01
  • Go Java算法之二叉树的所有路径示例详解

    Go Java算法之二叉树的所有路径示例详解

    这篇文章主要为大家介绍了Go Java算法之二叉树的所有路径示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • Go REFLECT Library反射类型详解

    Go REFLECT Library反射类型详解

    这篇文章主要为大家介绍了Go REFLECT Library反射类型详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • Golang Gin embed static静态文件嵌入问题

    Golang Gin embed static静态文件嵌入问题

    在使用Gin框架开发Web服务时,需解决静态资源嵌入问题,本文探讨了将static目录嵌入到Gin生成的应用程序中,通过直接使用Gin自带的http.FileServer方法,简化了开发流程,最终实现了一键部署的目标
    2026-06-06
  • 浅析Golang中float64的精度问题

    浅析Golang中float64的精度问题

    这篇文章主要来和大家一起探讨一下Golang中关于float64的精度问题,文中的示例代码讲解详细,具有一定的学习价值,感兴趣的小伙伴可以了解下
    2023-08-08
  • GO语言实现简单TCP服务的方法

    GO语言实现简单TCP服务的方法

    这篇文章主要介绍了GO语言实现简单TCP服务的方法,实例分析了Go语言实现TCP服务的技巧,需要的朋友可以参考下
    2015-03-03
  • 超实用的Golang通道指南之轻松实现并发编程

    超实用的Golang通道指南之轻松实现并发编程

    Golang 中的通道是一种高效、安全、灵活的并发机制,用于在并发环境下实现数据的同步和传递。本文主要介绍了如何利用通道轻松实现并发编程,需要的可以参考一下
    2023-04-04

最新评论