Python使用signal定时结束AsyncIOScheduler任务的问题

 更新时间:2021年07月26日 14:20:08   作者:返回主页临渊(v:superz-han)  
这篇文章主要介绍了Python使用signal定时结束AsyncIOScheduler任务,在使用aiohttp结合apscheduler的AsyncIOScheduler模拟定点并发的时候遇到两个问题,针对每个问题给大家详细介绍,需要的朋友可以参考下

在使用aiohttp结合apscheduler的AsyncIOScheduler模拟定点并发的时候遇到两个问题

  1. 在调度器scheduler.start()后,程序直接退出(在Jupiter中任务可以正常启动)
  2. 如何在指定时间调用scheduler.shutdown()? (因为程序直接退出了)

原调试代码如下:

from datetime import datetime, timedelta

import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()

async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()
async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)

if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10个任务
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()

Google后发现AsyncIOScheduler的使用需要在scheduler启动后,需要自己调用asyncio.get_event_loop().run_forever()来启动协程任务。
但是一旦run_forever()则就会阻塞至死。除非有KeyboardInterrupt, SystemExit等异常或者强杀来停止其运行。
此时想到使用Python的signal来定时发送信号,修改后程序如下,可以正常延迟停止(感觉有点像模拟Go的defer)。

# -*- coding: utf-8 -*-
"""
@Time : 2021/7/23
@Auth : hanzhichao
@Desc:
"""
from datetime import datetime, timedelta
import signal
import asyncio

import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()

async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()

async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)

if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10个任务
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()
    signal.alarm(20)  # 20秒后终止程序
    asyncio.get_event_loop().run_forever()  # 永远运行

到此这篇关于Python使用signal定时结束AsyncIOScheduler任务的文章就介绍到这了,更多相关Python定时结束AsyncIOScheduler任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • python opencv圆、椭圆与任意多边形的绘制实例详解

    python opencv圆、椭圆与任意多边形的绘制实例详解

    在本篇文章里小编给大家整理的是关于python-opencv-圆、椭圆与任意多边形的绘制内容,需要的朋友们可以学习参考下。
    2020-02-02
  • 使用Python实现调整Excel中的行列顺序

    使用Python实现调整Excel中的行列顺序

    调整Excel 行列顺序指的是改变工作表中行或列的位置,以便更好地展示和分析数据,本文将介绍如何通过Python高效地调整Excel 行列顺序,感兴趣的可以了解下
    2025-01-01
  • 使用Python根据一个列表的顺序对其他列表进行排序

    使用Python根据一个列表的顺序对其他列表进行排序

    这篇文章主要介绍了使用Python根据一个列表的顺序对其他列表进行排序,根据列表B中每个元素的下标来获取列表A中对应位置的元素,将其作为排序依据即可,需要的朋友可以参考下
    2023-10-10
  • python实现aes加密及pycryptodome库使用

    python实现aes加密及pycryptodome库使用

    AES算法是高级加密标准,它是一种对称加密算法,AES只有一个密钥,这个密钥既用来加密,也用于解密,这篇文章主要给大家介绍了关于python实现aes加密及pycryptodome库使用的相关资料,需要的朋友可以参考下
    2023-10-10
  • Pytest框架之fixture详解(一)

    Pytest框架之fixture详解(一)

    本文详细讲解了Pytest框架之fixture,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-06-06
  • 快速排序的四种python实现(推荐)

    快速排序的四种python实现(推荐)

    这篇文章主要介绍了python实现快速排序算法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-04-04
  • python中django框架通过正则搜索页面上email地址的方法

    python中django框架通过正则搜索页面上email地址的方法

    这篇文章主要介绍了python中django框架通过正则搜索页面上email地址的方法,涉及django框架及正则表达式的使用技巧,需要的朋友可以参考下
    2015-03-03
  • Django中template for如何使用方法

    Django中template for如何使用方法

    这篇文章主要介绍了Django中template for如何使用方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01
  • python 文件读写操作示例源码解读

    python 文件读写操作示例源码解读

    这篇文章主要为大家介绍了python 文件读写操作示例源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-03-03
  • python实现根据图标提取分类应用程序实例

    python实现根据图标提取分类应用程序实例

    这篇文章主要介绍了python实现根据图标提取分类应用程序实例,是非常实用的应用程序技巧,需要的朋友可以参考下
    2014-09-09

最新评论