Python 中的进程(Process)实例详解
更新时间:2026年06月13日 10:08:00 作者:梦想不只是梦与想
这段文章详细介绍了进程的概念及其与线程的区别,重点讲解了Python的多进程编程,创建方式、进程间通信机制(IPC和进程同步方法,特别强调了进程间数据不共享的特点,适合于CPU密集型任务处理,感兴趣的朋友一起看看吧
进程是操作系统进行资源分配和调度的基本单位,每个进程都有独立的内存空间、文件描述符、代码和数据。Python 通过 multiprocessing 模块支持多进程编程。
进程(Process)是什么?
进程是资源分配的最小单位,通俗点:一个正在运行的程序就是一个进程(比如正在运行的微信、QQ)。
可以理解成:
进程 = 操作系统给你开的一间“独立办公室”(内存空间、资源都有自己的一套)
线程 = 办公室里干活的人(这一章先不展开)
一、进程 vs 线程
| 特性 | 进程 | 线程 |
|---|---|---|
| 内存空间 | 独立,互不干扰 | 共享同一进程的内存 |
| 数据隔离 | 天然隔离,需要 IPC | 共享数据需要加锁 |
| 创建开销 | 大(需要复制内存) | 小 |
| 切换开销 | 大(切换页表) | 小 |
| GIL 影响 | 无,真正的并行 | 有,受 GIL 限制 |
| 适用场景 | CPU 密集型 | I/O 密集型 |
# 进程的独立内存空间
from multiprocessing import Process
data = [] # 全局列表
def worker():
data.append(1) # 每个进程有自己 copy 的 data
print(f"{id(data)}: {data}")
p1 = Process(target=worker)
p2 = Process(target=worker)
p1.start()
p2.start()
p1.join()
p2.join()
# 输出:不同内存地址的进程各自操作自己的 data二、创建进程的方式
1. 使用Process类
from multiprocessing import Process
import os
# 方式1:传入目标函数
def worker(name, num):
print(f"进程 {name}: PID={os.getpid()}, 父进程={os.getppid()}")
return num * 2
p = Process(target=worker, args=("worker1", 100))
p.start() # 启动进程
p.join() # 等待进程结束
print(f"exit code: {p.exitcode}")
# 方式2:自定义进程类
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(f"运行进程: {self.name}")
p = MyProcess("my_worker")
p.start()
p.join()2. 进程启动方式(spawn vs fork)
import multiprocessing as mp
# 设置启动方式(必须在 if __name__ == '__main__' 内)
if __name__ == '__main__':
mp.set_start_method('spawn') # Windows 默认,macOS 可选
# 或 mp.set_start_method('fork') # Linux 默认
# 三种启动方式对比:
# | 方式 | 特点 | 适用平台 |
# |--------|-------------------------------|-----------|
# | spawn | 全新启动,不继承父进程资源 | Windows |
# | fork | 复制父进程,继承资源(不安全) | Linux/macOS|
# | forkserver| 通过服务进程 fork(安全) | Unix-like |三、进程池(Pool)
from multiprocessing import Pool
import time
def cpu_intensive(n):
"""CPU 密集型任务"""
return sum(i * i for i in range(n))
def io_bound(n):
"""I/O 密集型任务"""
time.sleep(n)
return n
if __name__ == '__main__':
# 创建进程池(默认 CPU 核心数)
with Pool(processes=4) as pool:
# 方式1:map(阻塞,保持顺序)
results = pool.map(cpu_intensive, [10**7] * 4)
# 方式2:map_async(非阻塞)
result = pool.map_async(cpu_intensive, [10**7] * 4)
print("异步提交完成")
results = result.get() # 等待结果
# 方式3:apply(同步)
result = pool.apply(cpu_intensive, (10**7,))
# 方式4:apply_async(异步)
async_result = pool.apply_async(cpu_intensive, (10**7,))
result = async_result.get(timeout=30)
# 方式5:starmap(多参数)
args = [(1, 2), (3, 4), (5, 6)]
results = pool.starmap(lambda a, b: a + b, args)四、进程间通信(IPC)
1. Queue(队列)
from multiprocessing import Process, Queue
import time
def producer(q):
for i in range(5):
q.put(f"消息{i}")
print(f"生产: {i}")
time.sleep(0.5)
q.put(None) # 结束标志
def consumer(q):
while True:
msg = q.get()
if msg is None:
break
print(f"消费: {msg}")
time.sleep(1)
if __name__ == '__main__':
q = Queue(maxsize=10) # 指定最大容量
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()2. Pipe(管道)
from multiprocessing import Process, Pipe
def worker(conn):
"""子进程通过管道发送数据"""
conn.send([1, 2, 3]) # 发送对象
print(f"收到父进程消息: {conn.recv()}")
conn.close()
if __name__ == '__main__':
# 创建双向管道
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
print(f"收到子进程数据: {parent_conn.recv()}")
parent_conn.send("来自父进程的问候")
p.join()3. Shared Memory(共享内存)
from multiprocessing import Process, Value, Array
def worker(val, arr):
val.value += 1
for i in range(len(arr)):
arr[i] += i
if __name__ == '__main__':
# 创建共享变量
num = Value('i', 0) # 'i' 表示有符号整数
arr = Array('d', [0.0, 1.0, 2.0]) # 'd' 表示 double
processes = []
for _ in range(4):
p = Process(target=worker, args=(num, arr))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"num: {num.value}")
print(f"arr: {arr[:]}")共享内存类型码:
| 类型码 | C 类型 | Python 类型 |
|---|---|---|
'i' | int | 有符号整数 |
'I' | unsigned int | 无符号整数 |
'l' | long | 有符号长整数 |
'f' | float | 浮点数 |
'd' | double | 双精度浮点数 |
'c' | char | 字符 |
4. Manager(管理器)
from multiprocessing import Process, Manager
def worker(d, l, key, value):
d[key] = value
l.append(value)
if __name__ == '__main__':
with Manager() as manager:
# 创建共享字典和列表
d = manager.dict()
l = manager.list()
processes = []
for i in range(5):
p = Process(target=worker, args=(d, l, f"key{i}", i))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"dict: {d}")
print(f"list: {l}")五、进程同步(锁)
from multiprocessing import Process, Lock, Value
import time
def worker(lock, counter):
for _ in range(1000):
lock.acquire()
try:
counter.value += 1
finally:
lock.release()
if __name__ == '__main__':
counter = Value('i', 0)
lock = Lock()
processes = [Process(target=worker, args=(lock, counter)) for _ in range(10)]
for p in processes:
p.start()
for p in processes:
p.join()
print(f"最终值: {counter.value}") # 应该是 10000其他:
| 同步 | 用途 |
|---|---|
Lock | 互斥锁,一次只能一个进程访问 |
RLock | 可重入锁,同一进程可多次获取 |
Semaphore | 信号量,限制并发数量 |
Event | 事件,用于进程间信号通知 |
Condition | 条件变量,复杂同步 |
核心点:
- 进程内存隔离,数据不共享(与线程不同)
- 创建进程需要
if __name__ == '__main__' - 进程适合 CPU 密集型任务
- 进程间通信使用
Queue、Pipe、Manager等 - 共享可变数据需要加锁
- 进程创建开销大,使用进程池复用
到此这篇关于Python 中的进程(Process)实例详解的文章就介绍到这了,更多相关Python 进程Process内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!


最新评论