Golang中Retry重试实践

 更新时间:2026年02月26日 09:04:47   作者:X_PENG  
本文主要介绍了Golang中Retry重试实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

使用github.com/kamilsk/retry/v5包,核心API:

// Do takes the action and performs it, repetitively, until successful.
//
// Optionally, strategies may be passed that assess whether or not an attempt
// should be made.
func Do(
    breaker Breaker,
    action func(context.Context) error,
    strategies ...func(Breaker, uint, error) bool,
) error {
    var (
       ctx        = convert(breaker)
       err  error = internal
       core error
    )

    for attempt, should := uint(0), true; should; attempt++ {
       core = unwrap(err)
       for i, repeat := 0, len(strategies); should && i < repeat; i++ {
          should = should && strategies[i](breaker, attempt, core)
       }

       select {
       case <-breaker.Done():// 若有中断信号,则直接返回
          return breaker.Err()
       default:// 没有中断信号,则根据策略决定是否执行
          if should {
             err = action(ctx)
          }
       }

       should = should && err != nil
    }

    return err
}

// Action defines a callable function that package retry can handle.
type Action = func(context.Context) error

// 中断器带有「中断信号」
// A Breaker carries a cancellation signal to interrupt an action execution.
//
// It is a subset of the built-in context and github.com/kamilsk/breaker interfaces.
type Breaker = interface {
    // Done returns a channel that's closed when a cancellation signal occurred.
    Done() <-chan struct{}
    // If Done is not yet closed, Err returns nil.
    // If Done is closed, Err returns a non-nil error.
    // After Err returns a non-nil error, successive calls to Err return the same error.
    Err() error
}

由两个维度控制执行行为:一个是「Breaker中断器」,一个是「strategies策略」。

中断器

具体有哪些中断器见github.com/kamilsk/breaker(也可自定义中断器):

// Interface carries a cancellation signal to interrupt an action execution.
//
// Example based on github.com/kamilsk/retry/v5 module:
//
//  if err := retry.Do(breaker.BreakByTimeout(time.Minute), action); err != nil {
//      log.Fatal(err)
//  }
//
// Example based on github.com/kamilsk/semaphore/v5 module:
//
//  if err := semaphore.Acquire(breaker.BreakByTimeout(time.Minute), 5); err != nil {
//      log.Fatal(err)
//  }
//
type Interface interface {
    // Close closes the Done channel and releases resources associated with it.
    Close()
    // Done returns a channel that's closed when a cancellation signal occurred.
    Done() <-chan struct{}
    // If Done is not yet closed, Err returns nil.
    // If Done is closed, Err returns a non-nil error.
    // After Err returns a non-nil error, successive calls to Err return the same error.
    Err() error

    // trigger is a private method to guarantee that the breakers come from
    // this package and all of them return a valid Done channel.
    trigger() Interface
}


// 基于信号的中断器
func BreakByChannel(signal <-chan struct{}) Interface {
    return (&channelBreaker{newBreaker(), signal}).trigger()
}

type channelBreaker struct {
    *breaker
    relay <-chan struct{}
}

// Close closes the Done channel and releases resources associated with it.
func (br *channelBreaker) Close() {
    br.closer.Do(func() {
       close(br.signal)
    })
}

// trigger starts listening to the internal signal to close the Done channel.
func (br *channelBreaker) trigger() Interface {
    go func() {
       select {
       case <-br.relay:
       case <-br.signal:
       }
       br.Close()

       // the goroutine is done
       atomic.StoreInt32(&br.released, 1)
    }()
    return br
}


// 基于超时时间的中断器
func BreakByTimeout(timeout time.Duration) Interface {
	if timeout < 0 {
		return closedBreaker()
	}
	return newTimeoutBreaker(timeout).trigger()
}

type timeoutBreaker struct {
	*breaker
	*time.Timer
}

