Python 多线程通信的常用方法汇总

 更新时间:2026年01月14日 09:10:01   作者:Looooking  
Python中常用的线程通信方式主要有共享变量、Queue、Event、Condition、Semaphore和Barrier,这些方法都通过锁机制保证线程安全,用于实现多线程之间的通信和数据交换,本文介绍Python 之多线程通信的几种常用方法,感兴趣的朋友跟随小编一起看看吧

一般来说,大部分遇到的多线程,只要能各自完成好各自的任务即可。少数情况下,不同线程可能需要在线程安全的情况下,进行通信和数据交换。Python 中常用的线程通信有以下方法。

共享变量

共享变量是最简单的线程通信方式,比如进行计数更新等操作,但需要配合锁来保证线程安全。

import threading
# 共享变量
shared_data = 0
lock = threading.Lock()
def worker():
    global shared_data
    with lock:  # 使用锁保证线程安全
        shared_data += 1
threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"最终结果: {shared_data}")  # 应该是5

Queue队列

最常用的线程安全通信方式,是生产者-消费者模式的理想选择。比如使用优先级队列优先消费高优先级的数据(序号越低,优先级越高,越优先消费)。

from time import sleep
from random import random, randint
from threading import Thread
from queue import PriorityQueue
queue = PriorityQueue()
def producer(queue):
    print('Producer: Running')
    for i in range(5):
        # create item with priority
        value = random()
        priority = randint(0, 5)
        item = (priority, value)
        queue.put(item)
    # wait for all items to be processed
    queue.join()
    queue.put(None)
    print('Producer: Done')
def consumer(queue):
    print('Consumer: Running')
    while True:
        # get a unit of work
        item = queue.get()
        if item is None:
            break
        sleep(item[1])
        print(item)
        queue.task_done()
    print('Consumer: Done')
producer = Thread(target=producer, args=(queue,))
producer.start()
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
producer.join()
consumer.join()
Producer: Running
Consumer: Running
(0, 0.9945246262101098)
(2, 0.35853829355476663)
(2, 0.4794139132317813)
(3, 0.8460111545035349)
(5, 0.6047655828611674)
Producer: Done
Consumer: Done

Event事件

线程模提供了 Event 用于线程间的简单信号传递。Event 对象管理内部标志的状态。标志初始为False,并通过 set() 方法变为 True,通过 clear() 方法重新设置为 False。wait() 方法会阻塞,直到标志变为 True。

比如下面使用 Event 通知,模拟交通信号灯周期性变化及车辆通行之间的协同运行。车辆根据信号灯的状态判断是否通行还是等待;车辆通行完毕以后,只剩下信号灯周期性变化。

from threading import *
import time
def signal_state():
    while True:
        time.sleep(5)
        print("Traffic Police Giving GREEN Signal")
        event.set()
        time.sleep(10)
        print("Traffic Police Giving RED Signal")
        event.clear()
def traffic_flow():
    num = 0
    while num < 10:
        print("Waiting for GREEN Signal")
        event.wait()
        print("GREEN Signal ... Traffic can move")
        while event.is_set():
            num = num + 1
            print("Vehicle No:", num, " Crossing the Signal")
            time.sleep(2)
        print("RED Signal ... Traffic has to wait")
event = Event()
t1 = Thread(target=signal_state)
t2 = Thread(target=traffic_flow)
t1.start()
t2.start()
Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 1  Crossing the Signal
Vehicle No: 2  Crossing the Signal
Vehicle No: 3  Crossing the Signal
Vehicle No: 4  Crossing the Signal
Vehicle No: 5  Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 6  Crossing the Signal
Vehicle No: 7  Crossing the Signal
Vehicle No: 8  Crossing the Signal
Vehicle No: 9  Crossing the Signal
Vehicle No: 10  Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Traffic Police Giving GREEN Signal
Traffic Police Giving RED Signal
Traffic Police Giving GREEN Signal
Traffic Police Giving RED Signal
...

