Python中asyncio.Queue异步队列的实现

 更新时间:2026年05月15日 09:12:07   作者:无风听海  
本文主要介绍了Python中asyncio.Queue异步队列的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

一、概述

asyncio.Queue 是 Python 标准库 asyncio 模块中的异步队列实现,位于 Lib/asyncio/queues.py。它的设计思路与线程安全的 queue.Queue 类似,但专门为 async/await 协程模型 设计,不是线程安全的

核心定位:在异步并发环境下,作为生产者-消费者模式的协调桥梁,实现协程之间的数据传递与流量控制。

import asyncio

queue = asyncio.Queue(maxsize=10)  # 有界队列,容量10
queue = asyncio.Queue()            # 无界队列(maxsize=0)

二、内部实现原理

从 CPython 源码来看,asyncio.Queue 的核心数据结构非常精巧:

class Queue(mixins._LoopBoundMixin):
    def __init__(self, maxsize=0):
        self._maxsize = maxsize
        self._getters = collections.deque()   # 等待获取的 Future 队列
        self._putters = collections.deque()   # 等待放入的 Future 队列
        self._unfinished_tasks = 0            # 未完成任务计数
        self._finished = locks.Event()        # join() 阻塞用的事件
        self._finished.set()
        self._init(maxsize)
        self._is_shutdown = False

关键设计

内部属性作用
_queue实际存储数据的 collections.deque(FIFO)
_getters当队列为空时,get() 的调用者会创建一个 Future 并挂入此 deque 等待
_putters当队列已满时,put() 的调用者会创建一个 Future 并挂入此 deque 等待
_unfinished_tasks配合 task_done() / join() 实现任务追踪
_finished一个 asyncio.Event,当未完成任务归零时被 set,唤醒 join()

协程挂起与唤醒机制

put()get() 的阻塞并非真正的线程阻塞,而是通过 Future + await 实现的协程挂起:

# put() 核心逻辑(简化)
async def put(self, item):
    while self.full():
        putter = self._get_loop().create_future()
        self._putters.append(putter)
        await putter                  # 协程在此挂起
    self.put_nowait(item)

# get() 核心逻辑(简化)
async def get(self):
    while self.empty():
        getter = self._get_loop().create_future()
        self._getters.append(getter)
        await getter                  # 协程在此挂起
    return self.get_nowait()

当对面操作发生时,通过 _wakeup_next() 唤醒等待者:

def _wakeup_next(self, waiters):
    while waiters:
        waiter = waiters.popleft()
        if not waiter.done():
            waiter.set_result(None)   # 唤醒一个等待的协程
            break

这种 逐个唤醒(one-by-one wakeup) 设计避免了"惊群效应",保证公平性:先等待的协程先被唤醒。

三、完整 API 详解

3.1 构造函数

asyncio.Queue(maxsize=0)
  • maxsize <= 0:无界队列,put() 永远不会阻塞(内存允许的前提下)
  • maxsize > 0:有界队列,队列满时 put() 会挂起等待

3.2 核心方法

async put(item)

将元素放入队列。如果队列已满,挂起当前协程直到有空间。

put_nowait(item)

非阻塞放入。队列满时立即抛出 QueueFull 异常。

async get()

从队列取出并返回一个元素。如果队列为空,挂起当前协程直到有元素可用。

get_nowait()

非阻塞获取。队列空时立即抛出 QueueEmpty 异常。

3.3 任务追踪方法

task_done()

标记一个之前通过 get() 获取的任务已完成。内部将 _unfinished_tasks 减 1。当计数归零时,触发 _finished 事件,解除 join() 的阻塞。

注意:调用次数超过 put() 的次数会抛出 ValueError

async join()

阻塞直到队列中所有任务都被处理完毕(每个 put 进去的任务都收到了对应的 task_done() 调用)。

# 典型用法
await queue.join()   # 等待所有任务完成

3.4 状态查询

方法说明
qsize()返回队列当前元素数量
empty()队列是否为空
full()队列是否已满(maxsize=0 时永远返回 False)
maxsize属性,返回队列容量上限

threading.Queue 不同,由于单线程事件循环的特性,qsize() 的返回值在 await 之前是可靠的——不会被其他线程中断。

3.5 关闭方法(Python 3.13+)

shutdown(immediate=False)

