Python中Celery分布式任务队列的介绍与使用指南

 更新时间:2025年11月12日 08:58:08   作者:檀越@新空间  
Celery 是一个开源的、分布式的任务队列系统,使用 Python 编写,它的核心目的是处理异步任务和定时任务,下面小编就为大家详细介绍一下Celery的具体使用吧

第一部分:Celery 是什么

Celery 是一个开源的、分布式的任务队列系统,使用 Python 编写。它的核心目的是处理异步任务定时任务

核心概念类比:餐厅

想象一个繁忙的餐厅:

  • 顾客/服务员 (Web 服务器):接收点单(HTTP 请求)。
  • 订单 (Task):需要完成的具体工作,比如“做一份披萨”。
  • 厨房 (Celery Workers):专门负责处理订单的后台团队。
  • 订单队列 (Message Broker):服务员和厨房之间的传菜口,上面挂着待处理的订单。

服务员将订单放到传菜口后,就可以立刻回去服务下一个顾客,而不必在厨房等待披萨做完。厨房的工作人员会从传菜口按顺序取出订单并制作披萨。这就是 Celery 的核心理念:将耗时的操作放到后台执行,让主程序(如 Web 服务器)能够快速响应客户端

第二部分:为什么需要使用 Celery?

在 Web 开发中,有些操作执行起来很慢,如果让用户在网页上一直等待直到操作完成,体验会非常差,甚至导致请求超时。

典型使用场景:

1.耗时任务

  • 发送电子邮件(用户注册后发送验证邮件)。
  • 图像/视频处理(生成缩略图、添加水印、转码)。
  • 数据分析和报告生成。

2.定时任务

  • 每天凌晨备份数据库。
  • 每周一早上给所有用户发送周报。
  • 每隔 5 分钟检查一次系统状态。

3.大规模处理

同时处理成千上万个 API 调用或数据清洗任务。

第三部分:Celery 的核心组件

一个完整的 Celery 系统通常包含四个部分:

Celery Client (客户端)

  • 这是你的 Python 应用程序(如 Django, Flask 项目)。
  • 它负责定义任务发送(调用)任务到消息代理。

Message Broker (消息代理)

  • 这是 Celery 的中枢神经系统,负责传递消息。
  • Client 将任务发送到 Broker,Worker 从 Broker 接收任务。
  • 常用选择
    • Redis:非常流行,速度快,同时可作为结果后端。
    • RabbitMQ:功能最全、最可靠的选择,是 Celery 的官方推荐。
    • Amazon SQS / 其他…

Celery Worker (工作者)

  • 这是一个(或多个)独立的进程,持续监视消息代理。
  • 它从 Broker 中获取任务并执行它们。
  • 你可以根据任务量启动多个 Worker 来并行处理,实现分布式。

Result Backend (结果后端) (可选):

  • 用于存储任务执行后的结果和状态(如“成功”、“失败”、“进度”)。
  • 如果你的应用需要知道任务是否完成并获取返回值,就需要配置它。
  • 常用选择:Redis, Django 数据库, Memcached 等。

数据流总结:Client -(发送任务)-> Broker -(分配任务)-> Worker -(存储结果)-> Result Backend

第四部分:快速入门与实践

我们将使用 Redis 作为消息代理和结果后端,因为它安装简单且功能强大。

步骤 1:安装必要的库

pip install celery redis

确保你已安装并启动了 Redis 服务。

步骤 2:创建 Celery 应用和任务 (tasks.py)

# tasks.py
from celery import Celery

# 创建 Celery 实例,'my_app' 是当前项目的名称
app = Celery('my_app',
             broker='redis://localhost:6379/0',      # 使用 Redis 作为 Broker
             backend='redis://localhost:6379/0')     # 使用 Redis 作为 Backend

# 定义一个任务
@app.task
def add(x, y):
    print(f"正在计算 {x} + {y}...")
    # 模拟一个耗时操作
    import time
    time.sleep(5)
    result = x + y
    print(f"计算完成!结果是 {result}")
    return result

@app.task
def send_email(to, subject, body):
    # 模拟发送邮件的逻辑
    print(f"正在向 {to} 发送邮件,主题:{subject}")
    # ... 实际的发信代码 ...
    return f"邮件已发送至 {to}"

步骤 3:启动 Worker

在终端中,进入 tasks.py 所在的目录,运行以下命令启动 Worker:

celery -A tasks worker --loglevel=info
  • -A tasks:指定 Celery 应用的位置(tasks.py 文件中的 app)。
  • --loglevel=info:指定日志级别,方便查看运行信息。

如果成功,你会看到类似下面的输出,表示 Worker 已启动并在等待任务:

 -------------- celery@YourComputer v5.3.0 (emerald-rush)
--- ***** -----
-- ******* ----- [Configuration]
- *** --- * --- . broker:      redis://localhost:6379/0
- ** ---------- . result:      redis://localhost:6379/0
- ** ---------- . app:         my_app:0x...
- ** ---------- . concurrency: 8 (prefork)
- ** ---------- . tasks:       ['tasks.add', 'tasks.send_email']
--- ***** ----- [Queues]
 -------------- . celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add
  . tasks.send_email

[2024-...] [INFO] Task tasks.add[1234-...] received

步骤 4:调用任务

现在,打开一个 Python Shell(在另一个终端中),来调用我们定义的任务。

# 在 Python Shell 中
from tasks import add, send_email

