apschedule的定时任务实例(python)
更新时间:2026年01月07日 16:03:00 作者:小徐敲java
文章介绍了四种配置定时任务的方法,包括直接代码配置、YAML配置文件管理、装饰器方式自动注册任务以及关闭定时任务的方法
1:方法一:直接代码配置定时任务
使用APScheduler的BackgroundScheduler直接配置多个定时任务,通过环境变量控制执行间隔
import os
from apscheduler.schedulers.background import BackgroundScheduler
def init_scheduler():
"""初始化多个定时任务"""
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
# 定义任务配置
tasks_config = [
{
'func': sync_analysis_s7_data,
'name': '同步AnalysisS7数据',
'interval': int(os.getenv("TASK_INTERVAL_S7", 10)),
'id': 'sync_analysis_s7'
},
{
'func': sync_user_data,
'name': '同步用户数据',
'interval': int(os.getenv("TASK_INTERVAL_USER", 30)),
'id': 'sync_user_data'
},
{
'func': generate_report,
'name': '生成报表',
'interval': int(os.getenv("TASK_INTERVAL_REPORT", 3600)),
'id': 'generate_report'
}
]
# 动态添加任务
for task in tasks_config:
scheduler.add_job(
func=task['func'],
trigger="interval",
seconds=task['interval'],
id=task['id'],
name=task['name'],
replace_existing=True
)
print(f"任务已添加:{task['name']},间隔:{task['interval']}秒")
logger.info(f"任务已添加:{task['name']},间隔:{task['interval']}秒")
scheduler.start()
print("所有定时任务已启动")
logger.info("所有定时任务已启动")
return scheduler
2:方法二:YAML配置文件管理
建一个配置文件 tasks.yaml
通过YAML文件定义任务配置,支持interval和cron两种触发器类型
tasks:
- name: "同步AnalysisS7数据"
function: "sync_analysis_s7_data"
interval_seconds: 10
id: "sync_analysis_s7"
- name: "同步用户数据"
function: "sync_user_data"
interval_seconds: 30
id: "sync_user_data"
- name: "生成报表"
function: "generate_report"
interval_seconds: 3600
id: "generate_report"
- name: "清理日志"
function: "clean_logs"
cron: "0 2 * * *" # 每天凌晨2点执行
id: "clean_logs"
对应的实现代码
import yaml
from apscheduler.schedulers.background import BackgroundScheduler
def load_tasks_from_yaml(config_path):
"""从YAML配置文件加载任务"""
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
return config['tasks']
def get_function_by_name(name):
"""根据名称获取对应的函数对象"""
functions = {
'sync_analysis_s7_data': sync_analysis_s7_data,
'sync_user_data': sync_user_data,
'generate_report': generate_report,
'clean_logs': clean_logs
}
return functions.get(name)
def init_scheduler():
"""基于配置文件初始化定时任务"""
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
# 加载任务配置
tasks = load_tasks_from_yaml('tasks.yaml')
# 添加任务
for task in tasks:
func = get_function_by_name(task['function'])
if not func:
logger.warning(f"未找到函数: {task['function']}")
continue
# 支持interval和cron两种触发器
if 'interval_seconds' in task:
scheduler.add_job(
func=func,
trigger="interval",
seconds=task['interval_seconds'],
id=task['id'],
name=task['name'],
replace_existing=True
)
print(f"间隔任务已添加:{task['name']},间隔:{task['interval_seconds']}秒")
elif 'cron' in task:
cron_parts = task['cron'].split()
scheduler.add_job(
func=func,
trigger="cron",
minute=cron_parts[0],
hour=cron_parts[1],
day=cron_parts[2],
month=cron_parts[3],
day_of_week=cron_parts[4],
id=task['id'],
name=task['name'],
replace_existing=True
)
print(f"Cron任务已添加:{task['name']},调度:{task['cron']}")
scheduler.start()
print("所有定时任务已启动")
return scheduler
3: 装饰器方式自动注册任务
使用装饰器自动收集任务配置,简化任务注册过程
from functools import wraps
from apscheduler.schedulers.background import BackgroundScheduler
# 全局任务注册表
_task_registry = []
def scheduled_task(name, interval_seconds=None, cron=None, task_id=None):
"""装饰器:注册定时任务"""
def decorator(func):
_task_registry.append({
'func': func,
'name': name,
'interval_seconds': interval_seconds,
'cron': cron,
'id': task_id or func.__name__
})
return func
return decorator
# 使用装饰器定义任务
@scheduled_task(name="同步AnalysisS7数据", interval_seconds=10)
def sync_analysis_s7_data():
pass
@scheduled_task(name="同步用户数据", interval_seconds=30)
def sync_user_data():
pass
@scheduled_task(name="每日报告", cron="0 9 * * *") # 每天9点执行
def daily_report():
pass
def init_scheduler():
"""初始化所有装饰器注册的任务"""
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
for task_info in _task_registry:
kwargs = {
'func': task_info['func'],
'id': task_info['id'],
'name': task_info['name'],
'replace_existing': True
}
# 根据配置选择触发器类型
if task_info['interval_seconds']:
kwargs.update({
'trigger': "interval",
'seconds': task_info['interval_seconds']
})
elif task_info['cron']:
cron_parts = task_info['cron'].split()
kwargs.update({
'trigger': "cron",
'minute': cron_parts[0],
'hour': cron_parts[1],
'day': cron_parts[2],
'month': cron_parts[3],
'day_of_week': cron_parts[4]
})
scheduler.add_job(**kwargs)
print(f"任务已添加:{task_info['name']}")
scheduler.start()
print("所有定时任务已启动")
return scheduler
4:注意最终记得要关闭定时任务
4-1:方法一
def signal_handler(signum, frame):
"""信号处理函数"""
print("接收到关闭信号,正在停止定时任务...")
scheduler.shutdown()
print("定时任务已停止")
sys.exit(0)
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # 终止信号
4-2:方法二
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
scheduler.add_job(sync_analysis_s7_data, "interval", seconds=10)
scheduler.start()
def cleanup():
"""清理函数"""
print("正在清理资源...")
scheduler.shutdown()
# 注册退出时的清理函数
atexit.register(cleanup)
4-3:方法三
只是依赖FastApi框架
@app.on_event("shutdown")
async def shutdown_event():
"""应用关闭时执行"""
# 停止定时任务
scheduler.shutdown()
logger.info("===== AnalysisS7数据服务关闭 =====")
5:总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
Sentinel网关限流与SpringCloud Gateway整合过程
本文介绍了如何通过SpringCloudGateway集成阿里的Sentinel进行网关限流,Sentinel作为流量防卫兵,提供了丰富的应用场景和完备的实时监控功能,通过配置路由维度和自定义API维度的限流规则,实现了对微服务的保护2024-11-11
Springboot服务引用Nacos中新增的配置文件失败问题及解决
这篇文章主要介绍了Springboot服务引用Nacos中新增的配置文件失败问题及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2025-06-06


最新评论