Python使用multiprocessing模块实现多进程并行计算

 更新时间:2025年07月21日 09:44:51   作者:彬彬侠  
Python的multiprocessing模块是一个标准库模块,用于实现多进程并行计算,相比线程(threading 模块),multiprocessing更适合需要高性能计算的场景,本文将详细介绍multiprocessing模块的定义、功能、用法、示例、应用场景、最佳实践和注意事项,需要的朋友可以参考下

引言

Python 的 multiprocessing 模块是一个标准库模块,用于实现多进程并行计算。它通过创建独立的进程,绕过 Python 的全局解释器锁(GIL),在多核 CPU 上实现真正的并行,特别适合 CPU 密集型任务(如数值计算、图像处理)。相比线程(threading 模块),multiprocessing 更适合需要高性能计算的场景。本文将详细介绍 multiprocessing 模块的定义、功能、用法、示例、应用场景、最佳实践和注意事项。

1. multiprocessing 模块的定义和原理

1.1 定义

multiprocessing 是一个跨平台的模块,提供创建和管理进程的 API,支持进程间通信(IPC)、同步机制和共享资源管理。它模仿了 threading 模块的接口,方便开发者从线程迁移到进程。

核心功能

  • 进程创建:创建独立进程,运行指定函数或任务。
  • 进程池:管理一组工作进程,分配任务。
  • 进程通信:支持管道(Pipe)、队列(Queue)等 IPC 机制。
  • 同步原语:提供锁(Lock)、信号量(Semaphore)、事件(Event)等。
  • 共享内存:支持共享基本数据类型(Value)和数组(Array)。
  • 跨平台:在 Windows、Linux、macOS 上运行一致。

依赖:标准库,无需额外安装。

1.2 原理

  • 进程 vs 线程
    • 进程:独立的内存空间,拥有自己的 Python 解释器和 GIL,适合 CPU 密集型任务。
    • 线程:共享内存空间,受 GIL 限制,适合 I/O 密集型任务。
  • GIL 绕过:每个进程有独立的 GIL,允许多核并行。
  • 进程创建
    • Linux/macOS:使用 fork(复制父进程),或 spawn(新进程)。
    • Windows:始终使用 spawn,启动新解释器。
  • 通信开销:进程间通信(如 Queue)比线程慢,需优化设计。

1.3 导入

import multiprocessing

2. multiprocessing 的核心组件和功能

2.1 进程创建(Process)

通过 multiprocessing.Process 创建进程,运行指定函数。

构造函数

Process(target=None, args=(), kwargs={}, name=None, daemon=None)
  • target:目标函数。
  • args/kwargs:函数参数。
  • name:进程名称。
  • daemon:是否为守护进程(随主进程退出)。

主要方法

  • start():启动进程。
  • join():等待进程结束。
  • terminate():强制终止进程。
  • is_alive():检查进程是否存活。

示例

import multiprocessing

def worker(num):
    print(f"Worker {num} running in process {multiprocessing.current_process().name}")

if __name__ == "__main__":
    processes = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

输出(顺序可能不同):

Worker 0 running in process Process-1
Worker 1 running in process Process-2
Worker 2 running in process Process-3
  • 说明:创建 3 个进程,每个运行 worker 函数。

2.2 进程池(Pool)

Pool 用于管理固定数量的进程,适合并行处理大量任务。

构造函数

Pool(processes=None, initializer=None, initargs=())
  • processes:进程数(默认 CPU 核心数)。
  • initializer:每个进程的初始化函数。
  • initargs:初始化函数参数。

主要方法

  • map(func, iterable):并行执行 func 应用于 iterable,返回结果列表。
  • imap(func, iterable):惰性版本,返回迭代器。
  • apply(func, args=(), kwds={}):同步执行单任务。
  • apply_async(func, args=(), kwds={}):异步执行单任务。
  • close():关闭池,禁止新任务。
  • join():等待池内进程完成。

示例

from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        results = pool.map(square, range(10))
    print(results)  # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2.3 进程通信

支持 PipeQueue 实现进程间数据交换。

Pipe

  • 双向或单向管道,适合两个进程通信。

构造函数

Pipe(duplex=True)
  • 返回 (conn1, conn2),两个连接对象。
  • duplex=True:双向;False:单向。

示例

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("Hello from sender")
    conn.close()

def receiver(conn):
    print(conn.recv())
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

输出

Hello from sender

Queue

  • 线程和进程安全的队列,适合多生产者/消费者场景。

