Python concurrent.futures并发编程实战

 更新时间:2026年06月22日 10:01:40   作者:第一程序员  
本文主要介绍了Python concurrent.futures并发编程实战,涵盖ThreadPoolExecutor和ProcessPoolExecutor的详解、Future对象使用、实战案例和最佳实践,通过理解这些内容,开发者可以有效地实现并发编程

引言

在后端开发中,并发编程是提高系统性能的关键技术。Python的concurrent.futures模块提供了简洁的高级API,让开发者能够轻松实现多线程和多进程并发。作为一名从Python转向Rust的后端开发者,我在实践中总结了concurrent.futures的最佳实践。本文将深入探讨这个强大的并发工具,帮助你编写高效的并发代码。

一、concurrent.futures基础

1.1 模块概述

concurrent.futures模块提供了两个主要的执行器:

  • ThreadPoolExecutor:线程池执行器
  • ProcessPoolExecutor:进程池执行器

1.2 基本使用模式

from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(task, 5)
    result = future.result()
    print(result)

1.3 核心组件

组件说明
Executor抽象执行器基类
ThreadPoolExecutor线程池执行器
ProcessPoolExecutor进程池执行器
Future表示异步计算的结果

二、ThreadPoolExecutor详解

2.1 创建线程池

from concurrent.futures import ThreadPoolExecutor

# 创建线程池
executor = ThreadPoolExecutor(
    max_workers=4,
    thread_name_prefix='worker-'
)

# 使用上下文管理器
with ThreadPoolExecutor(max_workers=4) as executor:
    # 提交任务
    pass

2.2 提交任务

from concurrent.futures import ThreadPoolExecutor

def download_file(url):
    # 模拟下载
    import time
    time.sleep(1)
    return f"Downloaded: {url}"

with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交单个任务
    future = executor.submit(download_file, "http://example.com/file1.txt")
    
    # 获取结果(阻塞)
    result = future.result(timeout=5)
    print(result)

2.3 批量提交任务

urls = [
    "http://example.com/file1.txt",
    "http://example.com/file2.txt",
    "http://example.com/file3.txt",
]

with ThreadPoolExecutor(max_workers=3) as executor:
    # 批量提交
    futures = [executor.submit(download_file, url) for url in urls]
    
    # 获取所有结果
    for future in futures:
        print(future.result())

三、ProcessPoolExecutor详解

3.1 创建进程池

from concurrent.futures import ProcessPoolExecutor

def compute_heavy(n):
    # CPU密集型任务
    return sum(i * i for i in range(n))

with ProcessPoolExecutor(max_workers=4) as executor:
    future = executor.submit(compute_heavy, 1_000_000)
    print(future.result())

3.2 线程池 vs 进程池

特性ThreadPoolExecutorProcessPoolExecutor
GIL限制受GIL限制不受GIL限制
适用场景IO密集型CPU密集型
启动开销
内存开销
数据共享容易困难

3.3 选择建议

# IO密集型任务 → ThreadPoolExecutor
# CPU密集型任务 → ProcessPoolExecutor
# 混合任务 → 结合使用

def process_task(data):
    # IO操作
    raw_data = fetch_from_api(data)
    # CPU操作
    result = compute(raw_data)
    return result

四、Future对象详解

4.1 Future状态

from concurrent.futures import Future

future = Future()

# 检查状态
print(future.done())    # False
print(future.running()) # False
print(future.cancelled()) # False

# 设置结果
future.set_result(42)

print(future.done())    # True
print(future.result())  # 42

4.2 添加回调

def callback(future):
    print(f"Task completed: {future.result()}")

with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 5)
    future.add_done_callback(callback)

4.3 超时处理

try:
    result = future.result(timeout=2)
except concurrent.futures.TimeoutError:
    print("Task timed out")

五、高级用法

5.1 as_completed

from concurrent.futures import ThreadPoolExecutor, as_completed

def task(id):
    import time
    time.sleep(id)
    return f"Task {id} completed"

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in range(1, 4)]
    
    # 按完成顺序获取结果
    for future in as_completed(futures):
        print(future.result())