// Close closes the Done channel and releases resources associated with it.
func (br *timeoutBreaker) Close() {
	br.closer.Do(func() {
		stop(br.Timer)
		close(br.signal)
	})
}

// trigger starts listening to the internal timer to close the Done channel.
func (br *timeoutBreaker) trigger() Interface {
	go func() {
		select {
		case <-br.Timer.C:
		case <-br.signal:
		}
		br.Close()

		// the goroutine is done
		atomic.StoreInt32(&br.released, 1)
	}()
	return br
}

func stop(timer *time.Timer) {
	if !timer.Stop() {
		select {
		case <-timer.C:
		default:
		}
	}
}

func newBreaker() *breaker {
    return &breaker{signal: make(chan struct{})}
}

type breaker struct {
    closer   sync.Once
    signal   chan struct{}
    released int32
}

// Close closes the Done channel and releases resources associated with it.
func (br *breaker) Close() {
    br.closer.Do(func() {
       close(br.signal)
       atomic.StoreInt32(&br.released, 1)
    })
}

// Done returns a channel that's closed when a cancellation signal occurred.
func (br *breaker) Done() <-chan struct{} {
    return br.signal
}

// Err returns a non-nil error if the Done channel is closed and nil otherwise.
// After Err returns a non-nil error, successive calls to Err return the same error.
func (br *breaker) Err() error {
    if atomic.LoadInt32(&br.released) == 1 {
       return Interrupted
    }
    return nil
}

// IsReleased returns true if resources associated with the breaker were released.
func (br *breaker) IsReleased() bool {
    return atomic.LoadInt32(&br.released) == 1
}

func (br *breaker) trigger() Interface {
    return br
}

组合多个中断器:

func Multiplex(breakers ...Interface) Interface {
	if len(breakers) == 0 {
		return closedBreaker()
	}
	for len(breakers) < 3 {
		breakers = append(breakers, stub{})
	}
	return newMultiplexedBreaker(breakers).trigger()
}

func newMultiplexedBreaker(breakers []Interface) *multiplexedBreaker {
	return &multiplexedBreaker{newBreaker(), breakers}
}

type multiplexedBreaker struct {
	*breaker
	breakers []Interface
}

// Close closes the Done channel and releases resources associated with it.
func (br *multiplexedBreaker) Close() {
	br.closer.Do(func() {
		each(br.breakers).Close()
		close(br.signal)
	})
}

// trigger starts listening to the all Done channels of multiplexed breakers.
func (br *multiplexedBreaker) trigger() Interface {
	go func() {
		if len(br.breakers) == 3 {
			select {
			case <-br.breakers[0].Done():
			case <-br.breakers[1].Done():
			case <-br.breakers[2].Done():
			}
		} else {
			brs := make([]reflect.SelectCase, 0, len(br.breakers))
			for _, br := range br.breakers {
				brs = append(brs, reflect.SelectCase{
					Dir:  reflect.SelectRecv,
					Chan: reflect.ValueOf(br.Done()),
				})
			}
			reflect.Select(brs)
		}
		br.Close()

		// the goroutine is done
		atomic.StoreInt32(&br.released, 1)
	}()
	return br
}

策略

具体策略可见github.com/kamilsk/retry/v5/strategy包(也可自定义策略):

type Strategy = func(breaker Breaker, attempt uint, err error) bool

// 重试次数
// Limit creates a Strategy that limits the number of attempts
// that Retry will make.
func Limit(value uint) Strategy {
    return func(_ Breaker, attempt uint, _ error) bool {
       return attempt < value
    }
}

// 延迟执行
// Delay creates a Strategy that waits the given duration
// before the first attempt is made.
func Delay(duration time.Duration) Strategy {
    return func(breaker Breaker, attempt uint, _ error) bool {
       keep := true
       if attempt == 0 {
          timer := time.NewTimer(duration)
          select {
          case <-timer.C:
          case <-breaker.Done():
             keep = false
          }
          stop(timer)
       }
       return keep
    }
}