Condition条件对象

线程模块中的 Condition 类实现了条件变量对象。条件对象会强制一个或多个线程等待,直到被另一个线程通知。Condition 用于更复杂的线程同步,允许线程等待特定条件。比如上面的 Event 的实现,其内部也是在使用 Condition。

import threading
import time
# 共享资源
buffer = []
MAX_ITEMS = 5
condition = threading.Condition()
def producer():
    """生产者"""
    for i in range(10):
        time.sleep(0.2)
        with condition:
            while len(buffer) >= MAX_ITEMS:
                print("Buffer full,wait...")
                condition.wait()  # 等待缓冲区有空位
            item = f"item-{i}"
            buffer.append(item)
            print(f"Producer: {item}, Buffer: {len(buffer)}")
            condition.notify_all()  # 通知消费者
def consumer():
    """消费者"""
    for i in range(10):
        time.sleep(0.8)
        with condition:
            while len(buffer) == 0:
                print("Buffer empty,wait...")
                condition.wait()  # 等待缓冲区有数据
            item = buffer.pop(0)
            print(f"Consumer: {item}, Buffer: {len(buffer)}")
            condition.notify_all()  # 通知生产者
# 创建线程
prod = threading.Thread(target=producer)
cons = threading.Thread(target=consumer)
prod.start()
cons.start()
prod.join()
cons.join()
Producer: item-0, Buffer: 1
Producer: item-1, Buffer: 2
Producer: item-2, Buffer: 3
Consumer: item-0, Buffer: 2
Producer: item-3, Buffer: 3
Producer: item-4, Buffer: 4
Producer: item-5, Buffer: 5
Buffer full,wait...
Consumer: item-1, Buffer: 4
Producer: item-6, Buffer: 5
Buffer full,wait...
Consumer: item-2, Buffer: 4
Producer: item-7, Buffer: 5
Buffer full,wait...
Consumer: item-3, Buffer: 4
Producer: item-8, Buffer: 5
Buffer full,wait...
Consumer: item-4, Buffer: 4
Producer: item-9, Buffer: 5
Consumer: item-5, Buffer: 4
Consumer: item-6, Buffer: 3
Consumer: item-7, Buffer: 2
Consumer: item-8, Buffer: 1
Consumer: item-9, Buffer: 0

Semaphore信号量

Semaphore 信号量控制对共享资源的访问数量。信号量的基本概念是使用一个内部计数器,每个 acquire() 调用将其递减,每个 release() 调用将其递增。计数器永远不能低于零;当 acquire() 发现计数器为零时,它会阻塞,直到某个其他线程调用 release()。当然,从源码看,信号量也是通过 Condition 条件对象来进行实现的。

import threading
import time
# 信号量,限制最多3个线程同时访问
semaphore = threading.Semaphore(3)
def access_resource(thread_id):
    """访问共享资源"""
    with semaphore:
        print(f"Thread {thread_id} acquire\n", end="")
        time.sleep(2)  # 模拟资源访问
        print(f"Thread {thread_id} release\n", end="")
