关于Django使用 django-celery-beat动态添加定时任务的方法

 更新时间:2021年10月22日 10:29:50   作者:以王姓自居  
本文给大家介绍Django使用 django-celery-beat动态添加定时任务的方法,安装对应的是celery版本,文中给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧

版本信息

# 插件安装 
Django==2.2.2
django-celery-beat==2.1.0
django-redis==4.8.0
mysqlclient==2.0.0
django-mysql==3.2.0
redis==3.2.1
uWSGI==2.0.17.1
django-redis-cache==2.1.0

安装与配置

  1. 安装上面的对应的celery版本
  2. 配置settings.py
# django时区配置
TIME_ZONE = 'Asia/Shanghai'
# 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
# 如果USE_TZ 设置为False,TIME_ZONE = 'Asia/Shanghai', 则使用上海的UTC时间。
USE_TZ = False

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)

# celery beat配置
# CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django-celery-beat.schedulers.DatabaseScheduler'
# celery 的启动工作数量设置
CELERY_WORKER_CONCURRENCY = 10
# 任务预取功能,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
CELERYD_PREFETCH_MULTIPLIER = 20
# 有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# celery 的 worker 执行多少个任务后进行重启操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_DISABLE_RATE_LIMITS = True
# 设置代理人broker
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# 指定 Backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'

生成数据库

python manage.py migrate

# 迁移之后生成的表结构
django_celery_beat.models.PeriodicTask 此模型定义要运行的单个周期性任务。
django_celery_beat.models.IntervalSchedule 以特定间隔(例如,每5秒)运行的计划。
django_celery_beat.models.CrontabSchedule 与像在cron项领域的时间表 分钟小时日的一周 DAY_OF_MONTH month_of_year
django_celery_beat.models.PeriodicTasks 此模型仅用作索引以跟踪计划何时更改

在工作目录下配置celery.py

# -*- coding: utf-8 -*-
# @File:    celeryc.py
# @Content: celery定时任务配置


import os
from celery import Celery, platforms
from celery.schedules import crontab
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_server.settings")
app = Celery("django_server")
app.config_from_object("django.conf:settings", namespace="CELERY")
# 定时任务的存放位置
app.autodiscover_tasks(["monitoring.tasks", "wechat.tasks"])

创建tasks任务

from celery import shared_task

@shared_task
def alarm_monitor_task(**kwargs):
  print("定时任务!!!")

创建定时任务

from django_celery_beat.models import PeriodicTask, IntervalSchedule

-----周期性任务
#  创建10分钟的间隔 interval 对象
schedule, _ = IntervalSchedule.objects.update_or_create(every=10, period=IntervalSchedule.MINUTES)
# 如果任务存在就更新,不存在则创建
PeriodicTask.objects.update_or_create(
  defaults={
    "interval": schedule,  # 上面创建10分钟的间隔 interval 对象
    "task": "monitoring.tasks.alarm_monitor_task",  # 指定需要周期性执行的任务
    "args"=json.dumps(['arg1', 'arg2']),
    "kwargs": json.dumps({"a": 1, "b": 2}, ensure_ascii=False)  # 传入的参数
  },
  name="定时任务-task",
)
# 周期性任务可选参数
IntervalSchedule.DAYS 固定间隔天数
IntervalSchedule.HOURS 固定间隔小时数
IntervalSchedule.MINUTES 固定间隔分钟数
IntervalSchedule.SECONDS 固定间隔秒数
IntervalSchedule.MICROSECONDS 固定间隔微秒


----Crontab 周期性任务
from django_celery_beat.models import CrontabSchedule, PeriodicTask
# 创建间隔30分钟执行的任务
crontab, _ = CrontabSchedule.objects.update_or_create(
  minute="*/30",
  hour="*",
  day_of_week="*",
  day_of_month='*',
  month_of_year='*',
  timezone=pytz.timezone("Asia/Shanghai"),
)
# 任务存在则更新,不存在创建
PeriodicTask.objects.update_or_create(
  name=task_name,
  defaults={
    "kwargs": json.dumps(kwargs, ensure_ascii=False),
    "task": "wechat.tasks.subscribe_task",
    "crontab": crontab,
  },
)

# 删除任务
task = PeriodicTask.objects.filter(name__startswith=sub_id)
if task:
    task.update(enabled=False)
    task.delete()
    
    
# 暂停当前任务
tasks = PeriodicTask.objects.filter(name__startswith=sub_id)
if tasks:
    tasks.update(enabled=True if status else False)

运行任务

# 启动任务 work
celery -A django_server worker -l INFO --logfile=/var/log/dec_server/worker.log

# 启动定时器触发 beat
celery -A django_server beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler --logfile=/var/log/dec_server/beat.log

Tips:

参考链接:

https://github.com/celery/django-celery-beat

https://pypi.org/project/django-celery-beat/

到此这篇关于Django使用 django-celery-beat动态添加定时任务的文章就介绍到这了,更多相关django celery beat动态添加定时任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • pandas 将list切分后存入DataFrame中的实例

    pandas 将list切分后存入DataFrame中的实例

    今天小编就为大家分享一篇pandas 将list切分后存入DataFrame中的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-07-07
  • python实现获取电脑所连接的wifi密码

    python实现获取电脑所连接的wifi密码

    电脑连接wifi后,很难直观地看到当前连接wifi的密码,需要借助命令行公管局才可以查看到相关信息,本文为大家介绍一下如何利用python获取电脑所连接的wifi密码,感兴趣的可以了解下
    2023-11-11
  • Pytorch中关于RNN输入和输出的形状总结

    Pytorch中关于RNN输入和输出的形状总结

    这篇文章主要介绍了Pytorch中关于RNN输入和输出的形状总结,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06
  • python 通过 socket 发送文件的实例代码

    python 通过 socket 发送文件的实例代码

    这篇文章主要介绍了python 通过 socket 发送文件的实例代码,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-08-08
  • 聊聊通过celery_one避免Celery定时任务重复执行的问题

    聊聊通过celery_one避免Celery定时任务重复执行的问题

    Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,今天通过本文给大家介绍通过celery_one避免Celery定时任务重复执行的问题,感兴趣的朋友一起看看吧
    2021-10-10
  • 详解Python的三种拷贝方式

    详解Python的三种拷贝方式

    Python中有三种拷贝方式分别是浅拷贝、深拷贝和赋值拷贝,这篇文章通过实例代码给大家介绍了Python的三种拷贝方式,需要的朋友可以参考下
    2020-02-02
  • python3 googletrans超时报错问题及翻译工具优化方案 附源码

    python3 googletrans超时报错问题及翻译工具优化方案 附源码

    这篇文章主要介绍了python3 googletrans超时报错问题及翻译工具优化方案 附源码,本文给大家分享解决方法,通过实例代码相结合给大家介绍的非常详细,需要的朋友可以参考下
    2020-12-12
  • python多线程抽象编程模型详解

    python多线程抽象编程模型详解

    这篇文章主要为大家详细介绍了python多线程抽象编程模型,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-03-03
  • python实现中文文本分句的例子

    python实现中文文本分句的例子

    今天小编就为大家分享一篇python实现中文文本分句的例子,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-07-07
  • Sentry错误日志监控使用方法解析

    Sentry错误日志监控使用方法解析

    这篇文章主要介绍了Sentry错误日志监控使用方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11

最新评论