Python实现定时任务利器之apscheduler使用详解

 更新时间:2022年10月11日 14:52:01   作者:芥末拌饭  
在Python中,还可以用第三方包来管理定时任务,比如celery、apscheduler。相对来说apscheduler使用起来更简单一些,这里来介绍一下apscheduler的使用方法

前言

之前有介绍了用Linux crontab的方式来实现定时任务,这是使用Linux内置模块来实现的。而在Python中,还可以用第三方包来管理定时任务,比如celery、apscheduler。相对来说apscheduler使用起来更简单一些,这里来介绍一下apscheduler的使用方法。

首先安装起来很简单,运行pip install apscheduler即可。

初识apscheduler

来个简单的例子看看apscheduler是如何使用的。

#encoding:utf-8

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def sch_test():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, 测试apscheduler'.format(now))

task = BlockingScheduler()
task.add_job(func=sch_test, trigger='cron', second='*/10')
task.start()

上述例子很简单,我们首先要定义一个apscheduler的对象,然后add_job添加任务,最后start开启任务就行了。

例子是每隔10秒运行一次sch_test任务,运行结果如下:

时间:2022-10-08 15:16:30, 测试apscheduler
时间:2022-10-08 15:16:40, 测试apscheduler
时间:2022-10-08 15:16:50, 测试apscheduler
时间:2022-10-08 15:17:00, 测试apscheduler

如果我们要在执行任务函数时携带参数,只要在add_job函数中添加args就行,比如task.add_job(func=sch_test, args=('a'), trigger='cron', second='*/10')

apscheduler有哪些模块

上面例子中我们初步了解到如何使用apschedulerl了,接下来需要知道apscheduler的设计框架。apscheduler有四个主要模块,分别是:触发器triggers任务存储器job_stores执行器executors调度器schedulers

1. 触发器triggers:

触发器指的是任务指定的触发方式,例子中我们用的是“cron”方式。我们可以选择cron、date、interval中的一个。

1.cron表示的是定时任务,类似linux crontab,在指定的时间触发。

可用参数如下:

参数释义
year年份(4位数,如2022)
month月份(1-12)
day一个月的第几天(1-31)
week一年的第几周(1-53)
day_of_week一星期的第几天(0-6)
hour小时
minute分钟
second
start_date开始时间
end_date结束时间
timezone时区
jitter触发的误差时间

除此之外,我们还可用表达式类型去设置cron。比如常用的有:

表达式释义
*每个值都触发
*/n每隔n触发一次
a-b在a-b内任何时间都触发
a,b,c分别在a,b,c时间触发

使用方法示例,在每天7点20分执行一次:

task.add_job(func=sch_test, args=('定时任务',), trigger='cron',

hour='7', minute='20')

2.date表示具体到某个时间的一次性任务;

使用方法示例:

# 使用run_date指定运行时间
task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))
# 或者用next_run_time
task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))

3.interval表示的是循环任务,指定一个间隔时间,每过间隔时间执行一次。

interval可设置如下的参数:

参数释义
weeks
days一个月的第几天
hours小时
minutes分钟
seconds
start_date间隔触发的开始时间
end_date间隔触发的结束时间
jitter触发的时间误差

使用方法示例,每隔3秒执行一次sch_test任务:

task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)

来个例子把3种触发器都使用一遍:

# encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))

task = BlockingScheduler()
task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)
task.start()

打印部分结果:

时间:2022-10-08 15:45:49, 一次性任务测试apscheduler
时间:2022-10-08 15:45:49, 循环任务测试apscheduler
时间:2022-10-08 15:45:50, 定时任务测试apscheduler
时间:2022-10-08 15:45:52, 循环任务测试apscheduler
时间:2022-10-08 15:45:55, 定时任务测试apscheduler
时间:2022-10-08 15:45:55, 循环任务测试apscheduler
时间:2022-10-08 15:45:58, 循环任务测试apscheduler

通过代码示例和结果展示,我们可清晰的知道不同触发器的使用区别。

2. 任务存储器job_stores