# 创建10个线程
threads = []
for i in range(10):
    t = threading.Thread(target=access_resource, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
Thread 0 acquire
Thread 1 acquire
Thread 2 acquire
Thread 0 release
Thread 3 acquire
Thread 1 release
Thread 2 release
Thread 4 acquire
Thread 5 acquire
Thread 3 release
Thread 6 acquire
Thread 4 release
Thread 7 acquire
Thread 5 release
Thread 8 acquire
Thread 6 release
Thread 9 acquire
Thread 7 release
Thread 8 release
Thread 9 release

Barrier屏障

Barrier 使多个线程等待,直到指定数目的线程都到达某个点,这些线程才会被同时唤醒,然后继续往后执行(需要注意的是:如果没有设置 timeout,且总的线程数无法整除给定的线程数 parties 时,会导致线程阻塞,形成死锁)。

import threading
import time
# 创建屏障,等待3个线程(注意:如果总的线程数无法整除3,则会导致线程阻塞)
barrier = threading.Barrier(3)
def worker(worker_id):
    """工作线程"""
    print(f"Worker {worker_id} start")
    time.sleep(worker_id)  # 模拟不同工作速度
    print(f"Worker {worker_id} arrive")
    barrier.wait()  # 等待所有线程到达
    print(f"Worker {worker_id} continue")
# 创建3个线程
threads = []
for i in range(6):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
Worker 0 start
Worker 0 arrive
Worker 1 start
Worker 2 start
Worker 3 start
Worker 4 start
Worker 5 start
Worker 1 arrive
Worker 2 arrive
Worker 2 continue
Worker 0 continue
Worker 1 continue
Worker 3 arrive
Worker 4 arrive
Worker 5 arrive
Worker 5 continue
Worker 3 continue
Worker 4 continue

不管以什么样的方式进行线程通信,最重要的当属线程安全,线程通信的各种设计,也是建立在通过锁的机制保证线程安全的情况下来实现各种功能的。

到此这篇关于Python 之多线程通信的几种常用方法的文章就介绍到这了,更多相关Python多线程通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Python实现npy/mat文件的保存与读取

    Python实现npy/mat文件的保存与读取

    除了常用的csv文件和excel文件之外,我们还可以通过Python把数据保存文npy文件格式和mat文件格式。本文为大家展示了实现npy文件与mat文件的保存与读取的示例代码,需要的可以参考一下
    2022-04-04
  • 解决pytorch 损失函数中输入输出不匹配的问题

    解决pytorch 损失函数中输入输出不匹配的问题

    这篇文章主要介绍了解决pytorch 损失函数中输入输出不匹配的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • 放弃 Python 转向 Go语言有人给出了 9 大理由

    放弃 Python 转向 Go语言有人给出了 9 大理由

    今年 Stream 团队的主要编程语言从 Python 转向了 Go。本文解释了其背后的九大原因以及如何做好这一转换。下面小编给大家分享放弃 Python 转向 Go语言有人给出了 9 大理由,一起看看吧
    2017-10-10
  • Python 调用API发送邮件

    Python 调用API发送邮件

    这篇文章主要介绍了Python 调用API发送邮件的方法,帮助大家更好的理解和学习使用python,感兴趣的朋友可以了解下
    2021-03-03
  • 详细分析Python垃圾回收机制

    详细分析Python垃圾回收机制

    这篇文章主要介绍了Python垃圾回收机制的相关资料,文中讲解非常详细,示例代码帮助大家更好的理解和学习,感兴趣的朋友可以了解下
    2020-07-07
  • Python实现将罗马数字转换成普通阿拉伯数字的方法

    Python实现将罗马数字转换成普通阿拉伯数字的方法

    这篇文章主要介绍了Python实现将罗马数字转换成普通阿拉伯数字的方法,简单分析了罗马数字的构成并结合实例形式给出了Python转换罗马数字为阿拉伯数字的实现方法,需要的朋友可以参考下
    2017-04-04
  • Python3.9最新版下载与安装图文教程详解(Windows系统为例)

    Python3.9最新版下载与安装图文教程详解(Windows系统为例)

    这篇文章主要介绍了Python3.9最新版下载与安装图文教程详解,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • Python小工具之消耗系统指定大小内存的方法

    Python小工具之消耗系统指定大小内存的方法

    今天小编就为大家分享一篇Python小工具之消耗系统指定大小内存的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-12-12
  • python统计字符串中字母出现次数代码实例

    python统计字符串中字母出现次数代码实例

    这篇文章主要介绍了python统计字符串中字母出现次数代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • python中json格式数据输出的简单实现方法

    python中json格式数据输出的简单实现方法

    下面小编就为大家带来一篇python中json格式数据输出的简单实现方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-10-10

最新评论