Django集成Celery之状态监控与任务管理详解

 更新时间:2025年03月18日 09:13:37   作者:三余知行  
这篇文章主要介绍了Django集成Celery之状态监控与任务管理详解,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

如何通过 Django 来管理 Celery 任务?通过 Django Admin 界面提供任务的查询、查看、重试、终止等功能?下面是一个完整的步骤指南。

使用 Django 管理 Celery Worker

安装 Django 和相关包

首先,创建一个新的虚拟环境并安装所需的包。

python -m venv myenv
source myenv/bin/activate  # Windows 系统使用: myenv\Scripts\activate
pip install django django-celery-results django-celery-beat celery

创建 Django 项目和应用

django-admin startproject myproject
cd myproject
django-admin startapp myapp

配置 Django 和 Celery

myproject/settings.py 文件中添加以下内容:

INSTALLED_APPS = [
    ...,
    'django_celery_results',
    'django_celery_beat',
    'myapp',  # 确保 app 在这个列表里
]

CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 使用 Redis 作为示例,可以根据需求更改
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'

CELERY_TRACK_STARTED = True
CELERY_SEND_EVENTS = True

# 确保已经配置了数据库
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}

# 配置 Django 缓存(可选)
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
    }
}

myproject 目录中创建一个 celery.py 文件:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

# 设置 Django 的配置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# 从 Django 的设置中配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现任务
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

修改 myproject/__init__.py 文件,使得 Django 在启动时加载 Celery:

from __future__ import absolute_import, unicode_literals

# 这将确保当 Django 启动时加载 app.py
from .celery import app as celery_app

__all__ = ('celery_app',)

创建一个 Celery 任务

myapp/tasks.py 中创建一个简单的 Celery 任务:

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

注册自定义的 TaskResultAdmin

我们需要在自定义 TaskResultAdmin 之前先取消已经注册的模型。

myapp/admin.py 中做如下修改:

from django.contrib import admin
from django_celery_results.models import TaskResult
from django_celery_results.admin import TaskResultAdmin as DefaultTaskResultAdmin
from django.urls import path
from django.shortcuts import redirect
from celery.result import AsyncResult
from myproject.celery import app

# 取消已经注册的 TaskResult
admin.site.unregister(TaskResult)

# 创建一个自定义的 TaskResultAdmin 继承自默认的 TaskResultAdmin
class CustomTaskResultAdmin(DefaultTaskResultAdmin):
    change_list_template = "admin/celery_task_changelist.html"

    def get_urls(self):
        urls = super().get_urls()
        custom_urls = [
            path('retry/<task_id>/', self.admin_site.admin_view(self.retry_task), name='retry-task'),
            path('terminate/<task_id>/', self.admin_site.admin_view(self.terminate_task), name='terminate-task'),
        ]
        return custom_urls + urls

    def retry_task(self, request, task_id, *args, **kwargs):
        AsyncResult(task_id, app=app).reapply()
        self.message_user(request, f'Task {task_id} retried successfully.')
        return redirect('..')

    def terminate_task(self, request, task_id, *args, **kwargs):
        AsyncResult(task_id, app=app).revoke(terminate=True)
        self.message_user(request, f'Task {task_id} terminated successfully.')
        return redirect('..')

# 注册自定义的 TaskResultAdmin
admin.site.register(TaskResult, CustomTaskResultAdmin)

TaskResult 模型已经被 django_celery_results 自动注册到 Django Admin 中了。

我们可以通过继承 django_celery_resultsTaskResultAdmin 并覆盖的方式来避免重复注册模型。

创建 Django Admin 界面的自定义模板

在 Django 项目中创建以下目录结构 templates/admin 并在 admin 文件夹内创建 celery_task_changelist.html

{% extends "admin/change_list.html" %} {% block result_list %} {{ block.super }}
<script>
  function handleTask(action, task_id) {
    fetch(`/${action}/${task_id}/`, {
      method: 'POST',
      headers: {
        'X-CSRFToken': document.querySelector('[name=csrfmiddlewaretoken]')
          .value,
      },
    }).then((response) => {
      if (response.ok) {
        location.reload();
      } else {
        alert('Action failed.');
      }
    });
  }