顾名思义,任务存储器是存储任务的地方,默认都是存储在内存中。我们也可自定义存储方式,比如将任务存到mysql中。这里有以下几种选择:

存储器类型释义
MemoryJobStore任务存储在内存中
SQLAlchemyJobStore使用sqlalchemy作为存储方式,存储在数据库
MongoDBJobStore存储在mongodb中
RedisJobStore存储在redis中

通常默认存储在内存即可,但若程序故障重启的话,会重新拉取任务运行了,如果你对任务的执行要求高,那么可以选择其他的存储器。

使用SQLAlchemyJobStore存储器示例:

from apscheduler.schedulers.blocking import BlockingScheduler

def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))

sched = BlockingScheduler()
# 使用mysql存储任务
sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
sched.start()

3. 执行器executors

执行器的功能就是将任务放到线程池或进程池中运行。有以下几种选择:

执行器类型释义
ThreadPoolExecutor线程池执行器
ProcessPoolExecutor进程池执行器
GeventExecutorGevent 程序执行器
TornadoExecutorTornado 程序执行器
TwistedExecutorTwisted 程序执行器
AsyncIOExecutorasyncio 程序执行器

默认是ThreadPoolExecutor, 常用的也就是第线程和进程池执行器。如果应用是CPU密集型操作,可用ProcessPoolExecutor来执行。

4. 调度器schedulers

调度器属于apscheduler的核心,它扮演着统筹整个apscheduler系统的角色,存储器、执行器、触发器在它的调度下正常运行。调度器有以下几个:

调度器使用场景
BlockingScheduler当调度器是你应用中唯一要运行的,start开启后会阻塞
BackgroundScheduler适用于调度程序在应用程序的后台运行,start开启后不会阻塞
AsyncIOScheduler当程序使用了asyncio的异步框架时使用。
GeventScheduler当程序用了Tornado的时候用
TwistedScheduler当程序用了Twisted的时候用
QtScheduler当应用是QT应用的时候用

不是特定场景下,我们最常用的是BlockingScheduler调度器。

异常监听

定时任务在运行时,若出现错误,需要设置监听机制,我们通常结合logging模块记录错误信息。

使用示例:

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging

# logging日志配置打印格式及保存位置
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='sche.log',
                    filemode='a')

def log_listen(event):
	if event.exception :
		print ( '任务出错,报错信息:{}'.format(event.exception))
	else:
		print ( '任务正常运行...' )

def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
    print(1/0)

sched = BlockingScheduler()
# 使用mysql存储任务
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')

# 配置任务执行完成及错误时的监听
sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 配置日志监听
sched._logger = logging

sched.start()

apscheduler的封装使用

上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
import logging.handlers
import os
import datetime


class LoggerUtils():
    def init_logger(self, logger_name):
        # 日志格式
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        log_obj = logging.getLogger(logger_name)
        log_obj.setLevel(logging.INFO)
        # 设置log存储位置
        path = '/data/logs/'
        filename = '{}{}.log'.format(path, logger_name)
        if not os.path.exists(path):
            os.makedirs(path)
        # 设置日志按照时间分割
        timeHandler = logging.handlers.TimedRotatingFileHandler(
           filename,
           when='D',  # 按照什么维度切割, S:秒,M:分,H:小时,D:天,W:周
           interval=1, # 多少天切割一次
           backupCount=10  # 保留几天
        )
        timeHandler.setLevel(logging.INFO)
        timeHandler.setFormatter(formatter)
        log_obj.addHandler(timeHandler)
        return log_obj


class Scheduler(LoggerUtils):
    def __init__(self):
        # 执行器设置
        executors = {
            'default': ThreadPoolExecutor(10),  # 设置一个名为“default”的ThreadPoolExecutor,其worker值为10
            'processpool': ProcessPoolExecutor(5)  # 设置一个名为“processpool”的ProcessPoolExecutor,其worker值为5
        }
        self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
        # 存储器设置
        # 这里使用sqlalchemy存储器,将任务存储在mysql
        sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
        self.scheduler.add_jobstore('sqlalchemy',url=sql_url)

        def log_listen(event):
            if event.exception:
                # 日志记录
                self.scheduler._logger.error(event.traceback)
    
        # 配置任务执行完成及错误时的监听
        self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        # 配置日志监听
        self.scheduler._logger = self.init_logger('sche_test')

    def add_job(self, *args, **kwargs):
        """添加任务"""
        self.scheduler.add_job(*args, **kwargs)

    def start(self):
        """开启任务"""
        self.scheduler.start()

