Queue队列中join()与task_done()的关系及说明

 更新时间:2023年02月25日 09:52:49   作者:Terry_dong  
这篇文章主要介绍了Queue队列中join()与task_done()的关系及说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

join()与task_done()的关系

在网上大多关于join()与task_done()的结束原话是这样的:

  • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号 
  • Queue.join() 实际上意味着等到队列为空,再执行别的操作

但是可能很多人还是不太理解,这里以我自己的理解来阐述这两者的关联。

理解

如果线程里每从队列里取一次,但没有执行task_done(),则join无法判断队列到底有没有结束,在最后执行个join()是等不到结果的,会一直挂起。

可以理解为,每task_done一次 就从队列里删掉一个元素,这样在最后join的时候根据队列长度是否为零来判断队列是否结束,从而执行主线程。

下面看个自己写的例子:

下面这个例子,会在join()的地方无限挂起,因为join在等队列清空,但是由于没有task_done,它认为队列还没有清空,还在一直等。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''threading test'''
import threading
import queue
from time import sleep
#之所以为什么要用线程,因为线程可以start后继续执行后面的主线程,可以put数据,如果不是线程直接在get阻塞。
class Mythread(threading.Thread):
 def __init__(self,que):
 threading.Thread.__init__(self)
 self.queue = que
 def run(self):
 while True:
 sleep(1)
 if self.queue.empty(): #判断放到get前面,这样可以,否则队列最后一个取完后就空了,直接break,走不到print
 break
 item = self.queue.get()
 print(item,'!')
 #self.queue.task_done()
 return
que = queue.Queue()
tasks = [Mythread(que) for x in range(1)]
for x in range(10):
 
 que.put(x) #快速生产
for x in tasks:
 t = Mythread(que) #把同一个队列传入2个线程
 t.start()
 
que.join()
 
print('---success---')

如果把self.queue.task_done()  注释去掉,就会顺利执行完主程序。

这就是“ Queue.task_done()函数向任务已经完成的队列发送一个信号”这句话的意义,能够让join()函数能判断出队列还剩多少,是否清空了。

而事实上我们看下queue的源码可以看出确实是执行一次未完成队列减一:

 def task_done(self):
 '''Indicate that a formerly enqueued task is complete.
 Used by Queue consumer threads. For each get() used to fetch a task,
 a subsequent call to task_done() tells the queue that the processing
 on the task is complete.
 If a join() is currently blocking, it will resume when all items
 have been processed (meaning that a task_done() call was received
 for every item that had been put() into the queue).
 Raises a ValueError if called more times than there were items
 placed in the queue.
 '''
 with self.all_tasks_done:
 unfinished = self.unfinished_tasks - 1
 if unfinished <= 0:
 if unfinished < 0:
 raise ValueError('task_done() called too many times')
 self.all_tasks_done.notify_all()
 self.unfinished_tasks = unfinished
 

快速生产-快速消费

上面的演示代码是快速生产-慢速消费的场景,我们可以直接用task_done()与join()配合,来让empty()判断出队列是否已经结束。

当然,queue我们可以正确判断是否已经清空,但是线程里的get队列是不知道,如果没有东西告诉它,队列空了,因此get还会继续阻塞,那么我们就需要在get程序中加一个判断,如果empty()成立,break退出循环,否则get()还是会一直阻塞。

慢速生产-快速消费

但是如果生产者速度与消费者速度相当,或者生产速度小于消费速度,则靠task_done()来实现队列减一则不靠谱,队列会时常处于供不应求的状态,常为empty,所以用empty来判断则不靠谱。

那么这种情况会导致 join可以判断出队列结束了,但是线程里不能依靠empty()来判断线程是否可以结束。

我们可以在消费队列的每个线程最后塞入一个特定的“标记”,在消费的时候判断,如果get到了这么一个“标记”,则可以判定队列结束了,因为生产队列都结束了,也不会再新增了。

代码如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''threading test'''
import threading
import queue
from time import sleep
#之所以为什么要用线程,因为线程可以start后继续执行后面的主线程,可以put数据,如果不是线程直接在get阻塞。
class Mythread(threading.Thread):
 def __init__(self,que):
 threading.Thread.__init__(self)
 self.queue = que
 def run(self):
 while True:
 item = self.queue.get()
 self.queue.task_done() #这里要放到判断前,否则取最后最后一个的时候已经为空,直接break,task_done执行不了,join()判断队列一直没结束
 if item == None:
 break
 print(item,'!')
