Python使用apscheduler模块设置定时任务的实现

 更新时间:2022年05月25日 15:51:52   作者:redrose2100  
本文主要介绍了Python使用apscheduler模块设置定时任务的实现,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

一、安装

pip install apscheduler

二、ApScheduler 简介

1 APScheduler的组件

triggers:触发器
triggers包含任务执行的调度逻辑,决定任务按照什么逻辑进行定时执行

job stores;任务存储器
存储了调度任务

executors:执行器
用例执行任务的,包含线程池以及进程池等的创建和调用等等

schedulers:调度器
属于控制面,将其他几个方面组织起来的作用、

2 调度器的种类

调度器有以下几种常见类型,其中最常用的BackgroundScheduler,即非阻塞式,因为在一般情况下,定时任务都会在放到web服务中,如果使用阻塞式,则无法启动web服务,而使用非阻塞式,则将定时任务设定后,就不管了,继续执行后面的web服务,只要web服务在运行,定时任务就是一直有效的

  • BlockingScheduler: 阻塞式
  • BackgroundScheduler: 非阻塞式(后台运行)
  • AsyncIOScheduler: 当使用asyncio模块时使用
  • GeventScheduler: 当使用gevent模块时使用
  • TornadoScheduler: 构建Tornado应用时使用
  • TwistedScheduler: 构建Twisted应用时使用
  • QtScheduler: 构建Qt应用时使用

3 内置的触发器类型

  • date: 在某个时间点执行一次时使用
  • interval: 固定的时间间隔循环执行时使用
  • cron: 在一天中特定的时间点执行时使用
  • calendarinterval: 当想在在一天中特定时间点或以日历为基础的时间间隔内执行时使用

三、使用举例

这里jiu就以非阻塞式BackgroundScheduler调度器为例展开

1 使用date类型的触发器

如下,使用了三种设置日期和时间的方法

from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime


def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 通过date 设置指定日期执行
    sched.add_job(do_func,trigger="date",run_date=date(2022,5,25),args=("张三丰",100))

    # 通过datetime,设置指定日期额指定时刻执行
    sched.add_job(do_func, trigger="date", run_date=datetime(2022, 5, 25,14,0,10), args=("张三丰", 100))

    # 直接使用文本的方式指定日期和时刻表
    sched.add_job(do_func, trigger="date", run_date="2022-05-25 14:0:20", args=("张三丰", 100))

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

执行结果如下,可以发现,第一个通过date指定日期的默认是0点执行,显然时间已经过了,不会执行,第二和第三个则在规定的时间点执行了,这里还需要注意的是,通过打印可以看出,main函数执行完成后,已经开始执行main函数下面的while循环打印语句了,而在执行循环的过程中,定时任务仍然生效,这就是非阻塞式调度器的原理,如果是阻塞式,则在此代码中,会一直卡在main函数中,main下面额while循环语句是不会执行的,因此在实际使用中,非阻塞式应用的是非常多的

2022-05-25 14:00:02
2022-05-25 14:00:02
C:\Users\hitre\.virtualenvs\zentaolinkgitlab-QCS5yxD9\lib\site-packages\apscheduler\util.py:436: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495). For more details on migrating to a PEP 495-compliant implementation, see https://pytz-deprecation-shim.readthedocs.io/en/latest/migration.html
  return tzinfo.localize(dt)
Run time of job "do_func (trigger: date[2022-05-25 00:00:00 CST], next run at: 2022-05-25 00:00:00 CST)" was missed by 14:00:02.088260
2022-05-25 14:00:03
2022-05-25 14:00:04
2022-05-25 14:00:05
2022-05-25 14:00:06
2022-05-25 14:00:07
2022-05-25 14:00:08
2022-05-25 14:00:09
2022-05-25 14:00:10 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:00:10
2022-05-25 14:00:11
2022-05-25 14:00:12
2022-05-25 14:00:13
2022-05-25 14:00:14
2022-05-25 14:00:15
2022-05-25 14:00:16
2022-05-25 14:00:17
2022-05-25 14:00:18
2022-05-25 14:00:19
2022-05-25 14:00:20 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:00:20
2022-05-25 14:00:21
2022-05-25 14:00:22

2 使用interval类型的触发器

如下代码演示了时间间隔循环执行的使用例子

from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime


def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 每3秒执行一次
    sched.add_job(do_func,trigger="interval",args=("张三丰",100),seconds=3)
    # 每3分钟执行一次
    sched.add_job(do_func, trigger="interval", args=("张三丰", 100), minutes=3)
    # 每3小时执行一次
    sched.add_job(do_func, trigger="interval", args=("张三丰", 100), hours=3)
    # 每3天执行一次
    sched.add_job(do_func, trigger="interval", args=("张三丰", 100), days=3)
    # 每3周执行一次
    sched.add_job(do_func, trigger="interval", args=("张三丰", 100), weeks=3)

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

上面的代码中因为时间跨度比较大,这里只演示妹3秒执行一次的代码

代码如下:

from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime


def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 每3秒执行一次
    sched.add_job(do_func,trigger="interval",args=("张三丰",100),seconds=3)

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

执行结果如下:

