python异步编程之asyncio高阶API的使用详解

 更新时间:2024年01月09日 14:00:19   作者:金色旭光  
asyncio中函数可以分为高阶函数和低阶函数,通常开发中使用更多的是高阶函数,本文主要为大家介绍了asyncio中常用的高阶函数,需要的可以参考下

1.asyncio 高阶API列表

asyncio中函数可以分为高阶函数和低阶函数。低阶函数用于调用事件循环、linux 套接字、信号等更底层的功能,高阶函数是屏蔽了更多底层细节的任务并发,任务执行函数。通常开发中使用更多的是高阶函数。本篇主要介绍asyncio中常用的高阶函数。

由于asyncio在不同的版本中有差异,本文以及本系列都以python3.10为准。

函数功能
run()创建事件循环,运行一个协程,关闭事件循环。
create_task()创建一个asyncio的Task对象
await sleep()休眠几秒
await gather()并发执行所有事件的调度和等待
await wait_for()有超时控制的运行
await shield()屏蔽取消操作
await wait()完成情况的监控器
current_task()返回当前Task对象
all_tasks()返回事件循环中所有的task对象
TaskTask对象
to_thread()在不同的 OS 线程中异步地运行一个函数
run_coroutine_threadsafe()从其他OS线程中调度一个协程
for in as_completed()用 for 循环监控完成情况

2.run

函数原型:

asyncio.run(coro, *, debug=False)

功能:创建事件循环,运行传入的协程。该函数总是会创建一个新的事件循环并在结束时关闭它,应该被当做asyncio程序的主入口点。run() 函数是用来创建事件,将task加入事件,运行事件的函数。

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

run() 从功能上等价于以下低阶API。获取一个事件循环,创建一个task,加入事件循环。

loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.run_until_complete(task)

3.create_task

函数原型:

asyncio.create_task(coro, *, name=None)

功能:将协程函数封装成一个Task。协程函数没有生命周期,但是Task有生命周期。

将协程打包为一个 Task 并自动寻找事件循环加入。返回 Task 对象。该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。

async def coro():
    await asyncio.sleep(1)
    print("i am coro")


async def main():
    task = asyncio.create_task(coro())
    print(f"task状态:{task._state}")
    await asyncio.sleep(2)
    print(f"task状态:{task._state}")
    print("i am main")

asyncio.run(main())

结果:

task状态:PENDING
i am coro task
状态:FINISHED
i am main

结果分析:

可以看到task运行中的状态和结束的生命周期状态

4.gather

函数原型:

asyncio.gather(*aws, return_exceptions=False)

功能:

并发执行所有可等待对象,收集任务结果,返回所有已经完成的task的结果。结果将是一个由所有返回值组成的列表。结果值的顺序与传入的task的顺序一致。可等待对象可以是协程和task。

如果序列中是协程而不是task,那么会将其自动封装成task加入事件循环。

import asyncio

async def coro(value):
    print(f"hello coro{value}")
    return f"coro{value}"

async def main():
    tasks = [coro(i) for i in range(5)]
    res = await asyncio.gather(*tasks)
    for i in res:
        print(i)

asyncio.run(main())

结果:

hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
coro0
coro1
coro2
coro3
coro4

结果分析:

获取了所有协程的返回值,并且返回的顺序和任务的顺序一致。

5.wait

函数原型:

asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

功能:

并发地运行序列中的可等待对象,并进入阻塞状态直到满足 return_when 所指定的条件。将task任务结果收集起来,返回两个 Task/Future 集合: (done, pending)。done是已经完成的任务,pending是未完成的任务,未完成的原因可能是超时或return_when策略。

aws:

aws中保存的是task而不是协程,从3.8起不建议传入协程,3.11将不再支持传入协程。

timeout:

如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。

请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将不会继续执行,不会返回结果。

return_when:

return_when 指定此函数应在何时返回。它必须为以下参数之一:

参数描述
FIRST_COMPLETED函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED。
ALL_COMPLETED函数将在所有可等待对象结束或取消时返回。

基础使用示例:

import asyncio


async def coro(value):
    print(f"hello coro{value}")
    return f"coro{value}"

async def main():
    tasks = [asyncio.create_task(coro(i)) for i in range(5)]
    done, pending = await asyncio.wait(tasks)
    for i in done:
        print(i.result())

asyncio.run(main())

结果:

hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
coro1
coro2
coro0
coro3
coro4

结果分析:

返回结果和执行顺序并不是一致的

指定超时时间:

import asyncio
from asyncio import FIRST_COMPLETED

async def coro(value):
    print(f"hello coro{value}")
    await asyncio.sleep(value)
    return f"coro{value}"

async def main():
    tasks = [asyncio.create_task(coro(i)) for i in range(5)]
    done, pending = await asyncio.wait(tasks, timeout=3)

    print("---------finish----------")
    for i in done:
        print(i.result())

    print("---------pending----------")
    for i in pending:
        print(i)

asyncio.run(main())

hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
---------finish----------
coro1
coro2
coro0
---------pending----------
<Task pending name='Task-5' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future finished result=None>>
<Task pending name='Task-6' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>

结果分析:

超时未完成的task会保存在pending中,未完成的task在超时之后不会继续执行,没有返回结果。