return
que = queue.Queue()
tasks = [Mythread(que) for x in range(1)]
 #快速生产
for x in tasks:
 t = Mythread(que) #把同一个队列传入2个线程
 t.start()
for x in range(10):
 sleep(1)
 que.put(x)
for x in tasks:
 que.put(None)
que.join()
print('---success---')

注意点

put队列完成的时候千万不能用task_done(),否则会报错:

task_done() called too many times

因为该方法仅仅表示get成功后,执行的一个标记。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • python基础之文件处理知识总结

    python基础之文件处理知识总结

    今天带大家了解python文件处理的相关知识,文中介绍的非常详细,对正在学习python的小伙伴们很有帮助,需要的朋友可以参考下
    2021-05-05
  • Django模型验证器介绍与源码分析

    Django模型验证器介绍与源码分析

    这篇文章主要给大家介绍了关于Django模型验证器与源码分析的相关资料,文中通过图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • python常用的正则表达式大全

    python常用的正则表达式大全

    正则表达式是一个特殊的字符序列,它能帮助你方便的检查一个字符串是否与某种模式匹配,下面这篇文章主要给大家介绍了关于python常用正则表达式的相关资料,文中通过图文以及实例代码介绍的非常详细,需要的朋友可以参考下
    2022-02-02
  • 详解Python自动化中这八大元素定位

    详解Python自动化中这八大元素定位

    今天给大家带来的是关于Python的相关知识,文章围绕着Python自动化中这八大元素定位展开,文中有非常详细的介绍及代码示例,需要的朋友可以参考下
    2021-06-06
  • Python PDF转化wolrd代码的写法小结

    Python PDF转化wolrd代码的写法小结

    将PDF文件转换为Word文档的过程通常需要使用一些外部库来实现,因为Python本身并不直接支持这种转换,这篇文章主要介绍了Python PDF转化wolrd代码的写法小结,需要的朋友可以参考下
    2024-06-06
  • Python+selenium实现自动循环扔QQ邮箱漂流瓶

    Python+selenium实现自动循环扔QQ邮箱漂流瓶

    这篇文章主要为大家详细介绍了Python+selenium实现自动循环扔QQ邮箱漂流瓶,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-05-05
  • Python必备技巧之Pandas数据合并函数

    Python必备技巧之Pandas数据合并函数

    Pandas中一共有五个数据合并函数,分别为:concat、append、merge、join、combine,本文详细讲解这五个函数的使用方法,需要的可以参考一下
    2022-03-03
  • python 装饰器详解与应用范例

    python 装饰器详解与应用范例

    装饰器是 Python 的一个重要部分。简单地说:他们是修改其他函数的功能的函数。他们有助于让我们的代码更简短,也更Pythonic。大多数初学者不知道在哪儿使用它们,所以我将要分享下,哪些区域里装饰器可以让你的代码更简洁。 首先,让我们讨论下如何写你自己的装饰器
    2021-11-11
  • 解读torch.nn.GRU的输入及输出示例

    解读torch.nn.GRU的输入及输出示例

    这篇文章主要介绍了解读torch.nn.GRU的输入及输出示例,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-01-01
  • pycharm安装opencv-python报错的解决

    pycharm安装opencv-python报错的解决

    本文主要介绍了pycharm安装opencv-python报错的解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07

最新评论