构造函数

Queue(maxsize=0)
  • maxsize:最大容量(0 表示无限制)。

示例

from multiprocessing import Process, Queue

def producer(queue):
    queue.put("Data from producer")

def consumer(queue):
    print(queue.get())

if __name__ == "__main__":
    queue = Queue()
    p1 = Process(target=producer, args=(queue,))
    p2 = Process(target=consumer, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

2.4 同步机制

提供锁、信号量等原语,确保进程安全访问共享资源。

Lock

  • 互斥锁,防止多个进程同时访问资源。
  • 示例
from multiprocessing import Process, Lock

def printer(lock, msg):
    with lock:
        print(msg)

if __name__ == "__main__":
    lock = Lock()
    processes = [Process(target=printer, args=(lock, f"Message {i}")) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

Semaphore

  • 控制有限资源的并发访问。
  • 示例
from multiprocessing import Process, Semaphore

def worker(sem, name):
    with sem:
        print(f"{name} acquired resource")
        # 模拟工作

if __name__ == "__main__":
    sem = Semaphore(2)  # 允许 2 个进程同时访问
    processes = [Process(target=worker, args=(sem, f"Worker {i}")) for i in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

Event

  • 进程间信号通知。
  • 示例
from multiprocessing import Process, Event
import time

def wait_for_event(event):
    event.wait()
    print("Event triggered")

if __name__ == "__main__":
    event = Event()
    p = Process(target=wait_for_event, args=(event,))
    p.start()
    time.sleep(1)
    event.set()  # 触发事件
    p.join()

2.5 共享内存

通过 ValueArray 共享基本数据类型。

  • Value:单个共享值。
  • Array:共享数组。

示例

from multiprocessing import Process, Value, Array

def modify(shared_num, shared_arr):
    shared_num.value += 1
    for i in range(len(shared_arr)):
        shared_arr[i] += 1

if __name__ == "__main__":
    num = Value("i", 0)  # 共享整数
    arr = Array("i", [1, 2, 3])  # 共享数组
    p = Process(target=modify, args=(num, arr))
    p.start()
    p.join()
    print(num.value)  # 输出: 1
    print(list(arr))  # 输出: [2, 3, 4]

3. 应用场景

数值计算

  • 并行处理矩阵运算、蒙特卡洛模拟。
  • 示例:计算大数组的平方。

图像处理

  • 并行处理图像滤波、特征提取。
  • 示例:批量应用卷积滤波。

机器学习

  • 并行训练模型或处理数据预处理。
  • 示例:并行特征提取。

数据处理

  • 并行处理 CSV 文件、数据库查询。
  • 示例:多进程解析日志文件。

爬虫

  • 并行抓取网页(注意网络限制)。
  • 示例:结合 urllib 并发下载。

4. 示例:多进程爬虫

结合 urllibQueue 实现并行网页抓取。

示例

import urllib.request
from multiprocessing import Process, Queue
from urllib.error import URLError

def fetch_url(queue, url):
    try:
        with urllib.request.urlopen(url) as response:
            content = response.read().decode("utf-8")
            queue.put((url, len(content)))
    except URLError as e:
        queue.put((url, str(e)))

def main():
    urls = ["https://example.com", "https://python.org", "https://invalid-url"]
    queue = Queue()
    processes = [Process(target=fetch_url, args=(queue, url)) for url in urls]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    while not queue.empty():
        url, result = queue.get()
        print(f"{url}: {result}")

if __name__ == "__main__":
    main()

输出(示例):

https://example.com: 1256
https://python.org: 50000
https://invalid-url: [Errno 11001] getaddrinfo failed

5. 最佳实践

使用 if __name__ == "__main__":

  • 防止 Windows 和某些 Unix 系统重复导入模块。

示例

if __name__ == "__main__":
    p = Process(target=worker)
    p.start()

选择进程池

  • 对于批量任务,使用 Pool 简化管理。

示例

with Pool(4) as pool:
    results = pool.map(func, data)

优化通信

  • 尽量减少进程间通信,使用共享内存或批量传递数据。

示例

arr = Array("i", [0] * size)

异常处理

  • 在子进程中捕获异常,通过 Queue 或日志返回。

示例

def worker(queue):
    try:
        # 工作代码
    except Exception as e:
        queue.put(str(e))

测试代码

  • 使用 pytest 测试多进程行为。

示例

import pytest
from multiprocessing import Process

def test_process():
    def worker():
        print("Test")
    p = Process(target=worker)
    p.start()
    p.join()
    assert p.exitcode == 0

进程数选择

  • 默认使用 CPU 核心数(multiprocessing.cpu_count())。

示例

processes = min(len(tasks), multiprocessing.cpu_count())

6. 注意事项

GIL 限制

  • multiprocessing 绕过 GIL,适合 CPU 密集型任务;I/O 密集型任务考虑 threadingasyncio

示例

# I/O 密集型:使用 asyncio
import asyncio
async def fetch():
    pass

Windows 兼容性

  • Windows 使用 spawn,需确保代码在 if __name__ == "__main__": 中。

示例

if __name__ == "__main__":
    main()

资源管理

  • 及时关闭进程和池,释放资源。

示例

with Pool() as pool:
    pool.map(func, data)

序列化开销

  • 传递大数据到子进程(如通过 Queue)可能慢,使用共享内存。

示例

shared_data = Value("d", 0.0)

调试难度

  • 子进程错误可能不易捕获,使用日志或 Queue 返回错误。

示例

import logging
logging.basicConfig(level=logging.INFO)

7. 总结

Python 的 multiprocessing 模块是实现多进程并行的强大工具,绕过 GIL,适合 CPU 密集型任务。其核心特点包括:

  • 定义:提供进程创建、通信、同步和共享内存的 API。
  • 功能:支持 ProcessPoolQueuePipeLock 等。
  • 应用:数值计算、图像处理、机器学习、数据处理、爬虫。
  • 最佳实践:使用 if __name__ == "__main__":、优化通信、测试代码。

以上就是Python使用multiprocessing模块实现多进程并行计算的详细内容,更多关于Python multiprocessing多进程并行计算的资料请关注脚本之家其它相关文章!

相关文章

  • Python文件操作指南解锁三个txt文件合并技术

    Python文件操作指南解锁三个txt文件合并技术

    本文将深入介绍如何利用Python编写脚本,将三个文本文件中指定的列数据合并成一个新文件,通过丰富的示例代码和详细解释,帮助掌握这一实用而灵活的数据处理技巧
    2024-01-01
  • Python中socket网络通信是干嘛的

    Python中socket网络通信是干嘛的

    在本篇文章里小编给大家分享的是关于Python中socket网络通信知识点内容,需要的朋友们可以跟着学习下。
    2020-05-05
  • 在Pycharm中设置默认自动换行的方法

    在Pycharm中设置默认自动换行的方法

    今天小编就为大家分享一篇在Pycharm中设置默认自动换行的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-01-01
  • 如何通过Python实现一个消息队列

    如何通过Python实现一个消息队列

    这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2025-02-02
  • 使用Python提取PDF文件中内容的代码示例和使用技巧

    使用Python提取PDF文件中内容的代码示例和使用技巧

    在文档自动化处理、数据提取和信息分析等任务中,从 PDF 文件中提取文本是一项常见需求,PDF 文件通常分为两种类型:基于文本的 PDF 和 包含扫描图像的 PDF,本文将介绍如何使用 Python 分别提取这两种类型的 PDF 内容,需要的朋友可以参考下
    2025-07-07
  • python gdal安装与简单使用

    python gdal安装与简单使用

    这篇文章主要介绍了python gdal安装与简单使用,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-08-08
  • python可以用哪些数据库

    python可以用哪些数据库

    在本篇文章里小编给大家整理的是关于python支持哪些数据库的相关知识点内容,有兴趣的朋友们可以学习下。
    2020-06-06
  • Python基础之赋值,浅拷贝,深拷贝的区别

    Python基础之赋值,浅拷贝,深拷贝的区别

    这篇文章主要介绍了Python基础之赋值,浅拷贝,深拷贝的区别,文中有非常详细的代码示例,对正在学习python基础的小伙伴们也有非常好的帮助,需要的朋友可以参考下
    2021-04-04
  • 极简Python库CherryPy构建高性能Web应用实例探索

    极简Python库CherryPy构建高性能Web应用实例探索

    今天为大家介绍的是 CherryPy,它是一个极简、稳定且功能强大的Web框架,可以帮助开发者快速构建高性能的 Web 应用程序,使用 CherryPy,你可以轻松地创建RESTful API、静态网站、异步任务和 WebSocket 等应用
    2024-01-01
  • 对python的bytes类型数据split分割切片方法

    对python的bytes类型数据split分割切片方法

    今天小编就为大家分享一篇对python的bytes类型数据split分割切片方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-12-12

最新评论