</script>
<div>
  <form method="post">
    {% csrf_token %} {% for result in cl.result_list %}
    <button type="button" onclick="handleTask('retry', '{{ result.task_id }}')">
      Retry
    </button>
    <button
      type="button"
      onclick="handleTask('terminate', '{{ result.task_id }}')"
    >
      Terminate
    </button>
    {% endfor %}
  </form>
</div>
{% endblock %}

确保自定义的模板路径正确。对于默认 Django 项目模板目录,模板文件夹应该在 myproject/templates/admin/celery_task_changelist.html

运行 Django 和 Celery

  • 应用数据库迁移:
python manage.py migrate
  • 启动 Django 服务器:
python manage.py runserver
  • 启动 Celery worker:
celery -A myproject worker -l info

使用 Django Admin 管理 Celery 任务

打开浏览器并访问 http://127.0.0.1:8000/admin/,Celery 任务将会在 Django admin 界面中显示,并且可以通过点击按钮来进行查询、查看、重试和终止等操作。

这样就完成了通过 Django Admin 界面管理 Celery 任务的完整步骤。如有需要可以进一步定制和优化界面和功能。

启动 Django 本地的 Celery Worker

为了在启动 Celery Worker 后向 Worker 发起任务,并在 Django Admin 界面演示查询、查看、重试和终止任务,可以按以下步骤进行操作:

创建 Celery 任务

myapp/tasks.py 中定义一些示例任务:

# myapp/tasks.py
from celery import shared_task
import time

@shared_task
def add(x, y):
    time.sleep(10)  # 模拟长时间运行的任务
    return x + y

@shared_task
def long_running_task(duration):
    time.sleep(duration)
    return f"Task completed after {duration} seconds"# myapp/tasks.py
from celery import shared_task
import time

@shared_task
def add(x, y):
    time.sleep(10)  # 模拟长时间运行的任务
    return x + y

@shared_task
def long_running_task(duration):
    time.sleep(duration)
    return f"Task completed after {duration} seconds"

创建触发任务的视图

为了便于演示,可以创建一些视图来触发这些任务。更新 urls.pyviews.py 文件。

  • myapp/views.py 中:
# myapp/views.py
from django.http import JsonResponse
from myapp.tasks import add, long_running_task

def trigger_add_task(request):
    add.delay(3, 4)
    return JsonResponse({'status': 'Task add (3, 4) triggered'})

def trigger_long_running_task(request):
    long_running_task.delay(30)  # 任务运行30秒
    return JsonResponse({'status': 'Long running task for 30 seconds triggered'})
  • myapp/urls.py 中:
# myapp/urls.py
from django.urls import path
from .views import trigger_add_task, trigger_long_running_task

urlpatterns = [
    path('trigger-add-task/', trigger_add_task, name='trigger-add-task'),
    path('trigger-long-task/', trigger_long_running_task, name='trigger-long-task'),
]
  • myproject/urls.py 中:
# myproject/urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('tasks/', include('myapp.urls')),
]

更新 Celery 配置

确保 settings.py 中配置了 Celery:

# myproject/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 使用 Redis 作为示例,可以根据需求更改
CELERY_RESULT_BACKEND = 'django-db'

启动 Celery Worker 和 Django 服务器

确保已经启动了 Redis 服务:

redis-server

然后分别启动 Django 服务器和 Celery Worker:

# 启动 Django 服务器
python manage.py runserver

# 启动 Celery Worker
celery -A myproject worker -l info

触发任务并在 Django Admin 界面中查看

打开浏览器并访问以下 URL 以触发任务:

  • http://127.0.0.1:8000/tasks/trigger-add-task/ - 触发增加任务
  • http://127.0.0.1:8000/tasks/trigger-long-task/ - 触发长时间运行任务

通过这些 URL 触发 Celery 任务。然后可以通过 Django Admin 界面进行查询、查看、重试和终止这些任务。

在 Django Admin 界面查看任务状态

打开浏览器并访问 http://127.0.0.1:8000/admin/,登陆 Django Admin 界面,导航到 Task Results 部分。应该能看到适当的任务列表,并通过之前在自定义 TaskResultAdmin 中定义的操作进行重试和终止任务。

这些步骤能够通过 Django 和 Celery 演示触发任务并在 Django Admin 界面中进行查询、查看、重试、终止等操作。

