golang的tunny的用法示例教程

 更新时间:2023年07月14日 13:59:58   作者:codecraft  
这篇文章主要为大家介绍了golang的tunny的用法示例教程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下tunny

Worker

type Worker interface {
    // Process will synchronously perform a job and return the result.
    Process(interface{}) interface{}
    // BlockUntilReady is called before each job is processed and must block the
    // calling goroutine until the Worker is ready to process the next job.
    BlockUntilReady()
    // Interrupt is called when a job is cancelled. The worker is responsible
    // for unblocking the Process implementation.
    Interrupt()
    // Terminate is called when a Worker is removed from the processing pool
    // and is responsible for cleaning up any held resources.
    Terminate()
}

 Worker接口定义了Process、BlockUntilReady、Interrupt、Terminate方法

closureWorker

type closureWorker struct {
    processor func(interface{}) interface{}
}
func (w *closureWorker) Process(payload interface{}) interface{} {
    return w.processor(payload)
}
func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}

 closureWorker定义了processor属性,它实现了Worker接口的Process、BlockUntilReady、Interrupt、Terminate方法,其中Process方法委托给processor

callbackWorker

type callbackWorker struct{}
func (w *callbackWorker) Process(payload interface{}) interface{} {
    f, ok := payload.(func())
    if !ok {
        return ErrJobNotFunc
    }
    f()
    return nil
}
func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt()       {}
func (w *callbackWorker) Terminate()       {}

 callbackWorker定义了processor属性,它实现了Worker接口的Process、BlockUntilReady、Interrupt、Terminate方法,其中Process方法执行的是payload函数

Pool

type Pool struct {
    queuedJobs int64
    ctor    func() Worker
    workers []*workerWrapper
    reqChan chan workRequest
    workerMut sync.Mutex
}
func New(n int, ctor func() Worker) *Pool {
    p := &Pool{
        ctor:    ctor,
        reqChan: make(chan workRequest),
    }
    p.SetSize(n)
    return p
}
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
    return New(n, func() Worker {
        return &closureWorker{
            processor: f,
        }
    })
}
func NewCallback(n int) *Pool {
    return New(n, func() Worker {
        return &callbackWorker{}
    })
}

 Pool定义了queuedJobs、ctor、workers、reqChan、workerMut属性;New方法根据n和ctor创建Pool;NewFunc方法根据n和f来创建closureWorker;NewCallback方法创建callbackWorker

Process

func (p *Pool) Process(payload interface{}) interface{} {
    atomic.AddInt64(&p.queuedJobs, 1)
    request, open := <-p.reqChan
    if !open {
        panic(ErrPoolNotRunning)
    }
    request.jobChan <- payload
    payload, open = <-request.retChan
    if !open {
        panic(ErrWorkerClosed)
    }
    atomic.AddInt64(&p.queuedJobs, -1)
    return payload
}

 Process方法首先递增queuedJobs,然后从reqChan读取request,然后往jobChan写入payload,之后再等待retChan,最后递减queuedJobs

SetSize

func (p *Pool) SetSize(n int) {
    p.workerMut.Lock()
    defer p.workerMut.Unlock()
    lWorkers := len(p.workers)
    if lWorkers == n {
        return
    }
    // Add extra workers if N > len(workers)
    for i := lWorkers; i < n; i++ {
        p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
    }
    // Asynchronously stop all workers > N
    for i := n; i < lWorkers; i++ {
        p.workers[i].stop()
    }
    // Synchronously wait for all workers > N to stop
    for i := n; i < lWorkers; i++ {
        p.workers[i].join()
    }
    // Remove stopped workers from slice
    p.workers = p.workers[:n]
}

 SetSize方法首先通过workerMut加锁,然后根据lWorkers创建newWorkerWrapper,之后执行worker.stop,再执行worker.join(),然后清空workers

Close

func (p *Pool) Close() {
    p.SetSize(0)
    close(p.reqChan)
}

 Close方法执行SetSize(0)及close(p.reqChan)

实例

func TestFuncJob(t *testing.T) {
    pool := NewFunc(10, func(in interface{}) interface{} {
        intVal := in.(int)
        return intVal * 2
    })
    defer pool.Close()
    for i := 0; i < 10; i++ {
        ret := pool.Process(10)
        if exp, act := 20, ret.(int); exp != act {
            t.Errorf("Wrong result: %v != %v", act, exp)
        }
    }
}

 TestFuncJob通过NewFunc创建pool,

小结

tunny的Worker接口定义了Process、BlockUntilReady、Interrupt、Terminate方法;NewFunc方法创建的是closureWorker,NewCallback方法创建的是callbackWorker。

doc

以上就是golang的tunny的详细内容,更多关于golang tunny的资料请关注脚本之家其它相关文章!

相关文章

  • 基于原生Go语言开发一个博客系统

    基于原生Go语言开发一个博客系统

    这篇文章主要为大家详细介绍了如何基于原生Go语言开发一个简单的博客系统,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2024-02-02
  • 深入探讨Go语言中的预防性接口为什么是不必要的

    深入探讨Go语言中的预防性接口为什么是不必要的

    在Go语言中,有一种从其他语言带来的常见模式:预防性接口,虽然这种模式在 Java 等语言中很有价值,但在Go中往往会成为反模式,本文我们就来深入探讨一下原因
    2025-01-01
  • Gorm更新零值问题解决思路与过程

    Gorm更新零值问题解决思路与过程

    这篇文章主要介绍了Gorm更新零值问题解决思路与过程,总的来说这并不是一道难题,那为什么要拿出这道题介绍?拿出这道题真正想要传达的是解题的思路,以及不断优化探寻最优解的过程。希望通过这道题能给你带来一种解题优化的思路
    2023-01-01
  • Go基础教程系列之import导入包(远程包)和变量初始化详解

    Go基础教程系列之import导入包(远程包)和变量初始化详解

    这篇文章主要介绍了Go基础教程系列之import导包和初始化详解,需要的朋友可以参考下
    2022-04-04
  • 深入了解Go的HttpClient超时机制

    深入了解Go的HttpClient超时机制

    在写 Go 的过程中经常对比这Java和GO语言的特性,踩了不少坑,也发现了不少有意思的地方,今天就来聊聊 Go 自带的 HttpClient 的超时机制
    2022-11-11
  • Go语言中常量和变量的定义、使用规范及常见应用场景

    Go语言中常量和变量的定义、使用规范及常见应用场景

    每一门语言都会有常量的定义,变量的定义,以及基于这些定义的运算,下面这篇文章主要给大家介绍了关于Go语言中常量和变量的定义、使用规范及常见应用场景的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-06-06
  • 对Golang import 导入包语法详解

    对Golang import 导入包语法详解

    今天小编就为大家分享一篇对Golang import 导入包语法详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-06-06
  • Golang中的Unicode与字符串示例详解

    Golang中的Unicode与字符串示例详解

    这篇文章主要给大家介绍了关于Golang中Unicode与字符串的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Golang具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2020-05-05
  • Golang import本地包和导入问题相关详解

    Golang import本地包和导入问题相关详解

    这篇文章主要介绍了Golang import本地包和导入问题相关详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-02-02
  • Go获取两个时间点时间差的具体实现

    Go获取两个时间点时间差的具体实现

    本文主要介绍了Go获取两个时间点时间差的具体实现,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-04-04

最新评论