# 测试任务
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
    print(1/0)


# 添加任务,开启任务
sched = Scheduler()
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
# 开启任务
sched.start()

小结

这篇文章介绍了Python实现定时任务的又一利器apscheduler,通过简单例子及apscheduler框架的主要模块分解,我们可以根据实际需求配置好模块信息,再结合logging模块,我们可以实时监控到定时任务的运行情况。

以上就是Python实现定时任务利器之apscheduler使用详解的详细内容,更多关于Python apscheduler定时任务的资料请关注脚本之家其它相关文章!

相关文章

  • Python解决%matplotlib inline标红底报错问题

    Python解决%matplotlib inline标红底报错问题

    在使用非Jupyter环境如Spyder或PyCharm时,%matplotlib inline会因为是Jupyter特有的魔法命令而导致报错,这条命令是用于Jupyter Notebook或Jupyter Qt Console中,主要作用是将matplotlib的图表直接嵌入到Notebook中显示
    2024-09-09
  • django框架forms组件用法实例详解

    django框架forms组件用法实例详解

    这篇文章主要介绍了django框架forms组件用法,结合实例形式详细分析了Django框架forms组件源码及常用操作方法与使用注意事项,需要的朋友可以参考下
    2019-12-12
  • python激活虚拟环境(venv)的实现

    python激活虚拟环境(venv)的实现

    本文主要介绍了python激活虚拟环境(venv)的实现,包括修改PATH环境变量、设置VIRTUAL_ENV变量、修改终端提示符、使用虚拟环境中的python和pip、加载虚拟环境的依赖等操作,感兴趣的可以了解一下
    2025-05-05
  • Python中functools模块的常用函数解析

    Python中functools模块的常用函数解析

    这篇文章主要介绍了Python中functools模块的常用函数解析,分别讲解了partial、update_wrapper、wraps、total_ordering的用法,需要的朋友可以参考下
    2016-06-06
  • python中dir()与__dict__属性的区别浅析

    python中dir()与__dict__属性的区别浅析

    这篇文章主要给大家介绍了关于python中dir()与__dict__属性的区别的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-12-12
  • Python实现曲线拟合操作示例【基于numpy,scipy,matplotlib库】

    Python实现曲线拟合操作示例【基于numpy,scipy,matplotlib库】

    这篇文章主要介绍了Python实现曲线拟合操作,结合实例形式分析了Python基于numpy,scipy,matplotlib库读取csv数据、计算曲线拟合及图形绘制相关操作技巧,需要的朋友可以参考下
    2018-07-07
  • python numpy元素的区间查找方法

    python numpy元素的区间查找方法

    今天小编就为大家分享一篇python numpy元素的区间查找方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-11-11
  • Python 中将值附加到集合的操作方法

    Python 中将值附加到集合的操作方法

    这篇文章主要介绍了Python 中将值附加到集合的操作方法,通过使用 add() 方法或 update() 方法,你可以向 Python 中的集合中添加元素,在添加元素时,需要注意不允许重复元素和集合是无序的,本文通过示例代码给大家介绍的非常详细,需要的朋友可以参考下
    2023-05-05
  • Python学习笔记整理3之输入输出、python eval函数

    Python学习笔记整理3之输入输出、python eval函数

    这篇文章主要介绍了Python学习笔记整理3之输入输出、python eval函数的相关资料,需要的朋友可以参考下
    2015-12-12
  • python pow函数的底层实现原理介绍

    python pow函数的底层实现原理介绍

    这篇文章主要介绍了python pow函数的底层实现原理介绍,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-03-03

最新评论