# 1. 异步调用 - 最常用的方式
# 这行代码会立即返回一个 AsyncResult 对象,而不会阻塞程序
result = add.delay(4, 6) # .delay() 是快捷的异步调用方法
print(f"任务ID:{result.id}") # 立即打印任务ID,此时任务正在后台运行

# 2. 获取任务结果(可选)
# 因为我们的任务有 5 秒休眠,所以需要等待一下再获取结果
print("正在等待结果...")
value = result.get(timeout=10) # .get() 会阻塞,直到任务完成并返回结果
print(f"任务结果是:{value}")

# 3. 检查任务状态(可选)
print(f"任务状态:{result.status}") # 可能是 PENDING, SUCCESS, FAILURE 等

# 4. 调用发送邮件的任务
email_result = send_email.delay('user@example.com', '欢迎光临', '感谢您注册!')
print(f"邮件任务ID:{email_result.id}")

步骤 5:监控任务(可选)

Celery 提供了一个强大的命令行工具来监控 Worker 的状态和任务队列。你还可以使用更强大的工具如 Flower

# 安装 Flower
pip install flower

# 启动 Flower (在项目目录下)
celery -A tasks flower

# 然后在浏览器中访问 http://localhost:5555

第五部分:在 Web 框架(以 Flask 为例)中使用

将 Celery 集成到 Web 框架中是其最常见的用法。

# app.py
from flask import Flask, jsonify
from celery import Celery

# 配置 Flask
app = Flask(__name__)

# 配置 Celery(通常从配置文件读取)
def make_celery(app):
    celery = Celery(
        app.import_name,
        broker='redis://localhost:6379/0',
        backend='redis://localhost:6379/0'
    )
    # 可选:更新Celery配置,使其与Flask配置保持一致
    celery.conf.update(app.config)
    return celery

celery = make_celery(app)

@celery.task
def background_task():
    # 一个模拟的耗时任务
    import time
    time.sleep(10)
    return "后台任务完成!"

@app.route('/start_task', methods=['POST'])
def start_task():
    # 在视图函数中触发后台任务
    task = background_task.delay()
    return jsonify({'task_id': task.id}), 202 # 202 Accepted 表示请求已被接受处理

@app.route('/check_status/<task_id>')
def check_status(task_id):
    task = background_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {'state': task.state, 'status': '任务正在排队或执行中...'}
    elif task.state == 'SUCCESS':
        response = {'state': task.state, 'result': task.result}
    else: # FAILURE 等其他状态
        response = {'state': task.state, 'status': str(task.info)}
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

总结

特性描述
本质分布式任务队列
核心价值异步处理、解耦、提高响应速度
关键组件Client, Broker, Worker, Result Backend
工作流程应用发布任务 -> Broker 传递 -> Worker 执行
使用场景邮件发送、文件处理、定时任务、复杂计算

Celery 是 Python 生态中处理后台任务的标杆工具,熟练掌握它能极大地提升你构建复杂、高性能应用的能力。

到此这篇关于Python中Celery分布式任务队列的介绍与使用指南的文章就介绍到这了,更多相关Python Celery使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • python使用7z解压apk包的方法

    python使用7z解压apk包的方法

    这篇文章主要介绍了python使用7z解压apk包的方法,涉及Python的shell命令调用技巧,非常具有实用价值,需要的朋友可以参考下
    2015-04-04
  • 合并Excel工作薄中成绩表的VBA代码,非常适合教育一线的朋友

    合并Excel工作薄中成绩表的VBA代码,非常适合教育一线的朋友

    每次学生考试,评分完毕之后,把每个科的成绩收集起来,就得到了一个有若干工作表,每个表有学生学号、分数等列的Excel工作薄。
    2009-04-04
  • Java实现的执行python脚本工具类示例【使用jython.jar】

    Java实现的执行python脚本工具类示例【使用jython.jar】

    这篇文章主要介绍了Java实现的执行python脚本工具类,结合实例形式分析了java使用jython.jar执行Python脚本的具体操作技巧,需要的朋友可以参考下
    2018-03-03
  • python使用多线程+socket实现端口扫描

    python使用多线程+socket实现端口扫描

    这篇文章主要为大家详细介绍了python使用多线程+socket实现端口扫描,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-05-05
  • 基于Python实现拆分和合并GIF动态图

    基于Python实现拆分和合并GIF动态图

    这篇文章主要介绍了Python拆分和合并GIF动态图,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • Python NumPy教程之数组的基本操作详解

    Python NumPy教程之数组的基本操作详解

    Numpy 中的数组是一个元素表(通常是数字),所有元素类型相同,由正整数元组索引。本文将通过一些示例详细讲一下NumPy中数组的一些基本操作,需要的可以参考一下
    2022-08-08
  • python中数字列表转化为数字字符串的实例代码

    python中数字列表转化为数字字符串的实例代码

    先前学习过,数字和字符串都可以存储到变量当中,下面这篇文章主要给大家介绍了关于python中数字列表转化为数字字符串的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2023-02-02
  • 详解python中读取和查看图片的6种方法

    详解python中读取和查看图片的6种方法

    本文主要介绍了详解python中读取和查看图片的6种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-04-04
  • python dlib人脸识别代码实例

    python dlib人脸识别代码实例

    这篇文章主要介绍了python dlib人脸识别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-04-04
  • Python对象的属性访问过程详解

    Python对象的属性访问过程详解

    这篇文章主要介绍了Python对象的属性访问过程详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-03-03

最新评论