Python异步编程中asyncio.gather的并发控制详解

 更新时间:2025年03月24日 15:49:02   作者:傻啦嘿哟  
在Python异步编程生态中,asyncio.gather是并发任务调度的核心工具,本文将通过实际场景和代码示例,展示如何结合信号量机制实现精准并发控制,希望对大家有所帮助

在Python异步编程生态中,asyncio.gather是并发任务调度的核心工具。然而当面对海量任务时,不加控制的并发可能引发资源耗尽、服务降级等问题。本文将通过实际场景和代码示例,展示如何结合信号量机制实现精准并发控制,既保证吞吐量又避免系统过载。

一、asyncio.gather的原始行为解析

asyncio.gather的设计初衷是批量执行异步任务,其默认行为类似于"全速冲刺":

import asyncio
 
async def task(n):
    print(f"Task {n} started")
    await asyncio.sleep(1)
    print(f"Task {n} completed")
    return n
 
async def main():
    tasks = [task(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(f"Total results: {len(results)}")
 
asyncio.run(main())

在这个示例中,10个任务会立即全部启动,1秒后几乎同时完成。这种"全并发"模式在以下场景存在隐患:

网络请求:同时发起数千个HTTP请求可能被目标服务器封禁

文件IO:磁盘IO密集型操作会拖慢系统响应

数据库连接:超过连接池限制导致报错

二、信号量控制法:给并发装上"节流阀"

asyncio.Semaphore通过限制同时执行的任务数,实现精准并发控制。其核心机制是:

初始化时设定最大并发数(如10)

每个任务执行前必须获取信号量

任务完成后释放信号量

async def controlled_task(sem, n):
    async with sem:  # 获取信号量
        print(f"Task {n} acquired semaphore")
        await asyncio.sleep(1)
        print(f"Task {n} released semaphore")
        return n
 
async def main():
    sem = asyncio.Semaphore(3)  # 最大并发3
    tasks = [controlled_task(sem, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(f"Total results: {len(results)}")
 
asyncio.run(main())

执行效果:

始终只有3个任务在执行
每完成1个任务,立即启动新任务
总耗时≈4秒(10/3向上取整)

三、进阶控制策略

3.1 动态调整并发数

通过监控队列长度动态调整信号量:

async def dynamic_control():
    sem = asyncio.Semaphore(5)
    task_queue = asyncio.Queue()
    
    # 生产者
    async def producer():
        for i in range(20):
            await task_queue.put(i)
    
    # 消费者
    async def consumer():
        while True:
            item = await task_queue.get()
            async with sem:
                print(f"Processing {item}")
                await asyncio.sleep(1)
            task_queue.task_done()
    
    # 动态调整
    def monitor(queue):
        while True:
            size = queue.qsize()
            if size > 10:
                sem._value = max(1, sem._value - 1)
            elif size < 5:
                sem._value = min(10, sem._value + 1)
            asyncio.sleep(1)
    
    await asyncio.gather(
        producer(),
        *[consumer() for _ in range(3)],
        asyncio.to_thread(monitor, task_queue)
    )
 
asyncio.run(dynamic_control())

3.2 分批执行策略

对于超大规模任务集,可采用分批处理:

def chunked(iterable, chunk_size):
    for i in range(0, len(iterable), chunk_size):
        yield iterable[i:i+chunk_size]
 
async def batch_processing():
    all_tasks = [task(i) for i in range(100)]
    
    for batch in chunked(all_tasks, 10):
        print(f"Processing batch: {len(batch)} tasks")
        await asyncio.gather(*batch)
 
asyncio.run(batch_processing())

优势:

  • 避免内存爆炸
  • 方便进度跟踪
  • 支持中间状态保存

四、性能对比与最佳实践

控制方式吞吐量资源占用实现复杂度适用场景
无控制小型任务集
固定信号量通用场景
动态信号量中高中低需要弹性控制的场景
分批处理超大规模任务集

最佳实践建议:

网络请求类任务:并发数控制在5-20之间

文件IO操作:并发数不超过CPU逻辑核心数*2

数据库操作:遵循连接池最大连接数限制

始终设置合理的超时时间:

try:
    await asyncio.wait_for(task(), timeout=10)
except asyncio.TimeoutError:
    print("Task timed out")

五、常见错误与解决方案

错误1:信号量未正确释放

# 错误示例:缺少async with
sem = asyncio.Semaphore(3)
sem.acquire()
await task()
sem.release()  # 容易忘记释放

解决方案:

# 正确用法
async with sem:
    await task()  # 自动获取和释放

错误2:任务异常导致信号量泄漏

async def risky_task():
    async with sem:
        raise Exception("Oops!")  # 异常导致sem未释放

解决方案:

async def safe_task():
    sem_acquired = False
    try:
        async with sem:
            sem_acquired = True
            # 执行可能出错的操作
    finally:
        if not sem_acquired:
            sem.release()

结语

asyncio.gather配合信号量机制,就像给异步程序装上了智能节流阀。通过合理设置并发参数,既能让程序高效运行,又能避免系统过载。实际开发中应根据任务类型、资源限制和SLA要求,选择最合适的并发控制策略。记住:优秀的并发控制不是追求最大速度,而是找到性能与稳定性的最佳平衡点。

到此这篇关于Python异步编程中asyncio.gather的并发控制详解的文章就介绍到这了,更多相关Python asyncio.gather内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • python实现的希尔排序算法实例

    python实现的希尔排序算法实例

    这篇文章主要介绍了python实现的希尔排序算法,实例分析了基于Python实现希尔排序的相关技巧,需要的朋友可以参考下
    2015-07-07
  • Python基于OpenCV的视频图像处理详解

    Python基于OpenCV的视频图像处理详解

    OpenCV是一个开源的,跨平台的计算机视觉库,它采用优化的C/C++代码编写,能够充分利用多核处理器的优势。本文主要和大家来聊聊基于Python OpenCv的视频图像处理,感兴趣的可以了解一下
    2023-02-02
  • 利用python控制Autocad:pyautocad方式

    利用python控制Autocad:pyautocad方式

    这篇文章主要介绍了利用python控制Autocad:pyautocad方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-06-06
  • Python中的并发处理之asyncio包使用的详解

    Python中的并发处理之asyncio包使用的详解

    本篇文章主要介绍了Python中的并发处理之asyncio包使用的详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-04-04
  • 只用四步修改jupyter的工作路径/存储路径

    只用四步修改jupyter的工作路径/存储路径

    为了方便用户使用以及减少系统盘的占用,可以将Jupyter的默认工作路径修改到电脑中常用的路径中,这篇文章主要给大家介绍了关于如何只用四步修改jupyter的工作路径/存储路径的相关资料,需要的朋友可以参考下
    2023-12-12
  • Python用正则表达式实现爬取古诗文网站信息

    Python用正则表达式实现爬取古诗文网站信息

    这篇文章主要给大家介绍了关于Python如何利用正则表达式爬取爬取古诗文网站信息,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-12-12
  • python多线程和多进程关系详解

    python多线程和多进程关系详解

    在本篇文章里小编给大家整理的是一篇关于python多线程和多进程之间的联系的基础内容,有兴趣的朋友们可以学习下。
    2020-12-12
  • python字典各式各样操作从基础到高级全面示例详解

    python字典各式各样操作从基础到高级全面示例详解

    在Python中,字典(Dictionary)是一种强大而灵活的数据结构,它允许你存储和检索键值对,本文将深入探讨Python中各式各样的字典操作,包括基本操作、高级操作以及一些实用的技巧,通过全面的示例代码,将展示如何充分发挥字典在Python编程中的优势
    2023-12-12
  • python+opencv+caffe+摄像头做目标检测的实例代码

    python+opencv+caffe+摄像头做目标检测的实例代码

    今天小编就为大家分享一篇python+opencv+caffe+摄像头做目标检测的实例代码,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-08-08
  • pytorch+lstm实现的pos示例

    pytorch+lstm实现的pos示例

    今天小编就为大家分享一篇pytorch+lstm实现的pos示例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-01-01

最新评论