queue.shutdown()              # 优雅关闭:允许消费完剩余元素
queue.shutdown(immediate=True) # 立即关闭:清空队列,中断所有等待
  • 优雅关闭(immediate=False):
    • put() 立即抛出 QueueShutDown
    • get() 可以继续取出已有元素,队列空后才抛出 QueueShutDown
  • 立即关闭(immediate=True):
    • 队列被清空,_unfinished_tasks 相应减少
    • 所有阻塞的 get() 和 put() 立即被唤醒并抛出 QueueShutDown
    • 如果 _unfinished_tasks 归零,join() 也会被解除阻塞

四、三种队列变体

asyncio 提供了三种队列,它们通过重写 _init_get_put 三个内部方法实现不同的出队策略:

4.1Queue(FIFO 先进先出)

def _init(self, maxsize):
    self._queue = collections.deque()
def _put(self, item):
    self._queue.append(item)
def _get(self):
    return self._queue.popleft()

4.2PriorityQueue(优先级队列)

def _init(self, maxsize):
    self._queue = []
def _put(self, item, heappush=heapq.heappush):
    heappush(self._queue, item)
def _get(self, heappop=heapq.heappop):
    return heappop(self._queue)

元素通常为 (priority, data) 元组,数值越小优先级越高

4.3LifoQueue(后进先出 / 栈)

def _init(self, maxsize):
    self._queue = []
def _put(self, item):
    self._queue.append(item)
def _get(self):
    return self._queue.pop()   # 从尾部取出

五、异常体系

异常触发时机
asyncio.QueueEmptyget_nowait() 在空队列上调用
asyncio.QueueFullput_nowait() 在满队列上调用
asyncio.QueueShutDown在已 shutdown() 的队列上调用 put() 或 get()(3.13+)

六、经典模式:生产者-消费者

import asyncio
import random

