Golang分布式应用定时任务示例详解

 更新时间:2022年07月29日 17:10:26   作者:qingwave  
这篇文章主要为大家介绍了Golang分布式应用定时任务示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

正文

在系统开发中,有一类任务不是立即执行,而是在未来某个时间点或者按照一定间隔去执行,比如日志定期压缩、报表制作、过期数据清理等,这就是定时任务。

在单机中,定时任务通常需要实现一个类似crontab的系统,一般有两种方式:

  • 最小堆,按照任务执行时间建堆,每次取最近的任务执行
  • 时间轮,将任务放到时间轮列表中,每次转动取对应的任务列表执行

最小堆

最小堆是一种特殊的完全二叉树,任意非叶子节点的值不大于其子节点,如图

通过最小堆,根据任务最近执行时间键堆,每次取堆顶元素即最近需要执行的任务,设置timer定时器,到期后触发任务执行。由于堆的特性每次调整的时间复杂度为O(lgN),相较于普通队列性能更快。

container/heap中已经实现操作堆的相关函数,我们只需要实现定期任务核心逻辑即可。

// 运行
func (c *Cron) Run() error {
    // 设置cron已启动,atomic.Bool来保证并发安全
	c.started.Store(true)
    // 主循环
	for {
        // 如果停止则退出
		if !c.started.Load() {
			break
		}
		c.runTask()
	}
	return nil
}
// 核心逻辑
func (c *Cron) runTask() {
	now := time.Now()
	duration := infTime
	// 获取堆顶元素
	task, ok := c.tasks.Peek()
	if ok {
		// 如果已删除则弹出
		if !c.set.Has(task.Name()) {
			c.tasks.Pop()
			return
		}
		// 计算于当前时间查找,设置定时器
		if task.next.After(now) {
			duration = task.next.Sub(now)
		} else {
			duration = 0
		}
	}
	timer := time.NewTimer(duration)
	defer timer.Stop()
	// 当有新元素插入直接返回,防止新元素执行时间小于当前堆顶元素
	select {
	case <-c.new:
		return
	case <-timer.C:
	}
	// 弹出任务,执行
	go task.Exec()
	// 计算下次执行时间,如果为0说明任务已结束,否则重新入堆
	task.next = task.Next(time.Now())
	if task.next.IsZero() {
		c.set.Delete(task.Name())
	} else {
		c.tasks.Push(task)
	}
}

主要逻辑可总结为:

  • 将任务按照下次执行时间建最小堆
  • 每次取堆顶任务,设置定时器
  • 如果中间有新加入任务,转入步骤2
  • 定时器到期后执行任务
  • 再次取下个任务,转入步骤2,依次执行

时间轮

另一种实现Cron的方式是时间轮,时间轮通过一个环形队列,每个插槽放入需要到期执行的任务,按照固定间隔转动时间轮,取插槽中任务列表执行,如图所示:

时间轮可看作一个表盘,如图中时间间隔为1秒,总共60个格子,如果任务在3秒后执行则放为插槽3,每秒转动次取插槽上所有任务执行。

如果执行时间超过最大插槽,比如有个任务需要63秒后执行(超过了最大格子刻度),一般可以通过多层时间轮,或者设置一个额外变量圈数,只执行圈数为0的任务。

时间轮插入的时间复杂度为O(1),获取任务列表复杂度为O(1),执行列表最差为O(n)。对比最小堆,时间轮插入删除元素更快。

核心代码如下:

// 定义
type TimeWheel struct {
	interval    time.Duration // 触发间隔
	slots       int // 总插槽数
	currentSlot int // 当前插槽数
	tasks       []*list.List // 环形列表,每个元素为对应插槽的任务列表
	set         containerx.Set[string] // 记录所有任务key值,用来检查任务是否被删除
	tricker *time.Ticker // 定时触发器
	logger logr.Logger
}
func (tw *TimeWheel) Run() error {
	tw.tricker = time.NewTicker(tw.interval)
	for {
		// 通过定时器模拟时间轮转动
		now, ok := <-tw.tricker.C
		if !ok {
			break
		}
		// 转动一次,执行任务列表
		tw.RunTask(now, tw.currentSlot)
		tw.currentSlot = (tw.currentSlot + 1) % tw.slots
	}
	return nil
}
func (tw *TimeWheel) RunTask(now time.Time, slot int) {
	// 一次执行任务列表
	for item := taskList.Front(); item != nil; {
		task, ok := item.Value.(*TimeWheelTask)
		// 任务圈数大于0,不需要执行,将圈数减一
		if task.circle > 0 {
			task.circle--
			item = item.Next()
			continue
		}
		// 运行任务
		go task.Exec()
		// 计算任务下次运行时间
		next := item.Next()
		taskList.Remove(item)
		item = next
		task.next = task.Next(now)
		if !task.next.IsZero() {
			tw.add(now, task)
		} else {
			tw.Remove(task.Name())
		}
	}
}
// 添加任务,计算下一次任务执行的插槽与圈数
func (tw *TimeWheel) add(now time.Time, task *TimeWheelTask) {
	if !task.initialized {
		task.next = task.Next(now)
		task.initialized = true
	}
	duration := task.next.Sub(now)
	if duration <= 0 {
		task.slot = tw.currentSlot + 1
		task.circle = 0
	} else {
		mult := int(duration / tw.interval)
		task.slot = (tw.currentSlot + mult) % tw.slots
		task.circle = mult / tw.slots
	}
	tw.tasks[task.slot].PushBack(task)
	tw.set.Insert(task.Name())
}

