Python实现线程池之线程安全队列

 更新时间:2022年05月25日 15:58:28   作者:旺旺小小超  
这篇文章主要为大家详细介绍了Python实现线程池之线程安全队列,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

本文实例为大家分享了Python实现线程池之线程安全队列的具体代码,供大家参考,具体内容如下

一、线程池组成

一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中一个线程安全的队列是实现线程池和任务队列的基础,本节我们通过threading包中的互斥量threading.Lock()和条件变量threading.Condition()来实现一个简单的、读取安全的线程队列。

二、线程安全队列的实现

包括put、pop、get等方法,为保证线程安全,读写操作时要添加互斥锁;并且pop操作可以设置等待时间以阻塞当前获取元素的线程,当新元素写入队列时通过条件变量通知解除等待操作。

class ThreadSafeQueue(object):

    def __init__(self, max_size=0):
        self.queue = []
        self.max_size = max_size  # max_size为0表示无限大
        self.lock = threading.Lock()  # 互斥量
        self.condition = threading.Condition()  # 条件变量

    def size(self):
        """
        获取当前队列的大小
        :return: 队列长度
        """
        # 加锁
        self.lock.acquire()
        size = len(self.queue)
        self.lock.release()
        return size

    def put(self, item):
        """
        将单个元素放入队列
        :param item:
        :return:
        """
        # 队列已满 max_size为0表示无限大
        if self.max_size != 0 and self.size() >= self.max_size:
            return ThreadSafeException()

        # 加锁
        self.lock.acquire()
        self.queue.append(item)
        self.lock.release()
        self.condition.acquire()
        # 通知等待读取的线程
        self.condition.notify()
        self.condition.release()

        return item

    def batch_put(self, item_list):
        """
        批量添加元素
        :param item_list:
        :return:
        """
        if not isinstance(item_list, list):
            item_list = list(item_list)

        res = [self.put(item) for item in item_list]

        return res

    def pop(self, block=False, timeout=0):
        """
        从队列头部取出元素
        :param block: 是否阻塞线程
        :param timeout: 等待时间
        :return:
        """
        if self.size() == 0:
            if block:
                self.condition.acquire()
                self.condition.wait(timeout)
                self.condition.release()
            else:
                return None

        # 加锁
        self.lock.acquire()
        item = None
        if len(self.queue):
            item = self.queue.pop()
        self.lock.release()

        return item

    def get(self, index):
        """
        获取指定位置的元素
        :param index:
        :return:
        """
        if self.size() == 0 or index >= self.size():
            return None

        # 加锁
        self.lock.acquire()
        item = self.queue[index]
        self.lock.release()

        return item


class ThreadSafeException(Exception):
    pass

三、测试逻辑

3.1、测试阻塞逻辑

def thread_queue_test_1():
    thread_queue = ThreadSafeQueue(10)

    def producer():
        while True:
            thread_queue.put(random.randint(0, 10))
            time.sleep(2)

    def consumer():
        while True:
            print('current time before pop is %d' % time.time())
            item = thread_queue.pop(block=True, timeout=3)
            # item = thread_queue.get(2)
            if item is not None:
                print('get value from queue is %s' % item)
            else:
                print(item)
            print('current time after pop is %d' % time.time())

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

测试结果:

我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。

3.2、测试读写加锁逻辑

def thread_queue_test_2():
    thread_queue = ThreadSafeQueue(10)

    def producer():
        while True:
            thread_queue.put(random.randint(0, 10))
            time.sleep(2)

    def consumer(name):
        while True:
            item = thread_queue.pop(block=True, timeout=1)
            # item = thread_queue.get(2)
            if item is not None:
                print('%s get value from queue is %s' % (name, item))
            else:
                print('%s get value from queue is None' % name)

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer, args=('thread1',))
    t3 = threading.Thread(target=consumer, args=('thread2',))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

测试结果:

生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • 关于Python中object类特殊方法的解释

    关于Python中object类特殊方法的解释

    在学习Python的过程中我们会发现有一个类 Object类 ,它是所有类的父类,Object类规定了python用于类的内置函数,今天我们就来看看几个常用的特殊方法吧
    2023-03-03
  • 一个小示例告诉你Python语言的优雅之处

    一个小示例告诉你Python语言的优雅之处

    本篇中, 我们展示一下一段非常小的代码, 这段代码十分吸引我们, 因为它使用十分优雅和直接的方式解决了一个常见的问题.
    2014-07-07
  • Python易忽视知识点小结

    Python易忽视知识点小结

    这篇文章主要介绍了Python易忽视知识点,实例分析了Python中容易被忽视的常见操作技巧,需要的朋友可以参考下
    2015-05-05
  • 对python中raw_input()和input()的用法详解

    对python中raw_input()和input()的用法详解

    下面小编就为大家分享一篇对python中raw_input()和input()的用法详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-04-04
  • pandas中Series的代码实例解析

    pandas中Series的代码实例解析

    这篇文章主要介绍了pandas中Series的代码实例解析,Series序列,是一种一维的结构,类似于一维列表和ndarray中的一维数组,但是功能比他们要更为强大,Series由两部分组成:索引index和数值values,需要的朋友可以参考下
    2023-07-07
  • 基于scrapy的redis安装和配置方法

    基于scrapy的redis安装和配置方法

    今天小编就为大家分享一篇基于scrapy的redis安装和配置方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-06-06
  • python初学者,用python实现基本的学生管理系统(python3)代码实例

    python初学者,用python实现基本的学生管理系统(python3)代码实例

    这篇文章主要介绍了用python实现学生管理系统,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-04-04
  • Python 获取div标签中的文字实例

    Python 获取div标签中的文字实例

    今天小编就为大家分享一篇Python 获取div标签中的文字实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-12-12
  • python [::-1] [::-1,::-1]的具体使用

    python [::-1] [::-1,::-1]的具体使用

    本文主要介绍了python [::-1] [::-1,::-1]的具体使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-05-05
  • Pytorch pth 格式转ONNX 格式的详细过程

    Pytorch pth 格式转ONNX 格式的详细过程

    PyTorch 训练的模型,需要在Jetson nano 上部署,jetson 原生提供了TensorRT 的支持,所以一个比较好的方式是把它转换成ONNX 格式,然后在通过ONNX 转换成TensorRT 格式,这篇文章主要介绍了Pytorch pth 格式转ONNX 格式,需要的朋友可以参考下
    2023-05-05

最新评论