Python并行处理实战之如何使用ProcessPoolExecutor加速计算

 更新时间:2025年06月13日 17:24:12   作者:engchina  
Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecutor是一个非常强大且易于使用的工具,本文将通过一个实际示例,展示如何使用ProcessPoolExecutor进行并行处理,并详细解释代码的工作原理,感兴趣的朋友跟随小编一起看看吧

简介

在现代计算中,并行处理是提高程序性能的重要手段。Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecutor是一个非常强大且易于使用的工具。本文将通过一个实际示例,展示如何使用ProcessPoolExecutor进行并行处理,并详细解释代码的工作原理。

完整代码示例

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List
def process_numbers(chunk: List[int], factor: int) -> str:
    """
    处理数字的函数,通过将它们乘以因子来模拟处理。
    这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,
    并返回结果字符串。
    """
    result = sum(x * factor for x in chunk)
    time.sleep(0.1)  # 使用睡眠模拟工作
    return f"处理的块和: {result}"
def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
    """
    演示并行处理的主函数。
    这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、
    将数字分成块,并使用ProcessPoolExecutor进行并行处理。
    """
    import logging
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)
    # 如果没有提供数字,则生成示例列表
    if numbers is None:
        numbers = list(range(1, 101))  # 生成1到100的数字
    total_numbers = len(numbers)
    _log.info(f"开始并行处理 {total_numbers} 个数字")
    cpu_count = multiprocessing.cpu_count()
    _log.info(f"检测到 {cpu_count} 个CPU核心")
    # 确定最佳工作进程数量
    optimal_workers = min(cpu_count, num_chunks)
    _log.info(f"使用 {optimal_workers} 个工作进程")
    # 计算块大小
    chunk_size = max(1, total_numbers // optimal_workers)
    _log.info(f"每个块包含 {chunk_size} 个数字")
    # 将数字分成块
    chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)]
    _log.info(f"总共生成了 {len(chunks)} 个块")
    start_time = time.time()
    processed_count = 0
    # 使用ProcessPoolExecutor进行并行处理
    with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
        _log.info("启动ProcessPoolExecutor")
        # 提交所有任务
        futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
        _log.info(f"提交了 {len(futures)} 个任务")
        # 等待完成并收集结果
        for future in as_completed(futures):
            try:
                result = future.result()
                processed_count += 1
                _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")
            except Exception as e:
                _log.error(f"处理块时出错: {str(e)}")
                raise
    elapsed_time = time.time() - start_time
    _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")
if __name__ == "__main__":
    # 使用数字列表的示例
    main()

代码解释

1. 导入必要的模块

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List

这些模块提供了我们需要的并行处理功能和类型提示。

2. 定义处理函数

def process_numbers(chunk: List[int], factor: int) -> str:
    """
    处理数字的函数,通过将它们乘以因子来模拟处理。
    这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,
    并返回结果字符串。
    """
    result = sum(x * factor for x in chunk)
    time.sleep(0.1)  # 使用睡眠模拟工作
    return f"处理的块和: {result}"

这个函数模拟了对数字列表的处理,通过将每个数字乘以一个因子并求和。time.sleep(0.1)用于模拟实际工作。

3. 主函数

def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
    """
    演示并行处理的主函数。
    这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、
    将数字分成块,并使用ProcessPoolExecutor进行并行处理。
    """
    import logging
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)

主函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、将数字分成块,并使用ProcessPoolExecutor进行并行处理。

4. 生成数字列表

    # 如果没有提供数字,则生成示例列表
    if numbers is None:
        numbers = list(range(1, 101))  # 生成1到100的数字

如果没有提供数字列表,则生成1到100的数字列表。

5. 确定最佳工作进程数量

    cpu_count = multiprocessing.cpu_count()
    _log.info(f"检测到 {cpu_count} 个CPU核心")
    # 确定最佳工作进程数量
    optimal_workers = min(cpu_count, num_chunks)
    _log.info(f"使用 {optimal_workers} 个工作进程")

根据CPU核心数和用户指定的块数,确定最佳工作进程数量。

6. 将数字分成块

    # 计算块大小
    chunk_size = max(1, total_numbers // optimal_workers)
    _log.info(f"每个块包含 {chunk_size} 个数字")
    # 将数字分成块
    chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)]
    _log.info(f"总共生成了 {len(chunks)} 个块")

将数字列表分成多个块,每个块的大小根据总数和工作进程数量计算。