return_when配置任意任务完成就返回:

import asyncio
from asyncio import FIRST_COMPLETED

async def coro(value):
    print(f"hello coro{value}")
    await asyncio.sleep(value)
    return f"coro{value}"

async def main():
    tasks = [asyncio.create_task(coro(i)) for i in range(5)]
    done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)

    print("---------finish----------")
    for i in done:
        print(i.result())

    print("---------pending----------")
    for i in pending:
        print(i)

asyncio.run(main())

结果:

hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
---------finish----------
coro0
---------pending----------
<Task pending name='Task-5' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
<Task pending name='Task-3' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
<Task pending name='Task-4' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
<Task pending name='Task-6' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>

结果分析:

获取到任意结果就返回,未完成的task保存在pending中。未完成的task在超时之后不会继续执行。

6.as_completed

函数原型:

asyncio.as_completed(aws, *, timeout=None)

说明:并发执行aws中保存的可等待对象,返回一个协程的迭代器。可以从迭代器中取出最先执行完成的task的结果。返回结果和执行顺序不一致。aws中可以是task或协程序列。

import asyncio


async def coro(value):
    print(f"hello coro{value}")
    return f"coro{value}"

async def main():
    tasks = [coro(i) for i in range(5)]
    for item in asyncio.as_completed(tasks):
        res = await item
        print(res)


asyncio.run(main())

结果:

hello coro2
hello coro3
hello coro4
hello coro1
hello coro0
coro2
coro3
coro4
coro1
coro0

结果分析:

所有任务都会执行完成,没有超时配置。返回顺序和执行顺序无关。

7.gather、wait、as_completed 异同点小结

asyncio协程体系中可以实现创建多个任务并发执行的函数有以下三个:

  • asyncio.gather
  • asyncio.wait
  • asyncio.as_completed

不同之处比较:

特性/函数gatherwaitas_completed
入参同时支持task和协程序列只支持task序列同时支持task和协程序列
获取结果顺序有序,和并发序列顺序相同无序,和并发序列无关无序,和并发序列无关
返回返回结果列表,保存的是函数返回值。返回元组done、pending。元组中保存的是task,而非task 的函数返回值返回一个迭代器,从中可迭代出函数返回值。

8.wait for

函数原型:

asyncio.wait_for(aw, timeout)

功能:执行单个可等待对象,指定 timeout 秒数后超时

等待可等待对象完成,指定timeout秒数后超时。和gather类似,可以自动将协程转化成任务加入循环。

timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。
如果发生超时,任务将取消并引发 asyncio.TimeoutError。

async def coro():
    # 睡眠5s
    await asyncio.sleep(3600)
    print('finish!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(coro(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

结果:

timeout!

以上就是python异步编程之asyncio高阶API的使用详解的详细内容,更多关于python asyncio高阶API的资料请关注脚本之家其它相关文章!

相关文章

  • python实现二分查找算法

    python实现二分查找算法

    这篇文章主要为大家详细介绍了python实现二分查找算法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-09-09
  • Django之编辑时根据条件跳转回原页面的方法

    Django之编辑时根据条件跳转回原页面的方法

    今天小编就为大家分享一篇Django之编辑时根据条件跳转回原页面的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-08-08
  • 通过Python编写一个简单登录功能过程解析

    通过Python编写一个简单登录功能过程解析

    这篇文章主要介绍了通过Python编写一个简单登录功能过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-09-09
  • Python中线程锁的使用介绍

    Python中线程锁的使用介绍

    大家好,本篇文章主要讲的是Python中线程锁的使用介绍,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览
    2022-01-01
  • 解决Djang2.0.1中的reverse导入失败的问题

    解决Djang2.0.1中的reverse导入失败的问题

    今天小编就为大家分享一篇解决Djang2.0.1中的reverse导入失败的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-08-08
  • PO模式在selenium自动化测试框架的优势

    PO模式在selenium自动化测试框架的优势

    大家都知道po模式可以提高代码的可读性和减少了代码的重复,但是相对的缺点还有,今天通过本文一起学习下PO模式在selenium自动化测试框架的优势,需要的朋友可以参考下
    2022-03-03
  • 你知道怎么用Python监控聊天记录吗

    你知道怎么用Python监控聊天记录吗

    今天有位同事和我吐槽关于公司 XX 的问题,我告诉他不要在公司电脑上说这些,因为很可能会被狙击,这位同事刚开始还不信,直到我写了这边文章,他才恍然大悟
    2021-10-10
  • 详解python实现数据归一化处理的方式:(0,1)标准化

    详解python实现数据归一化处理的方式:(0,1)标准化

    这篇文章主要介绍了详解python实现数据归一化处理的方式:(0,1)标准化,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-07-07
  • Python标准库之zipfile和tarfile模块的使用

    Python标准库之zipfile和tarfile模块的使用

    zipfile和tarfile是Python中常用的压缩包模块,本文将通过示例详细讲解一下这两个模块的使用方法,快跟随小编一起学习学习吧
    2022-06-06
  • 浅谈Python中range和xrange的区别

    浅谈Python中range和xrange的区别

    本篇文章主要介绍了浅谈Python中range和xrange的区别,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-12-12

最新评论