启动远程的 Celery Worker

要通过 Django Admin 管理和监控在远程服务器上单独运行且由独立代码仓库维护的 Celery Worker,需要配置和协调多个独立的系统。

安装 Celery Worker

在远程服务器上,创建一个独立的项目(假设名字为 worker_project),并安装所需的依赖:

# 在远程服务器上
python -m venv venv
source venv/bin/activate
pip install celery redis

配置 Celery Worker

  • worker_project 内部配置 Celery(worker_project/celery.py):
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 设置 Django 的配置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'worker_project.settings')

app = Celery('worker_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
  • worker_project/settings.py 中配置 Celery:
CELERY_BROKER_URL = 'redis://your_redis_server:6379/0'  # 替换为实际的 Redis 地址
CELERY_RESULT_BACKEND = 'redis://your_redis_server:6379/0'

定义任务

创建一些测试任务(worker_project/tasks.py):

from celery import shared_task
import time

@shared_task
def add(x, y):
    time.sleep(10)  # 模拟长时间运行的任务
    return x + y

@shared_task
def long_running_task(duration):
    time.sleep(duration)
    return f"Task completed after {duration} seconds"

启动 Celery Worker

celery -A worker_project worker -l info

启动和测试

启动本地 Django 服务器:

python manage.py runserver

确保远程服务器上的 Celery Worker 已经在运行。

触发任务并在 Django Admin 中查看:

  • 访问 http://127.0.0.1:8000/tasks/trigger-add-task/ - 触发增加任务
  • 访问 http://127.0.0.1:8000/tasks/trigger-long-task/ - 触发长时间运行任务

通过 http://127.0.0.1:8000/admin/ 登录 Django Admin 界面,导航到 Task Results 部分,您应该能看到这些任务并管理它们(如重试和终止)。

以上配置实现了在本地的 Django 项目中通过 Django Admin 管理和监控在远程服务器上单独运行的 Celery Worker,并通过 Redis 进行通信。这种架构可以在实际生产环境中更好地分离职责并提高系统的健壮性和扩展性。

使用 Django Admin 管理 Flask 启动的 Celery Worker 的常见问题

在使用 Flask App 启动远程 Celery Worker,并在 Django Admin 对这些 Worker 进行监控和管理时,可能会遇到诸如 Django Admin 界面没有显示 Celery Worker 任务和任务执行结果的问题,可能有以下几个原因:

  1. 结果后端配置错误:确保 Flask 和 Django 使用相同的结果后端(result backend)。
  2. Django 配置错误:确保 Django 已正确配置 Celery 结果后端。
  3. Flask 应用没有保存结果:确保 Flask 的 Celery 配置没有禁用结果保存功能。

要修复这个问题,请按以下步骤检查和修正设置:

检查 Flask 应用的 Celery 配置

确保 Flask 应用中的 Celery 配置了正确的结果后端,并且没有禁用任务结果的存储。例如:

celery_app = Celery(
    configs.celery.name,
    task_cls=FlaskTask,
    broker=app.config["CELERY_BROKER_URL"],
    backend=app.config["CELERY_BACKEND"],  # 确保配置了结果后端
    task_ignore_result=False,  # 确保不忽略任务结果
)

celery_app.conf.update(
    result_backend=app.config["CELERY_RESULT_BACKEND"],  # 确保配置了结果后端
    broker_connection_retry_on_startup=True,
)

# 确保没有不必要的配置禁用结果存储

检查 Django 的 Celery 配置

settings.py 中,确保定义了 Celery 结果后端,并且配置与 Flask 中的配置一致:

# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 替换为实际的 Broker URL
CELERY_RESULT_BACKEND = 'django-db'  # 使用 Django 数据库作为结果后端
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_RESULT_PERSISTENT = True

# 安装的应用程序
INSTALLED_APPS = [
    # 其他应用
    'django_celery_results',
    'django_celery_beat',
]

# 其他配置

同步数据库

确保 Django 数据库与 Celery 结果模型一致:

python manage.py migrate django_celery_results
python manage.py migrate django_celery_beat

确保 Django Admin 中注册了相关模型

确保在 admin.py 中注册了 django_celery_resultsdjango_celery_beat 的模型,以便在 Admin 界面中查看:

from django.contrib import admin
from django_celery_results.models import TaskResult
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule

admin.site.register(TaskResult)
admin.site.register(PeriodicTask)
admin.site.register(IntervalSchedule)
admin.site.register(CrontabSchedule)

测试 Celery 任务

确保从 Flask 发送的 Celery 任务能正确存储结果:

@celery_app.task(bind=True)
def debug_task(self, *args, **kwargs):
    print(f'Request: {self.request!r}')
    return 'Test Result'

在 Flask 应用中调用这个任务:

debug_task.delay()

然后检查 Django Admin 界面中的任务结果是否显示。

检查 Celery Worker 配置

确保 celery worker 是在一个共享的 Broker 和 Backend 上运行:

celery -A your_flask_app_name worker --loglevel=info

通过这些步骤,应该能确保在 Django Admin 界面中正确显示 Flask 应用中 Celery Worker 发起的任务和任务执行结果。

如果问题仍然存在,检查日志和配置是否有任何错误,并确保 Flask 和 Django 的所有 Celery 配置和数据库访问是有效且一致的。

总结

通过 Django Admin 管理 Celery Worker 任务是一种方便的方式,可以通过简单的配置和定制来实现任务的查询、查看、重试和终止等操作。

通过本文提供的步骤和示例,您可以轻松地在 Django 项目中集成 Celery Worker,并通过 Django Admin 界面对任务进行管理和监控。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Python异常对代码运行性能的影响实例解析

    Python异常对代码运行性能的影响实例解析

    这篇文章主要介绍了Python异常对代码运行性能的影响实例解析,分享了相关代码示例,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
    2018-02-02
  • 讯飞webapi语音识别接口调用示例代码(python)

    讯飞webapi语音识别接口调用示例代码(python)

    这篇文章主要介绍了如何使用Python3调用讯飞WebAPI语音识别接口,重点解决了在处理语音识别结果时判断是否为最后一帧的问题,通过运行代码并总结经验,解决了常见的模块和属性错误,需要的朋友可以参考下
    2025-03-03
  • python 实现提取某个索引中某个时间段的数据方法

    python 实现提取某个索引中某个时间段的数据方法

    今天小编就为大家分享一篇python 实现提取某个索引中某个时间段的数据方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-02-02
  • python爬虫入门教程--快速理解HTTP协议(一)

    python爬虫入门教程--快速理解HTTP协议(一)

    http协议是互联网里面最重要,最基础的协议之一,我们的爬虫需要经常和http协议打交道。下面这篇文章主要给大家介绍了关于python爬虫入门之快速理解HTTP协议的相关资料,文中介绍的非常详细,需要的朋友可以参考借鉴,下面来一起看看吧。
    2017-05-05
  • python爬虫MeterSphere平台执行报告使用进阶

    python爬虫MeterSphere平台执行报告使用进阶

    这篇文章主要为大家介绍了python爬虫MeterSphere平台执行报告使用进阶示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-12-12
  • Python decimal模块的使用示例详解

    Python decimal模块的使用示例详解

    decimal 模块decimal意思为十进制,这个模块提供了十进制浮点运算支持,本篇文章主要给大家讲解Python decimal模块的使用,需要的朋友可以参考下
    2023-03-03
  • python中random模块详解

    python中random模块详解

    Python中的random模块用于生成随机数,它提供了很多函数,本文给大家分享常用函数总结,感兴趣的朋友跟随小编一起看看吧
    2021-03-03
  • 初步认识Python中的列表与位运算符

    初步认识Python中的列表与位运算符

    这篇文章主要介绍了Python中的列表与位运算符,是Python入门学习中的基础知识,需要的朋友可以参考下
    2015-10-10
  • 关于Python中进度条的六个实用技巧分享

    关于Python中进度条的六个实用技巧分享

    在项目开发过程中加载、启动、下载项目难免会用到进度条,下面这篇文章主要给大家介绍了关于Python中进度条的六个实用技巧,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2022-04-04
  • PyCharm 专业版安装图文教程

    PyCharm 专业版安装图文教程

    这篇文章主要介绍了PyCharm 专业版安装图文教程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-02-02

最新评论