Golang微服务框架Kratos实现分布式任务队列Asynq的方法详解

 更新时间:2023年09月02日 08:36:57   作者:喵个咪  
任务队列(Task Queue) 一般用于跨线程或跨计算机分配工作的一种机制,在Golang语言里面,我们有像Asynq和Machinery这样的类似于Celery的分布式任务队列,本文就给大家详细介绍一下Golang微服务框架Kratos实现分布式任务队列Asynq的方法,需要的朋友可以参考下

Golang微服务框架Kratos实现分布式任务队列Asynq

任务队列(Task Queue) 一般用于跨线程或跨计算机分配工作的一种机制。其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。

任务队列的输入是称为 任务(Task) 的工作单元。专用的工作进程不断监视任务队列以查找要执行的新工作。

在Golang语言里面,我们有像AsynqMachinery这样的类似于 Celery 的分布式任务队列。

什么是任务队列

消息队列(Message Queue),一般来说知道的人不少。比如常见的:kafka、Rabbitmq、RocketMQ等。

任务队列(Task Queue),听说过这个概念的人不会太多,清楚它的概念的人怕是更少。

这两个概念是有关系的,他们是怎样的关系呢?任务队列(Task Queue)是消息队列(Message Queue)的超集。任务队列是构建在消息队列之上的。消息队列是任务队列的一部分。

提起分布式任务队列(Distributed Task Queue),就不得不提 Python Celery。故而,下面我们来看Celery的架构图,以此来讲解。其他的任务队列也并不会与之有太大的差异性,基础的原理是一致的。

celery_framework.png

Celery 的架构中,由多台 Server 发起 异步任务(Async Task) ,发送任务到 Broker 的队列中,其中的 Celery Beat 进程可负责发起定时任务。当 Task 到达 Broker 后,会将其分发给相应的 Celery Worker 进行处理。当 Task 处理完成后,其结果存储至 Backend

在上述过程中的 Broker Backend Celery 并没有去实现,而是使用了已有的开源实现,例如 RabbitMQ 作为 Broker 提供消息队列服务, Redis 作为 Backend 提供结果存储服务。Celery 就像是抽象了消息队列架构中 Producer Consumer 的实现,将消息队列中基本单位 “消息” 抽象成了任务队列中的“任务”,并将异步、定时任务的发起和结果存储等操作进行了封装,让开发者可以忽略 AMQP、RabbitMQ 等实现细节,为开发带来便利。

综上所述,Celery 作为任务队列是基于消息队列的进一步封装,其实现依赖消息队列。

任务队列的应用场景

我们现在知道了任务队列是什么,也知道了它的工作原理。但是,我们并不知道它可以用来做什么。下面,我们就来看看,它到底用在什么样的场景下。

  1. 分布式任务:可以将任务分发到多个工作者进程或机器上执行,以提高任务处理速度。
  2. 定时任务:可以在指定时间执行任务。例如:每天定时备份数据、日志归档、心跳测试、运维巡检。支持 crontab 定时模式
  3. 后台任务:可以在后台执行耗时任务,例如图像处理、数据分析等,不影响用户界面的响应。
  4. 解耦任务:可以将任务与主程序解耦,以提高代码的可读性和可维护性,解耦应用程序最直接的好处就是可扩展性和并发性能的提高。支持并发执行任务,同时支持自动动态扩展。
  5. 实时处理:可以支持实时处理任务,例如即时通讯、消息队列等。

Asynq概述

Asynq是一个使用Go语言实现的分布式任务队列和异步处理库,它由Redis提供支持,它提供了轻量级的、易于使用的API,并且具有高可扩展性和高可定制化性。其作者Ken Hibino,任职于Google。

Asynq主要由以下几个组件组成:

  • 任务(Task):需要被异步执行的操作;
  • 处理器(Processor):负责执行任务的工作进程;
  • 队列(Queue):存放待执行任务的队列;
  • 调度器(Scheduler):根据规则将任务分配给不同的处理器进行执行。

asynq_framework.png

通过使用Asynq,我们可以非常轻松的实现异步任务处理,同时还可以提供高效率、高可扩展性和高自定义性的处理方案。

Asynq的特点

  • 保证至少执行一次任务
  • 任务写入Redis后可以持久化
  • 任务失败之后,会自动重试
  • worker崩溃自动恢复
  • 可是实现任务的优先级
  • 任务可以进行编排
  • 任务可以设定执行时间或者最长可执行的时间
  • 支持中间件
  • 可以使用 unique-option 来避免任务重复执行,实现唯一性
  • 支持 Redis Cluster 和 Redis Sentinels 以达成高可用性
  • 作者提供了Web UI & CLI Tool让大家查看任务的执行情况