2022-05-25 14:14:04
2022-05-25 14:14:04
2022-05-25 14:14:05
2022-05-25 14:14:06
2022-05-25 14:14:07 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:14:07
2022-05-25 14:14:08
2022-05-25 14:14:09
2022-05-25 14:14:10 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:14:10
2022-05-25 14:14:11
2022-05-25 14:14:12
2022-05-25 14:14:13 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:14:13

3 使用cron类型的触发器

cron的触发器有点类似linux上crontab定时器的使用,代码如下:

from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime


def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 任务会在6月、7月、8月、11月和12月的第三个周五,00:00、01:00、02:00和03:00触发
    sched.add_job(do_func,trigger="cron",month='6-8,11-12', day='3rd fri', hour='0-3',args=("张三丰",100))

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

这里需要注意的是,可以省略不需要的字段。当省略时间参数时,在显式指定参数之前的参数会被设定为*,之后的参数会被设定为最小值,week 和day_of_week的最小值为*

day=1, minute=20
等同于
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0

此外,也可以直接使用crontab表达式,如下:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import time
from datetime import date
from datetime import datetime


def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 任务会在6月、7月、8月、11月和12月的第三个周五,00:00、01:00、02:00和03:00触发
    sched.add_job(do_func,trigger=CronTrigger.from_crontab('48 10 1-15 sep-nov *'),args=("张三丰",100))

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

四、定时器使用装饰器的方法

以间隔时间循环执行的代码为例,如下为未使用装饰器的方式

from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime


def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 每3秒执行一次
    sched.add_job(do_func,trigger="interval",args=("张三丰",100),seconds=3)

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

修改为使用装饰器的方式如下:

from apscheduler.schedulers.background import BackgroundScheduler
import time

sched=BackgroundScheduler()

@sched.scheduled_job(trigger="interval",args=("张三丰",100),seconds=3)
def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

if __name__=="__main__":
    sched.start()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

执行结果如下:

2022-05-25 14:34:10
2022-05-25 14:34:11
2022-05-25 14:34:12
2022-05-25 14:34:13 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:34:13
2022-05-25 14:34:14
2022-05-25 14:34:15
2022-05-25 14:34:16 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:34:16
2022-05-25 14:34:17
2022-05-25 14:34:18
2022-05-25 14:34:19 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:34:19
2022-05-25 14:34:20

到此这篇关于Python使用apscheduler模块设置定时任务的实现的文章就介绍到这了,更多相关Python apscheduler定时任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 在 Django/Flask 开发服务器上使用 HTTPS

    在 Django/Flask 开发服务器上使用 HTTPS

    使用 Django 或 Flask 这种框架开发 web app 的时候一般都会用内建服务器开发和调试程序,等程序完成后再移交到生产环境部署。问题是这些内建服务器通常都不支持 HTTPS,那么我们来探讨下开启https吧
    2014-07-07
  • python 实用工具状态机transitions

    python 实用工具状态机transitions

    这篇文章主要介绍了python 实用工具状态机transitions的使用,帮助大家更好的理解和学习python,感兴趣的朋友可以了解下
    2020-11-11
  • Python使用Numpy模块读取文件并绘制图片

    Python使用Numpy模块读取文件并绘制图片

    这篇文章主要介绍了Python使用Numpy模块读取文件并绘制图片,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • pycharm中使用request和Pytest进行接口测试的方法

    pycharm中使用request和Pytest进行接口测试的方法

    这篇文章主要介绍了pycharm中使用request和Pytest进行接口测试的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-07-07
  • pycharm查看变量值的4种方法汇总

    pycharm查看变量值的4种方法汇总

    因为Python是脚本语言,不会进行编译,所以只有执行到那一行,才能知道那个变量的类型,下面这篇文章主要给大家介绍了关于pycharm查看变量值的4种方法,需要的朋友可以参考下
    2022-04-04
  • 使用python绘制人人网好友关系图示例

    使用python绘制人人网好友关系图示例

    这篇文章主要介绍了使用python绘制人人网好友关系图示例,需要的朋友可以参考下
    2014-04-04
  • 学习python之编写简单简单连接数据库并执行查询操作

    学习python之编写简单简单连接数据库并执行查询操作

    这篇文章主要介绍了学习python之编写简单简单连接数据库并执行查询操作,需要的朋友可以参考下
    2016-02-02
  • 解锁Python并发编程中多线程与多进程的应用

    解锁Python并发编程中多线程与多进程的应用

    本文我们将先从基本概念开始,然后通过详细举例探讨每一种机制,特别关注多线程和多进程的应用,最后分享一些实战经验以及一种优雅的编程技巧,希望对大家有所帮助
    2023-05-05
  • 深度剖析使用python抓取网页正文的源码

    深度剖析使用python抓取网页正文的源码

    平时打开一个网页,除了文章的正文内容,通常会有一大堆的导航,广告和其他方面的信息。本文的目的,在于说明如何从一个网页中提取出文章的正文内容,而过渡掉其他无关的的信息。
    2014-06-06
  • python 线程的五个状态

    python 线程的五个状态

    这篇文章主要介绍了python 线程的五个状态,帮助大家更好的理解和学习python,感兴趣的朋友可以了解下
    2020-09-09

最新评论