时间轮的主要逻辑如下:

  • 将任务存在对应插槽的时间
  • 通过定时间模拟时间轮转动
  • 每次到期后遍历当前插槽的任务列表,若任务圈数为0则执行
  • 如果任务未结束,计算下次执行的插槽与圈数
  • 转入步骤2,依次执行

总结

本文主要总结了定时任务的两种实现方式,最小堆与时间轮,并分析其核心实现逻辑。

对于执行分布式定时任务,可以借助延时消息队列或者直接使用Kubernetes的CronJob。

自己开发的话可以借助Etcd:

  • 中心节点Coordinator将任务按照一定算法(Hash、轮询、或者更复杂的分配算法)将任务与工作节点Worker绑定
  • 每个Worker添加到有绑定到自己的任务则取出放到本地的Cron中
  • 如果Worker挂掉,执行将其上任务重新绑定即可

本文所有代码见github.com/qingwave/go…

以上就是Golang分布式应用定时任务示例详解的详细内容,更多关于Golang分布式定时的资料请关注脚本之家其它相关文章!

相关文章

  • 关于golang 字符串 int uint int64 uint64 互转问题

    关于golang 字符串 int uint int64 uint64&

    这篇文章主要介绍了golang 字符串 int uint int64 uint64 互转,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-01-01
  • VSCode Golang dlv调试数据截断问题及处理方法

    VSCode Golang dlv调试数据截断问题及处理方法

    这篇文章主要介绍了VSCode Golang dlv调试数据截断问题,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-06-06
  • 深入理解go slice结构

    深入理解go slice结构

    这篇文章主要介绍了go slice结构,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2021-09-09
  • 如何解析golang中Context在HTTP服务中的角色

    如何解析golang中Context在HTTP服务中的角色

    这篇文章主要介绍了如何解析golang中Context在HTTP服务中的角色问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-03-03
  • Go常问的一些面试题汇总(附答案)

    Go常问的一些面试题汇总(附答案)

    通常我们去面试肯定会有些不错的Golang的面试题目的,所以总结下,让其他Golang开发者也可以查看到,同时也用来检测自己的能力和提醒自己的不足之处,这篇文章主要给大家介绍了关于Go常问的一些面试题以及答案的相关资料,需要的朋友可以参考下
    2023-10-10
  • 深入理解golang中io.Writer接口的使用

    深入理解golang中io.Writer接口的使用

    io 是一个 Golang 标准库包,它为围绕输入和输出的许多操作和用例定义了灵活的接口,这篇文章主要为大家介绍了Go中Writer接口的使用,需要的可以参考下
    2023-10-10
  • Go语言字符串基础示例详解

    Go语言字符串基础示例详解

    这篇文章主要为大家介绍了Go语言字符串基础的示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2021-11-11
  • 使用Viper处理Go应用程序的配置方法

    使用Viper处理Go应用程序的配置方法

    Viper是一个应用程序配置解决方案,用于Go应用程序,它支持JSON、TOML、YAML、HCL、envfile和Java properties配置文件格式,这篇文章主要介绍了使用Viper处理Go应用程序的配置,需要的朋友可以参考下
    2023-09-09
  • Go语言Gin处理响应方式详解

    Go语言Gin处理响应方式详解

    gin框架封装了常用的数据格式方法响应于客户端,下面这篇文章主要给大家介绍了关于Go语言Gin处理响应方式的相关资料,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2023-01-01
  • Go语言中的函数式编程实践

    Go语言中的函数式编程实践

    这篇文章主要介绍了Go语言中的函数式编程实践,主要讲解Go语言中的函数式编程概念和使用。小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-05-05

最新评论