Asynq可视化监控

Asynq提供了两种监控手段:CLI和Web UI。

命令行工具CLI

go install github.com/hibiken/asynq/tools/asynq@latest

Web UI

Asynqmon是一个基于Web的工具,用于监视管理Asynq的任务和队列,有关详细的信息可以参阅工具的README。

Web UI我们可以通过Docker的方式来进行安装:

docker pull hibiken/asynqmon:latest
docker run -d \
    --name asynq \
    -p 8080:8080 \
    hibiken/asynqmon:latest --redis-addr=host.docker.internal:6379

安装好Web UI之后,我们就可以打开浏览器访问管理后台了:http://localhost:8080

  • 仪表盘

asynq_web_ui_dashboard.png

  • 任务视图

asynq_web_ui_task_view.png

  • 性能

asynq_web_ui_metrics.png

Kratos下实现分布式任务队列

我们将分布式任务队列以 transport.Server 的形式整合进微服务框架 Kratos

目前,go里面有两个分布式任务队列可用:

我已经对这两个库进行了支持:

创建Kratos服务端

因为它依赖Redis,因此,我们可以使用Docker的方式安装Redis的服务器:

docker pull bitnami/redis:latest
docker run -itd \
    --name redis-test \
    -p 6379:6379 \
    -e ALLOW_EMPTY_PASSWORD=yes \
    bitnami/redis:latest

然后,我们需要在项目中安装Asynq的依赖库:

go get -u github.com/tx7do/kratos-transport/transport/asynq

接着,我们在代码当中引入库,并且创建出来 Server

import github.com/tx7do/kratos-transport/transport/asynq
const (
	localRedisAddr = "127.0.0.1:6379"
)
ctx := context.Background()
srv := asynq.NewServer(
    asynq.WithAddress(localRedisAddr),
)
if err := srv.Start(ctx); err != nil {
    panic(err)
}
defer srv.Stop(ctx)

注册任务回调