5.2 map函数

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(task, [1, 2, 3, 4, 5])
    
    # 按输入顺序返回结果
    for result in results:
        print(result)

5.3 wait函数

from concurrent.futures import wait, FIRST_COMPLETED

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in range(1, 4)]
    
    # 等待第一个完成
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    
    print(f"Completed: {len(done)}")
    print(f"Not completed: {len(not_done)}")

六、实战案例

6.1 并行下载文件

import requests
from concurrent.futures import ThreadPoolExecutor

def download_file(url, save_path):
    response = requests.get(url)
    with open(save_path, 'wb') as f:
        f.write(response.content)
    return save_path

urls = [
    ("https://example.com/image1.jpg", "images/image1.jpg"),
    ("https://example.com/image2.jpg", "images/image2.jpg"),
    ("https://example.com/image3.jpg", "images/image3.jpg"),
]

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(download_file, url, path) for url, path in urls]
    
    for future in as_completed(futures):
        print(f"Downloaded: {future.result()}")

6.2 并行数据库查询

import psycopg2
from concurrent.futures import ThreadPoolExecutor

def query_user(user_id):
    conn = psycopg2.connect("dbname=example user=postgres")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    result = cursor.fetchone()
    conn.close()
    return result

user_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(query_user, user_ids)
    
    for user_id, user in zip(user_ids, results):
        print(f"User {user_id}: {user}")

6.3 混合IO和CPU任务

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def fetch_data(url):
    import requests
    return requests.get(url).json()

def process_data(data):
    # CPU密集型处理
    return sum(item['value'] for item in data)

def pipeline(url):
    data = fetch_data(url)
    return process_data(data)

urls = ["https://api.example.com/data1", "https://api.example.com/data2"]

# IO阶段使用线程池
with ThreadPoolExecutor(max_workers=4) as io_executor:
    futures = [io_executor.submit(fetch_data, url) for url in urls]
    raw_data = [f.result() for f in futures]

# CPU阶段使用进程池
with ProcessPoolExecutor(max_workers=4) as cpu_executor:
    results = list(cpu_executor.map(process_data, raw_data))

print(results)

七、最佳实践

7.1 合理设置worker数量

import os

# CPU密集型任务
cpu_workers = os.cpu_count() or 4

# IO密集型任务
io_workers = min(32, (os.cpu_count() or 4) * 5)

7.2 避免共享状态

# 不好的做法:共享可变状态
counter = 0

def increment():
    global counter
    counter += 1

# 好的做法:使用线程安全的数据结构或锁
from threading import Lock

class ThreadSafeCounter:
    def __init__(self):
        self._count = 0
        self._lock = Lock()
    
    def increment(self):
        with self._lock:
            self._count += 1

7.3 优雅关闭

executor = ThreadPoolExecutor(max_workers=4)

try:
    # 提交任务
    futures = [executor.submit(task, i) for i in range(10)]
    
    # 获取结果
    for future in futures:
        print(future.result())
finally:
    # 关闭执行器
    executor.shutdown(wait=True)

八、性能对比

8.1 同步vs异步

import time

def sync_download(urls):
    for url in urls:
        download_file(url)

def async_download(urls):
    with ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_file, urls)

# 性能对比
urls = ["https://example.com/file{}.txt".format(i) for i in range(10)]

start = time.time()
sync_download(urls)
print(f"Sync time: {time.time() - start:.2f}s")

start = time.time()
async_download(urls)
print(f"Async time: {time.time() - start:.2f}s")

8.2 线程池vs进程池

def cpu_intensive(n):
    return sum(i * i for i in range(n))

# 线程池(受GIL限制)
with ThreadPoolExecutor(max_workers=4) as executor:
    start = time.time()
    executor.map(cpu_intensive, [10_000_000] * 4)
    print(f"ThreadPool time: {time.time() - start:.2f}s")

# 进程池(不受GIL限制)
with ProcessPoolExecutor(max_workers=4) as executor:
    start = time.time()
    executor.map(cpu_intensive, [10_000_000] * 4)
    print(f"ProcessPool time: {time.time() - start:.2f}s")