7. 并行处理

    start_time = time.time()
    processed_count = 0
    # 使用ProcessPoolExecutor进行并行处理
    with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
        _log.info("启动ProcessPoolExecutor")
        # 提交所有任务
        futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
        _log.info(f"提交了 {len(futures)} 个任务")
        # 等待完成并收集结果
        for future in as_completed(futures):
            try:
                result = future.result()
                processed_count += 1
                _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")
            except Exception as e:
                _log.error(f"处理块时出错: {str(e)}")
                raise

使用ProcessPoolExecutor进行并行处理,提交所有任务并等待完成。

8. 计算耗时

    elapsed_time = time.time() - start_time
    _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")

计算并行处理的总耗时并输出。

并行处理的基本概念和优势

并行处理是指同时执行多个任务,以提高程序的执行效率。Python的concurrent.futures模块提供了一个高级接口,用于并行执行任务。ProcessPoolExecutor是其中一个重要的类,它使用多进程来并行执行任务。

并行处理的优势包括:

  • 提高程序的执行效率
  • 充分利用多核CPU的计算能力
  • 简化多线程或多进程编程的复杂性

如何运行和测试这个示例

  • 将上述代码保存为parallel_processing_example.py文件。
  • 确保你的Python环境中安装了必要的模块(本示例不需要额外安装模块)。
  • 在终端或命令行中运行以下命令:
python parallel_processing_example.py

你将看到程序的执行过程和并行处理的结果。

总结

通过这个示例,我们展示了如何使用Python的ProcessPoolExecutor进行并行处理。并行处理是提高程序性能的重要手段,特别是在处理大量数据或计算密集型任务时。希望这个示例能帮助你更好地理解并行处理的概念和实现。

到此这篇关于Python并行处理实战之如何使用ProcessPoolExecutor加速计算的文章就介绍到这了,更多相关Python ProcessPoolExecutor加速计算内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Python读取网页内容的方法

    Python读取网页内容的方法

    这篇文章主要介绍了Python读取网页内容的方法,实例分析了Python基于URL读取网页内容的相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-07-07
  • python sqlalchemy动态修改tablename两种实现方式

    python sqlalchemy动态修改tablename两种实现方式

    这篇文章主要介绍了python sqlalchemy动态修改tablename两种实现方式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
    2023-03-03
  • Python实现概率分布公式及可视化详解

    Python实现概率分布公式及可视化详解

    在机器学习或者深度学习课题里,时常要频繁地使用统计概率的理论来辅助进行数据处理与研究,所以本文我们就来聊聊Python实现概率分布公式及可视化的相关知识吧
    2025-05-05
  • Python新版极验验证码识别验证码教程详解

    Python新版极验验证码识别验证码教程详解

    这篇文章主要介绍了Python新版极验验证码识别验证码,极验验证是一种在计算机领域用于区分自然人和机器人的,通过简单集成的方式,为开发者提供安全、便捷的云端验证服务
    2023-02-02
  • PyQt6/PySide6中QTreeView类的实现

    PyQt6/PySide6中QTreeView类的实现

    QTreeView是PyQt6或PySide6库中用于显示分层数据的控件,本文主要介绍了PyQt6/PySide6中QTreeView类的实现,具有一定的参考价值,感兴趣的可以了解一下
    2025-04-04
  • Python数据合并的concat函数与merge函数详解

    Python数据合并的concat函数与merge函数详解

    大家都知道concat()函数可以沿着一条轴将多个对象进行堆叠,其使用方式类似数据库中的数据表合并,在使用merge()函数进行合并时,默认会使用重叠的列索引做为合并键,即取行索引重叠的部分,本文给大家介绍python 数据合并concat函数与merge函数,感兴趣的朋友一起看看吧
    2022-05-05
  • 详解python3中的真值测试

    详解python3中的真值测试

    这篇文章主要介绍了详解python3中的真值测试,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-08-08
  • Python中惊艳的一行代码简洁强大表达力技巧实例

    Python中惊艳的一行代码简洁强大表达力技巧实例

    在Python中,语言的设计理念之一是简洁优雅,这使得我们能够用一行代码完成一些令人惊叹的任务,本文将分享一些在一行代码中展现出Python强大表达力的示例,涵盖各种领域的实用技巧
    2024-01-01
  • python的time模块和datetime模块实例解析

    python的time模块和datetime模块实例解析

    这篇文章主要介绍了python的time模块和datetime模块实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • Python设计模式之抽象工厂模式

    Python设计模式之抽象工厂模式

    这篇文章主要为大家详细介绍了Python设计模式之抽象工厂模式,感兴趣的小伙伴们可以参考一下
    2016-08-08

最新评论