const (
	testTask1        = "test_task_1"
	testDelayTask    = "test_delay_task"
	testPeriodicTask = "test_periodic_task"
)
type DelayTask struct {
	Message string `json:"message"`
}
func DelayTaskBinder() Any { return &DelayTask{} }
func handleTask1(taskType string, taskData *DelayTask) error {
	LogInfof("Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
	return nil
}
func handleDelayTask(taskType string, taskData *DelayTask) error {
	LogInfof("Delay Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
	return nil
}
func handlePeriodicTask(taskType string, taskData *DelayTask) error {
	LogInfof("Periodic Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
	return nil
}
var err error
err = srv.RegisterSubscriber(testTask1,
    func(taskType string, payload MessagePayload) error {
        switch t := payload.(type) {
        case *DelayTask:
            return handleTask1(taskType, t)
        default:
            LogError("invalid payload struct type:", t)
            return errors.New("invalid payload struct type")
        }
    },
    DelayTaskBinder,
)
err = srv.RegisterSubscriber(testDelayTask,
    func(taskType string, payload MessagePayload) error {
        switch t := payload.(type) {
        case *DelayTask:
            return handleDelayTask(taskType, t)
        default:
            LogError("invalid payload struct type:", t)
            return errors.New("invalid payload struct type")
        }
    },
    DelayTaskBinder,
)
err = srv.RegisterSubscriber(testPeriodicTask,
    func(taskType string, payload MessagePayload) error {
        switch t := payload.(type) {
        case *DelayTask:
            return handlePeriodicTask(taskType, t)
        default:
            LogError("invalid payload struct type:", t)
            return errors.New("invalid payload struct type")
        }
    },
    DelayTaskBinder,
)

此步骤,相当于是异步队列中订阅了某一类型任务。最终它由 asynq.Server 来执行。

创建新任务

新建任务,有两个方法: NewTask NewPeriodicTask ,内部分别对应着 asynq.Client asynq.Scheduler

NewTask 是通过 asynq.Client 将任务直接入了队列。

普通任务

普通任务通常是入列后立即执行的(如果不需要排队的),下面就是最简单的任务,一个类型(Type),一个负载数据(Payload)就构成了一个最简单的任务:

err = srv.NewTask(testTask1, 
    &DelayTask{Message: "delay task"},
)

当然,你也可以添加一些的参数,比如重试次数、超时时间、过期时间等……

// 最多重试3次,10秒超时,20秒后过期
err = srv.NewTask(testTask1, 
    &DelayTask{Message: "delay task"},
    asynq.MaxRetry(10),
    asynq.Timeout(10*time.Second),
    asynq.Deadline(time.Now().Add(20*time.Second)),
)

延迟任务(Delay Task)

延迟任务,顾名思义,也就是推迟到指定时间执行的任务,我们可以有两个参数可以注入: ProcessAt ProcessIn

ProcessIn 指的是从现在开始推迟多少时间执行:

// 3秒后执行
err = srv.NewTask(testDelayTask,
    &DelayTask{Message: "delay task"},
    asynq.ProcessIn(3*time.Second),
)

ProcessAt 指的是在指定的某一个具体时间执行:

// 1小时后的时间点执行
oneHourLater := now.Add(time.Hour)
err = srv.NewTask(testDelayTask,
    &DelayTask{Message: "delay task"},
    asynq.ProcessAt(oneHourLater),
)

周期性任务(Periodic Task)

周期性任务 asynq.Scheduler 内部是通过Crontab来实现定时的,定时器到点之后,就调度任务。它默认使用的是UTC时区。

// 每分钟执行一次
_, err = srv.NewPeriodicTask(
    "*/1 * * * ?",
    testPeriodicTask,
    &DelayTask{Message: "periodic task"},
)

需要注意的是,若要保证周期性任务的持续调度执行, asynq.Scheduler 必须要一直运行着,否则调度将不会发生。调度器本身不参与任务的执行,但是没有它的存在,调度将不不复存在,也不会发生。

示例代码

示例代码可以在单元测试代码中找到:kratos-transport/transport/asynq/server_test.go at main · tx7do/kratos-transport · GitHub

以上就是Golang微服务框架Kratos实现分布式任务队列Asynq的方法详解的详细内容,更多关于Golang Kratos实现Asynq的资料请关注脚本之家其它相关文章!

相关文章

  • 解决go build不去vendor下查找包的问题

    解决go build不去vendor下查找包的问题

    这篇文章主要介绍了解决go build不去vendor下查找包的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • Go语言中的错误处理最佳实践详解

    Go语言中的错误处理最佳实践详解

    这篇文章主要为大家详细介绍了Go语言中的错误处理的相关知识,文中的示例代码讲解详细,对我们深入了解Go语言有一定的帮助,需要的可以参考下
    2023-08-08
  • 使用Go语言实现常见hash算法

    使用Go语言实现常见hash算法

    这篇文章主要为大家详细介绍了使语言实现各种常见hash算法的相关知识,文中的示例代码讲解详细,具有一定的借鉴价值,需要的小伙伴可以参考下
    2024-01-01
  • 详解golang开发中select多路选择

    详解golang开发中select多路选择

    这篇文章主要介绍了golang开发中select多路选择,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-09-09
  • go语言中的return语句

    go语言中的return语句

    这篇文章主要介绍了go语言中的return语句,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下,希望对你的学习有所帮助
    2022-05-05
  • Go 语言数组和切片的区别详解

    Go 语言数组和切片的区别详解

    本文主要介绍了Go 语言数组和切片的区别详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-04-04
  • go通过benchmark对代码进行性能测试详解

    go通过benchmark对代码进行性能测试详解

    在开发中我们要想编写高性能的代码,或者优化代码的性能时,你首先得知道当前代码的性能,在go中可以使用testing包的benchmark来做基准测试 ,文中有详细的代码示例,感兴趣的小伙伴可以参考一下
    2023-04-04
  • go语言定义零值可用的类型学习教程

    go语言定义零值可用的类型学习教程

    这篇文章主要为大家介绍了go语言定义零值可用的类型教程学习,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-06-06
  • 深入了解Golang中的数据类型

    深入了解Golang中的数据类型

    在计算机编程中,数据类型是非常重要的一个概念。这篇文章将详细介绍 Golang中的数据类型,包括基本类型、复合类型、引用类型以及自定义类型,希望对大家有所帮助
    2023-04-04
  • 一文详解Golang中的切片数据类型

    一文详解Golang中的切片数据类型

    这篇文章主要介绍了一文详解Golang中的切片数据类型,切片是一个种特殊的数组。是对数组的一个连续片段的引用,所以切片是一个引用类型
    2022-09-09

最新评论