总结

concurrent.futures是Python并发编程的利器。通过本文的学习,你应该掌握了以下核心要点:

  1. ThreadPoolExecutor:适用于IO密集型任务
  2. ProcessPoolExecutor:适用于CPU密集型任务
  3. Future对象:管理异步计算结果
  4. 高级APIas_completedmapwait
  5. 实战案例:并行下载、数据库查询、混合任务
  6. 最佳实践:合理设置worker数量、避免共享状态
  7. 性能对比:同步vs异步、线程池vs进程池

作为从Python转向Rust的后端开发者,理解并发编程模式对于构建高性能系统至关重要。虽然Rust的并发模型更加安全,但Python的concurrent.futures提供了快速实现并发的便捷方式。

到此这篇关于Python concurrent.futures并发编程实战的文章就介绍到这了,更多相关Python concurrent.futures并发内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 使用Python编写批量文件重命名工具

    使用Python编写批量文件重命名工具

    有时候呢,你的文件被下载下来文件名都是乱七八糟毫无规律,但是当时你下载的时候没办法重名或者你又不想另存为重新重命名,所以我们就来使用Python编写一个文件批量重命名工具吧
    2025-05-05
  • python自动化测试通过日志3分钟定位bug

    python自动化测试通过日志3分钟定位bug

    软件开发中通过日志记录程序的运行情况是一个开发的好习惯,对于错误排查和系统运维都有很大帮助,Python标准库自带了强大的logging日志模块,在各种python模块中得到广泛应用
    2021-11-11
  • python模块简介之有序字典(OrderedDict)

    python模块简介之有序字典(OrderedDict)

    字典是Python开发中很常用的一种数据结构,但dict有个缺陷(其实也不算缺陷),迭代时并不是按照元素添加的顺序进行,可能在某些场景下,不能满足我们的要求。
    2016-12-12
  • Pandas透视表(pivot_table)详解

    Pandas透视表(pivot_table)详解

    这篇文章主要介绍了Pandas透视表(pivot_table)详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-07-07
  • Python中JSON数据验证的三种专业级方案

    Python中JSON数据验证的三种专业级方案

    这篇文章主要介绍了Python中三种专业的JSON验证方案:jsonschema、Pydantic和voluptuous,分别从声明式验证、类型安全的模型校验和轻量级验证流程三个方面进行了详细讲解,此外,还讨论了传统验证方法的局限性,并展示了如何使用这些库进行高效的数据校验和模型定义
    2026-01-01
  • Python实现可获取网易页面所有文本信息的网易网络爬虫功能示例

    Python实现可获取网易页面所有文本信息的网易网络爬虫功能示例

    这篇文章主要介绍了Python实现可获取网易页面所有文本信息的网易网络爬虫功能,涉及Python针对网页的获取、字符串正则判定等相关操作技巧,需要的朋友可以参考下
    2018-01-01
  • python中装饰器的理解与使用详解

    python中装饰器的理解与使用详解

    这篇文章主要介绍了python中装饰器的理解与使用详解,装饰器本质上是一个闭包函数,其作用在于可以为其他函数增加额外功能,装饰器的返回值是一个函数对象,需要的朋友可以参考下
    2023-07-07
  • python实现自动登录跳转页面并获取信息

    python实现自动登录跳转页面并获取信息

    这篇文章主要为大家详细介绍了如何使用python实现自动登录跳转页面并获取信息功能,文中的示例代码讲解详细,有需要的小伙伴可以跟随小编一起学习一下
    2025-05-05
  • Python使用Pydantic验证和解析配置数据的完整指南

    Python使用Pydantic验证和解析配置数据的完整指南

    在开发过程中,配置管理是绕不开的核心环节,这些配置数据的质量直接影响系统的稳定性和安全性,下面我们就来看看如何使用Pydantic验证和解析配置数据吧
    2026-01-01
  • python matplotlib饼状图参数及用法解析

    python matplotlib饼状图参数及用法解析

    这篇文章主要介绍了python matplotlib饼状图参数及用法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11

最新评论