Django-celery-beat动态添加周期性任务实现过程解析

 更新时间:2020年11月26日 10:26:21   作者:-零  
这篇文章主要介绍了Django-celery-beat动态添加周期性任务实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

前期准备

1.beat插件安装

pip3 install django-celery-beat

2.注册APP

INSTALLED_APPS = [
....
'django_celery_beat',
]

3.数据库变更

python3 manage.py migrate django_celery_beat

配置工作

目录结构请参考://www.jb51.net/article/200659.htm

1.配置celerypro.py

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
from django.utils import timezone

# set the default Django settings module for the 'celery' program.
# 为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'voice_quality_assurance_configure.settings')
# 创建celery app
app = Celery('voice_quality_assurance_configure')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
# 从单独的配置模块中加载配置
app.config_from_object('voice_quality_assurance_configure.celeryconfig')
# 设置app自动加载任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 解决时区问题,定时任务启动就循环输出
app.now = timezone.now

2.配置celeryconfig.py

from __future__ import absolute_import
from kombu import Queue
from django.conf import settings

# 设置代理人broker
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# 指定 Backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 指定时区,默认是 UTC
CELERY_TIMEZONE='Asia/Shanghai'
# celery 序列化与反序列化配置
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_IGNORE_RESULT = True
# celery 的启动工作数量设置
CELERY_WORKER_CONCURRENCY = 10
# 任务预取功能,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
CELERYD_PREFETCH_MULTIPLIER = 20
# 有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# celery 的 worker 执行多少个任务后进行重启操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_DISABLE_RATE_LIMITS = True

# celery beat配置(周期性任务设置)
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

3.分别启动woker和beta

项目根目录终端执行(voice_quality_assurance_configure为项目名称,简单来说,和manage.py文件同级)

celery -A voice_quality_assurance_configure beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler #

启动beta 调度器使用数据库

celery worker -A voice_quality_assurance_configure --loglevel=info -n worker1 #启动celery worker

4.创建周期性任务

from datetime import datetime, timedelta
import json
import os,django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "voice_quality_assurance_configure.settings")# project_name 项目名称
django.setup()
from django_celery_beat.models import PeriodicTask, IntervalSchedule
schedule, created = IntervalSchedule.objects.get_or_create(every=10,period=IntervalSchedule.SECONDS,)

# 带参数的创建方法,如下:
PeriodicTask.objects.create(
   interval=schedule,     # 上面创建10秒的间隔 interval 对象
   name='test_task',     # 设置任务的name值
   task='mission.tasks.my_task', # 指定需要周期性执行的任务
   args=json.dumps([10, 2, 76]),
  expires=datetime.utcnow() + timedelta(seconds=30)
)

详解创建周期性任务的方法

创建基于interval的周期性任务

第一步创建间隔对象

schedule, created = IntervalSchedule.objects.get_or_create(
  every=10,
  period=IntervalSchedule.SECONDS,
)

IntervalSchedule.DAYS 固定间隔天数
IntervalSchedule.HOURS 固定间隔小时数
IntervalSchedule.MINUTES 固定间隔分钟数
IntervalSchedule.SECONDS 固定间隔秒数
IntervalSchedule.MICROSECONDS 固定间隔微秒

第二步创建任务

无参数的创建方法:

PeriodicTask.objects.create(
   interval=schedule,         # we created this above.
   name='test_task',     # simply describes this periodic task.
   task='app名.tasks.任务函数名', # name of task.)

有参数的创建方法:

PeriodicTask.objects.create(
   interval=schedule,         # we created this above.
   name='test'_task',     # simply describes this periodic task.
   task='app名.tasks.任务函数名', # name of task. 
   args=json.dumps(['arg1', 'arg2']), 
   kwargs=json.dumps({ 'be_careful': True, }), 
   expires=datetime.utcnow() + timedelta(seconds=30) )
class MonitorDeviceTask(object):
  """
  设备创建,增加周期性任务
  """

  def __init__(self, device_obj):
    self.device_obj = device_obj
    self.periodic_task = PeriodicTask.objects.create(
      interval=schedule,
      name='test_task',
      task='mission.tasks.my_task',
      args=json.dumps([self.device_obj.ip])
    )

  def starttask(self):
    """
    启动任务
    """
    self.periodic_task.enabled = True
    self.periodic_task.save()

  def stoptask(self):
    """
    停止任务
    """
    self.periodic_task.enabled = False
    self.periodic_task.save()

  def deltask(self):
    """
    删除任务
    """
    self.periodic_task.delete()  
    self.periodic_task.save()

创建基于 crontab 的周期性任务

from django_celery_beat.models import CrontabSchedule, PeriodicTask
schedule, _ = CrontabSchedule.objects.get_or_create(
   minute='30',
   hour='*',
   day_of_week='*',
   day_of_month='*',
   month_of_year='*',
   timezone=pytz.timezone('Canada/Pacific')
)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • python将pandas datarame保存为txt文件的实例

    python将pandas datarame保存为txt文件的实例

    今天小编就为大家分享一篇python将pandas datarame保存为txt文件的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-02-02
  • python print出共轭复数的方法详解

    python print出共轭复数的方法详解

    在本篇内容里小编给大家分享的是关于python print出共轭复数的方法总结内容,有需要的读者们可以学习下。
    2019-06-06
  • Python报错:ModuleNotFoundError的解决办法

    Python报错:ModuleNotFoundError的解决办法

    "ModuleNotFoundError: No module named 'xxx'"这个报错是个非常常见的报错,几乎每个python程序员都遇到过,下面这篇文章主要给大家介绍了关于Python报:ModuleNotFoundError错误的解决办法,需要的朋友可以参考下
    2022-06-06
  • 简单了解什么是神经网络

    简单了解什么是神经网络

    这篇文章主要介绍了简单了解什么是神经网络,具有一定借鉴价值,需要的朋友可以参考下。
    2017-12-12
  • python itertools包内置无限迭代器

    python itertools包内置无限迭代器

    这篇文章主要介绍了python itertools包内置无限迭代器, Python的内建模块itertools提供了非常有用的用于操作迭代对象的函数,itertools提供的几个“无限”迭代器。下文更多相关内容,需要的朋友可以参考一下
    2022-03-03
  • pytest配置文件pytest.ini的具体使用

    pytest配置文件pytest.ini的具体使用

    本文主要介绍了pytest配置文件pytest.ini的具体使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • Python实现批量下载ts文件并合并为mp4

    Python实现批量下载ts文件并合并为mp4

    这篇文章主要为大家详细介绍了如何通过Python语言实现批量下载ts文件并合并为mp4视频的功能,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
    2023-06-06
  • Django自定义User模型、认证、权限控制的操作

    Django自定义User模型、认证、权限控制的操作

    这篇文章主要介绍了Django自定义User模型、认证、权限控制的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-04-04
  • python实现同级目录调用的示例详解

    python实现同级目录调用的示例详解

    同级目录指的是位于同一级别的文件夹,这些文件夹具有相同的层级结构,它们相互平行,没有一个被包含在另一个之中,本文将给大家介绍python实现同级目录调用的示例,需要的朋友可以参考下
    2024-06-06
  • 关于Python 内置库 itertools

    关于Python 内置库 itertools

    今天得这篇文章就来给大家介绍一下Python的系统库itertools的 相关资料,需要的小伙伴可以参考下面文章的具体内容
    2021-09-09

最新评论