async def producer(queue: asyncio.Queue, name: str):
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"[{name}] 生产: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()
        print(f"  [{name}] 消费: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=3)  # 有界队列,产生背压

    # 启动生产者和消费者
    producers = [asyncio.create_task(producer(queue, f"P{i}")) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(3)]

    # 等待所有生产完成
    await asyncio.gather(*producers)
    # 等待队列中所有任务被消费完
    await queue.join()

    # 取消消费者(它们在 await queue.get() 处无限等待)
    for c in consumers:
        c.cancel()

asyncio.run(main())

关键要点

  1. maxsize 实现背压(backpressure):当消费者处理速度跟不上时,生产者自动被挂起,防止内存无限增长
  2. task_done() + join() 配对使用:确保每个任务都被处理完毕后再收尾
  3. 消费者用 while True + cancel() 模式:消费者持续监听,生产结束后由外部取消

七、超时控制

asyncio.Queue 的方法本身不接受 timeout 参数,需要配合 asyncio.wait_for() 使用:

try:
    item = await asyncio.wait_for(queue.get(), timeout=5.0)
except asyncio.TimeoutError:
    print("等待超时,队列中没有新数据")
try:
    await asyncio.wait_for(queue.put(item), timeout=3.0)
except asyncio.TimeoutError:
    print("队列已满,放入超时")

八、线程安全问题与跨线程使用

asyncio.Queue 不是线程安全的。 如果你需要从非 asyncio 线程向队列中投递任务,必须使用 loop.call_soon_threadsafe()

# 从其他线程安全地放入数据
loop.call_soon_threadsafe(queue.put_nowait, item)

或者使用 asyncio.run_coroutine_threadsafe() 调用异步方法:

future = asyncio.run_coroutine_threadsafe(queue.put(item), loop)
future.result()  # 阻塞等待完成

九、与queue.Queue的对比

特性queue.Queueasyncio.Queue
线程安全是(内置锁)否(单线程事件循环)
阻塞方式线程阻塞(真正挂起 OS 线程)协程挂起(让出事件循环)
timeout 参数get(timeout=5)需配合 asyncio.wait_for()
qsize() 可靠性不可靠(多线程竞争)在 await 点之间可靠
join() / task_done()支持支持
shutdown()3.13+ 支持3.13+ 支持
适用场景多线程asyncio 协程

十、高级用法与最佳实践

10.1 优雅关闭(Python 3.13+)

async def main():
    queue = asyncio.Queue()
    # ... 启动 workers ...

    # 生产完毕后,优雅关闭队列
    queue.shutdown()
    await queue.join()

10.2 Python 3.12 及之前的优雅关闭——哨兵值模式

SENTINEL = object()

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is SENTINEL:
            queue.task_done()
            break
        process(item)
        queue.task_done()

# 向每个 consumer 发送一个哨兵
for _ in range(num_consumers):
    await queue.put(SENTINEL)
await queue.join()

10.3 扇出/扇入(Fan-out / Fan-in)

async def pipeline():
    raw_queue = asyncio.Queue()
    processed_queue = asyncio.Queue()

    # Stage 1: 多个 fetcher 往 raw_queue 放数据
    fetchers = [asyncio.create_task(fetch(raw_queue)) for _ in range(5)]
    # Stage 2: 多个 processor 从 raw_queue 取出,处理后放入 processed_queue
    processors = [asyncio.create_task(process(raw_queue, processed_queue)) for _ in range(3)]
    # Stage 3: 单个 writer 从 processed_queue 取出并写入
    writer = asyncio.create_task(write(processed_queue))

10.4 注意事项

  1. 无界队列的内存风险maxsize=0 时生产速度远大于消费速度会导致内存暴涨,生产环境应始终设置合理的 maxsize
  2. 忘记调用 task_done():会导致 join() 永远阻塞
  3. 异常处理:消费者中如果处理逻辑抛出异常,必须确保 task_done() 仍被调用(用 try/finally):
    async def safe_consumer(queue):
        while True:
            item = await queue.get()
            try:
                await process(item)
            finally:
                queue.task_done()
    
  4. 不要混用线程:除非通过 call_soon_threadsaferun_coroutine_threadsafe

十一、总结

asyncio.Queue 是异步编程中不可或缺的协调原语。它的设计简洁优雅——底层通过 Future 实现挂起/唤醒,通过 deque 保证 FIFO 公平性,通过 _unfinished_tasks + Event 实现 join 语义。理解它的内部机制,能帮助你在构建异步任务调度器、流水线处理、背压控制等场景中做出更好的架构决策。

到此这篇关于Python中asyncio.Queue异步队列的实现的文章就介绍到这了,更多相关Python asyncio.Queue异步队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • python批处理将图片进行放大实例代码

    python批处理将图片进行放大实例代码

    最近处理一些规格不一的照片,需要修改成指定尺寸便于打印,下面这篇文章主要给大家介绍了关于python批处理将图片进行放大的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2021-12-12
  • Python检查判断一个数是不是另一个数的整数次幂实例深究

    Python检查判断一个数是不是另一个数的整数次幂实例深究

    在数学和计算中,确定一个数是否为另一个数的整数次幂是一个常见而重要的问题,例如,我们可能需要判断一个数是否是某个数的平方、立方或其他幂次,本文将探讨在Python中如何实现这一功能,通过数学方法和算法检查一个数是否是另一个数的整数次幂
    2023-12-12
  • Python正则表达式中group与groups的用法详解

    Python正则表达式中group与groups的用法详解

    本文主要介绍了Python正则表达式中group与groups的用法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-02-02
  • 使用pytorch提取卷积神经网络的特征图可视化

    使用pytorch提取卷积神经网络的特征图可视化

    这篇文章主要给大家介绍了关于使用pytorch提取卷积神经网络的特征图可视化的相关资料,文中给出了详细的思路以及示例代码,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2022-03-03
  • 用Python将一个列表分割成小列表的实例讲解

    用Python将一个列表分割成小列表的实例讲解

    今天小编就为大家分享一篇用Python将一个列表分割成小列表的实例讲解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-07-07
  • Python使用Tabulate库实现格式化表格数据

    Python使用Tabulate库实现格式化表格数据

    在数据分析和软件开发中,表格数据的展示是一个常见的需求,无论是简单的数据报告,还是复杂的数据可视化,表格都是一种直观且有效的信息展示方式,tabulate库是一个非常实用的工具,它可以帮助我们轻松地将数据格式化为各种表格形式,本文将详细介绍tabulate库的使用方法
    2025-02-02
  • Python字符串格式化f-string多种功能实现

    Python字符串格式化f-string多种功能实现

    这篇文章主要介绍了Python字符串格式化f-string格式多种功能实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • Python之csv文件从MySQL数据库导入导出的方法

    Python之csv文件从MySQL数据库导入导出的方法

    今天小编就为大家分享一篇Python之csv文件从MySQL数据库导入导出的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-06-06
  • 返回最大值的index pytorch方式

    返回最大值的index pytorch方式

    这篇文章主要介绍了返回最大值的index pytorch方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-07-07
  • Python常见异常处理总结

    Python常见异常处理总结

    这篇文章主要介绍了Python常见异常处理总结,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的朋友可以参考一下
    2022-07-07

最新评论