golang实现延迟队列(delay queue)的两种实现

 更新时间:2025年05月25日 14:28:45   作者:NPE~  
本文主要介绍了golang实现延迟队列(delay queue)的两种实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1 延迟队列:邮件提醒、订单自动取消

延迟队列:处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有达到指定的时间点时才能从队列中取出并执行。
应用场景:

  • 邮件提醒
  • 订单自动取消(超过多少时间未支付,就取消订单)
  • 对超时任务的处理等

由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。

2 实现

2.1 simple简单版:go自带的time包实现

思路:

定义Task结构体,包含

  • ExecuteTime time.Time
  • Job func()

定义DelayQueue

  • TaskQueue []Task
  • func AddTask
  • func RemoveTask
  • ExecuteTask

这种方案存在的问题:

Go程序重启时,存储在slice中的延迟处理任务将全部丢失

完整代码:

package main

import (
	"fmt"
	"time"
)

/*
基于go实现延迟队列
*/
type Task struct {
	ExecuteTime time.Time
	Job         func()
}

type DelayQueue struct {
	Tasks []*Task
}

func (d *DelayQueue) AddTask(t *Task) {
	d.Tasks = append(d.Tasks, t)
}

func (d *DelayQueue) RemoveTask() {
	//FIFO: remove the first task to enqueue
	d.Tasks = d.Tasks[1:]
}

func (d *DelayQueue) ExecuteTask() {
	for len(d.Tasks) > 0 {
		//dequeue a task
		currentTask := d.Tasks[0]
		if time.Now().Before(currentTask.ExecuteTime) {
			//if the task execution time is not up, wait
			time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))
		}
		//execute the task
		currentTask.Job()
		//remove task who has been executed
		d.RemoveTask()
	}

}

func main() {
	fmt.Println("start delayQueue")
	delayQueue := &DelayQueue{}
	firstTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 1),
		Job: func() {
			fmt.Println("executed task 1 after delay")
		},
	}
	delayQueue.AddTask(firstTask)
	secondTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 7),
		Job: func() {
			fmt.Println("executed task 2 after delay")
		},
	}
	delayQueue.AddTask(secondTask)
	delayQueue.ExecuteTask()
	fmt.Println("all tasks have been done!!!")
}

效果:

在这里插入图片描述

2.2 complex持久版:go+redis

为了防止Go重启后存储到delayQueue的数据丢失,我们可以将任务持久化到redis中。

思路:

初始化redis连接

延迟队列采用redis的zset(有序集合)实现

前置准备:

# 安装docker
yum install -y yum-utils
yum-config-manager \
    --add-repo \
    https://download.docker.com/linux/centos/docker-ce.repo
yum install docker
systemctl start docker

# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis

完整代码:

package main

import (
	"fmt"
	"github.com/go-redis/redis"
	log "github.com/ziyifast/log"
	"time"
)

/*
基于redis zset实现延迟队列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"

func initClient() (err error) {
	redisdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // not set password
		DB:       0,  //use default db
	})
	_, err = redisdb.Ping().Result()
	if err != nil {
		log.Errorf("%v", err)
		return err
	}
	return nil
}

func main() {
	err := initClient()
	if err != nil {
		log.Errorf("init redis client err: %v", err)
		return
	}
	addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())
	addTaskToQueue("task2", time.Now().Add(time.Second*8).Unix())
	//执行队列中的任务
	getAndExecuteTask()
}

// executeTime为unix时间戳,作为zset中的score。允许redis按照task应该执行时间来进行排序
func addTaskToQueue(task string, executeTime int64) {
	err := redisdb.ZAdd(DelayQueueKey, redis.Z{
		Score:  float64(executeTime),
		Member: task,
	}).Err()
	if err != nil {
		panic(err)
	}
}

// 从redis中取一个task并执行
func getAndExecuteTask() {
	for {
		tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{
			Min:    "-inf",
			Max:    fmt.Sprintf("%d", time.Now().Unix()),
			Offset: 0,
			Count:  1,
		}).Result()
		if err != nil {
			time.Sleep(time.Second * 1)
			continue
		}
		//处理任务
		for _, task := range tasks {
			fmt.Println("Execute task: ", task)
			//执行完任务之后用 ZREM 移除该任务
			redisdb.ZRem(DelayQueueKey, task)
		}
		time.Sleep(time.Second * 1)
	}
}

效果:

redis一直从延迟队列中取数据,如果处理完一批则睡眠1s

  • 具体根据大家的业务调整,此处主要介绍思路

在这里插入图片描述

到此这篇关于golang实现延迟队列(delay queue)的示例代码的文章就介绍到这了,更多相关golang 延迟队列(delay queue)内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

相关文章

  • Golang并发控制之errgroup使用详解

    Golang并发控制之errgroup使用详解

    errgroup 是 Go 官方库 x 中提供的一个非常实用的工具,用于并发执行多个 goroutine,并且方便的处理错误,下面就跟随小编一起来了解下的它的具体使用吧
    2024-11-11
  • Golang实现密码加密的示例详解

    Golang实现密码加密的示例详解

    数据库在存储密码时,不能明文存储,需要加密后存储,而Golang中的加密算法有很多种,下面小编就来通过简单的示例和大家简单聊聊吧
    2023-07-07
  • golang使用sort接口实现排序示例

    golang使用sort接口实现排序示例

    这篇文章主要介绍了golang使用sort接口实现排序的方法,简单分析了sort接口的功能并实例演示了基于sort接口的排序实现方法,需要的朋友可以参考下
    2016-07-07
  • Go语言中的速率限流策略全面详解

    Go语言中的速率限流策略全面详解

    这篇文章主要为大家介绍了Go语言中的速率限流策略全面详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-11-11
  • golang下的viper包的简单使用方式

    golang下的viper包的简单使用方式

    这篇文章主要介绍了golang下的viper包的简单使用方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06
  • go mutex互斥锁使用Lock和Unlock方法占有释放资源

    go mutex互斥锁使用Lock和Unlock方法占有释放资源

    Go号称是为了高并发而生的,在高并发场景下,势必会涉及到对公共资源的竞争,当对应场景发生时,我们经常会使用 mutex 的 Lock() 和 Unlock() 方法来占有或释放资源,虽然调用简单,但 mutex 的内部却涉及挺多的,本文来好好研究一下
    2023-09-09
  • 浅析Go语言中的Range关键字

    浅析Go语言中的Range关键字

    Range是go语言中很独特的一个关键词,也相当好用。下面就跟着小编来再聊聊这个Range关键字,有需要的朋友们可以参考借鉴。
    2016-09-09
  • go MethodByName()不能获取私有方法的解决

    go MethodByName()不能获取私有方法的解决

    本文主要介绍了go MethodByName()不能获取私有方法的解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-02-02
  • Go语言中内存管理逃逸分析详解

    Go语言中内存管理逃逸分析详解

    所谓的逃逸分析(Escape analysis)是指由编译器决定内存分配的位置吗不需要程序员指定。本文就来和大家简单分析一下Go语言中内存管理逃逸吧
    2023-03-03
  • Sublime Text3安装Go语言相关插件gosublime时搜不到gosublime的解决方法

    Sublime Text3安装Go语言相关插件gosublime时搜不到gosublime的解决方法

    本文主要介绍了Sublime Text3安装Go语言相关插件gosublime时搜不到gosublime的解决方法,具有一定的参考价值,感兴趣的可以了解一下
    2022-01-01

最新评论