// 延迟退避
// Backoff creates a Strategy that waits before each attempt, with a duration as
// defined by the given backoff.Algorithm.
func Backoff(algorithm func(attempt uint) time.Duration) Strategy {
    return BackoffWithJitter(algorithm, func(duration time.Duration) time.Duration {
       return duration
    })
}

// BackoffWithJitter creates a Strategy that waits before each attempt, with a
// duration as defined by the given backoff.Algorithm and jitter.Transformation.
func BackoffWithJitter(
    algorithm func(attempt uint) time.Duration,
    transformation func(duration time.Duration) time.Duration,
) Strategy {
    return func(breaker Breaker, attempt uint, _ error) bool {
       keep := true
       if attempt > 0 {
          timer := time.NewTimer(transformation(algorithm(attempt)))
          select {
          case <-timer.C:
          case <-breaker.Done():
             keep = false
          }
          stop(timer)
       }
       return keep
    }
}

到此这篇关于Golang中Retry重试实践的文章就介绍到这了,更多相关Golang Retry重试内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • go mod 安装依赖 unkown revision问题的解决方案

    go mod 安装依赖 unkown revision问题的解决方案

    这篇文章主要介绍了go mod 安装依赖 unkown revision问题的解决方案,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-05-05
  • Golang中实现数据脱敏处理的go-mask包分享

    Golang中实现数据脱敏处理的go-mask包分享

    这篇文章主要是来和大家分享一个在输出中对敏感数据进行脱敏的工作包:go-mask,可以将敏感信息输出的时候替换成星号或其他字符,感兴趣的小编可以跟随小编一起了解下
    2023-05-05
  • golang开发及数字证书研究分享

    golang开发及数字证书研究分享

    这篇文章主要为大家介绍了golang开发以及数字证书的研究示例分享,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步
    2021-11-11
  • Golang time.Sleep()用法及示例讲解

    Golang time.Sleep()用法及示例讲解

    Go语言中的Sleep()函数用于在至少规定的持续时间d内停止最新的go-routine,这篇文章主要介绍了Golang time.Sleep()用法及示例讲解,需要的朋友可以参考下
    2023-02-02
  • Go语言中通过Lua脚本操作Redis的方法

    Go语言中通过Lua脚本操作Redis的方法

    这篇文章主要给大家介绍了关于Go语言中通过Lua脚本操作Redis的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考借鉴,下面随着小编来一起学习学习吧。
    2018-01-01
  • 关于Golang变量初始化/类型推断/短声明的问题

    关于Golang变量初始化/类型推断/短声明的问题

    这篇文章主要介绍了关于Golang变量初始化/类型推断/短声明的问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-02-02
  • uber go zap 日志框架支持异步日志输出

    uber go zap 日志框架支持异步日志输出

    这篇文章主要为大家介绍了uber go zap 日志框架支持异步日志输出示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • Go Web编程添加服务器错误和访问日志

    Go Web编程添加服务器错误和访问日志

    这篇文章主要为大家介绍了Go Web编程添加服务器错误日志和访问日志的示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-06-06
  • Go语言sync锁与对象池的实现

    Go语言sync锁与对象池的实现

    本文介绍了Go语言标准库sync包提供的并发控制工具,主要包括互斥锁(sync.Mutex)和读写锁(sync.RWMutex)两类同步机制,下面就来具体介绍一下这两个的使用,感兴趣的可以了解一下
    2025-08-08
  • Go语言操作金仓数据库之SQL执行,类型映射与超时控制详解

    Go语言操作金仓数据库之SQL执行,类型映射与超时控制详解

    本文详细介绍了使用Go语言操作金仓数据库的关键技术点,主要包括 SQL执行,通过Query方法执行查询并处理结果集,使用Exec执行非查询操作,以及事务处理多条SQL语句的方法,有需要的小伙伴可以了解下
    2026-05-05

最新评论