Python子进程中创建多线程的完整指南

 更新时间:2025年08月28日 09:09:24   作者:Yant224  
在操作系统中,进程是资源分配的基本单位,每个进程都有独立的内存空间、文件描述符等系统资源,而线程是进程内的执行单元,多个线程共享同一进程的资源,本文给大家介绍了Python子进程中创建多线程的完整指南,需要的朋友可以参考下

一、理解进程与线程的关系

1.1 进程与线程的基本概念

在操作系统中,进程是资源分配的基本单位,每个进程都有独立的内存空间、文件描述符等系统资源。而线程是进程内的执行单元,多个线程共享同一进程的资源。

Python中的特殊之处在于**全局解释器锁(GIL)**的存在,这使得在单个进程中,多线程无法真正并行执行CPU密集型任务。但在I/O密集型任务中,多线程仍然能显著提升性能。

1.2 子进程内多线程的架构模型

这种架构的优势在于:

  • 充分利用多核CPU:每个子进程可以在不同的CPU核心上运行
  • 资源共享与隔离平衡:线程共享进程资源,进程间资源隔离
  • 灵活的任务分配:可以根据任务特性选择进程级或线程级并行

二、实现原理与技术细节

2.1 Python的多进程模块

Python提供了multiprocessing模块来创建和管理进程:

import multiprocessing
import os

def worker():
    print(f"进程ID: {os.getpid()}, 进程名称: {multiprocessing.current_process().name}")

if __name__ == "__main__":
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, name=f"Process-{i}")
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

2.2 进程间通信(IPC)机制

由于进程有独立的内存空间,必须使用特殊的通信机制:

通信方式描述适用场景
Queue先进先出的队列生产者-消费者模式
Pipe双向通信通道一对一通信
Shared Memory共享内存区域高性能数据共享
Manager托管共享对象复杂数据结构共享

三、三种实现方法详解

3.1 方法一:基础组合方式

import multiprocessing
import threading
import time

def thread_task(thread_id):
    """线程工作函数"""
    print(f"线程 {thread_id} 在进程 {multiprocessing.current_process().name} 中运行")
    time.sleep(2)
    return f"线程 {thread_id} 完成"

def process_task():
    """进程工作函数"""
    print(f"进程 {multiprocessing.current_process().name} 启动")
    
    # 创建并启动多个线程
    threads = []
    results = []
    
    for i in range(4):
        t = threading.Thread(target=thread_task, args=(i,))
        threads.append(t)
        t.start()
    
    # 等待所有线程完成
    for t in threads:
        t.join()
    
    print(f"进程 {multiprocessing.current_process().name} 结束")

if __name__ == "__main__":
    # 创建3个子进程
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=process_task, name=f"SubProcess-{i}")
        processes.append(p)
        p.start()
    
    # 等待所有子进程完成
    for p in processes:
        p.join()
    
    print("所有进程完成")

3.2 方法二:使用进程池和线程池

import concurrent.futures
import multiprocessing
import threading
import time

def thread_worker(data):
    """线程池工作函数"""
    process_name = multiprocessing.current_process().name
    thread_name = threading.current_thread().name
    time.sleep(0.5)  # 模拟工作负载
    return f"{process_name}-{thread_name} 处理: {data}"

def process_worker(data_chunk):
    """进程池工作函数"""
    print(f"进程 {multiprocessing.current_process().name} 开始处理 {len(data_chunk)} 个项目")
    
    # 使用线程池处理数据
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(thread_worker, data_chunk))
    
    return results

if __name__ == "__main__":
    # 准备数据
    all_data = [f"data_{i}" for i in range(20)]
    chunk_size = 5
    data_chunks = [all_data[i:i+chunk_size] for i in range(0, len(all_data), chunk_size)]
    
    # 使用进程池
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(process_worker, chunk) for chunk in data_chunks]
        
        # 收集结果
        all_results = []
        for future in concurrent.futures.as_completed(futures):
            all_results.extend(future.result())
    
    print("处理完成,结果:")
    for result in all_results:
        print(f"  {result}")

3.3 方法三:自定义进程类

import multiprocessing
import threading
import time

class ThreadedProcess(multiprocessing.Process):
    def __init__(self, task_id, data_list):
        super().__init__()
        self.task_id = task_id
        self.data_list = data_list
        self.results = multiprocessing.Manager().list()
    
    def run(self):
        print(f"进程 {self.name} 开始处理任务 {self.task_id}")
        
        # 创建线程
        threads = []
        for i, data in enumerate(self.data_list):
            thread = threading.Thread(
                target=self.process_item,
                args=(data, i)
            )
            threads.append(thread)
            thread.start()
        
        # 等待所有线程完成
        for thread in threads:
            thread.join()
        
        print(f"进程 {self.name} 完成任务 {self.task_id}")
    
    def process_item(self, data, index):
        """处理单个数据项"""
        thread_name = threading.current_thread().name
        print(f"线程 {thread_name} 处理: {data}")
        time.sleep(0.3)  # 模拟处理时间
        
        # 处理数据并保存结果
        result = {
            'index': index,
            'data': data,
            'processed': data.upper(),  # 示例处理
            'thread': thread_name,
            'process': self.name
        }
        self.results.append(result)

if __name__ == "__main__":
    # 准备任务数据
    tasks = [
        (1, ['apple', 'banana', 'cherry']),
        (2, ['dog', 'elephant', 'fox']),
        (3, ['green', 'blue', 'red', 'yellow'])
    ]
    
    # 创建并启动进程
    processes = []
    for task_id, data_list in tasks:
        process = ThreadedProcess(task_id, data_list)
        processes.append(process)
        process.start()
    
    # 等待所有进程完成
    for process in processes:
        process.join()
    
    # 汇总结果
    all_results = []
    for process in processes:
        all_results.extend(list(process.results))
    
    print("\n所有任务完成,结果汇总:")
    for result in all_results:
        print(f"  任务{result['index']}: {result['data']} -> {result['processed']}")

四、进程间通信实战

4.1 使用Queue进行进程间通信

import multiprocessing
import threading
import time
import random

def producer(queue, producer_id):
    """生产者线程函数"""
    for i in range(5):
        item = f"生产者{producer_id}-项目{i}"
        queue.put(item)
        print(f"生产: {item}")
        time.sleep(random.uniform(0.1, 0.5))
    queue.put(f"生产者{producer_id}-完成")

def consumer_process(queue, consumer_id):
    """消费者进程函数"""
    print(f"消费者进程 {consumer_id} 启动")
    
    completed_producers = 0
    total_producers = 2  # 假设有2个生产者
    
    while completed_producers < total_producers:
        try:
            item = queue.get(timeout=5)
            if item.endswith("-完成"):
                completed_producers += 1
                print(f"消费者{consumer_id} 收到完成信号: {item}")
            else:
                print(f"消费者{consumer_id} 处理: {item}")
                time.sleep(random.uniform(0.2, 0.8))  # 模拟处理时间
        except queue.Empty:
            print(f"消费者{consumer_id} 等待超时")
            break
    
    print(f"消费者进程 {consumer_id} 结束")

def producer_process(queue, process_id):
    """生产者进程函数"""
    print(f"生产者进程 {process_id} 启动")
    
    # 在生产者进程中创建多个线程
    producer_threads = []
    for i in range(2):  # 每个进程创建2个生产者线程
        thread = threading.Thread(
            target=producer,
            args=(queue, f"P{process_id}-T{i}")
        )
        producer_threads.append(thread)
        thread.start()
    
    # 等待所有生产者线程完成
    for thread in producer_threads:
        thread.join()
    
    print(f"生产者进程 {process_id} 结束")

if __name__ == "__main__":
    # 创建进程间通信队列
    queue = multiprocessing.Queue(maxsize=10)
    
    # 创建生产者进程
    producer_processes = []
    for i in range(2):
        p = multiprocessing.Process(
            target=producer_process,
            args=(queue, i)
        )
        producer_processes.append(p)
        p.start()
    
    # 创建消费者进程
    consumer_processes = []
    for i in range(2):
        c = multiprocessing.Process(
            target=consumer_process,
            args=(queue, i)
        )
        consumer_processes.append(c)
        c.start()
    
    # 等待所有进程完成
    for p in producer_processes:
        p.join()
    
    for c in consumer_processes:
        c.join()
    
    print("所有生产消费任务完成")

五、性能优化与最佳实践

5.1 资源管理策略

1.合理设置进程和线程数量

import os

# 根据CPU核心数设置进程数
cpu_count = os.cpu_count()
process_pool_size = max(1, cpu_count - 1)  # 留一个核心给系统

# 根据任务类型设置线程数
if task_type == "io_intensive":
    thread_pool_size = 10  # I/O密集型可以更多线程
else:
    thread_pool_size = cpu_count  # CPU密集型不宜过多

2.使用连接池管理资源

from multiprocessing import Pool
import threading
import database  # 假设的数据库模块

# 进程级别的连接池
process_conn_pool = None

def init_process():
    global process_conn_pool
    process_conn_pool = database.ConnectionPool(max_connections=5)

def thread_task(query):
    # 从进程级连接池获取连接
    conn = process_conn_pool.get_connection()
    try:
        result = conn.execute(query)
        return result
    finally:
        process_conn_pool.release_connection(conn)

def process_worker(queries):
    with threading.ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(thread_task, queries))
    return results

if __name__ == "__main__":
    queries = [f"SELECT * FROM table WHERE id = {i}" for i in range(10)]

    with Pool(processes=2, initializer=init_process) as pool:
        results = pool.map(process_worker, [queries[:5], queries[5:]])

5.2 错误处理与重试机制

import multiprocessing
import threading
import time
from functools import wraps

def retry(max_attempts=3, delay=1):
    """重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    print(f"尝试 {attempts} 失败: {e}")
                    if attempts < max_attempts:
                        time.sleep(delay)
                    else:
                        raise
        return wrapper
    return decorator

@retry(max_attempts=3, delay=2)
def reliable_thread_task(data):
    """可靠的线程任务"""
    # 模拟可能失败的操作
    if random.random() < 0.3:  # 30%概率失败
        raise ValueError("随机失败")
    
    time.sleep(0.5)
    return f"成功处理: {data}"

def robust_process():
    """健壮的进程函数"""
    try:
        threads = []
        results = []
        
        for i in range(5):
            t = threading.Thread(
                target=lambda: results.append(reliable_thread_task(f"data-{i}"))
            )
            threads.append(t)
            t.start()
        
        for t in threads:
            t.join()
        
        print(f"处理结果: {results}")
        
    except Exception as e:
        print(f"进程失败: {e}")
        # 这里可以添加更复杂的错误处理逻辑

if __name__ == "__main__":
    processes = []
    for i in range(2):
        p = multiprocessing.Process(target=robust_process)
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

六、实战应用:Web服务请求处理

import multiprocessing
import threading
import time
import random
from http.server import HTTPServer, BaseHTTPRequestHandler
import json

class RequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # 模拟处理时间
        processing_time = random.uniform(0.1, 1.0)
        time.sleep(processing_time)
        
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        
        response = {
            'path': self.path,
            'processing_time': processing_time,
            'process': multiprocessing.current_process().name,
            'thread': threading.current_thread().name
        }
        
        self.wfile.write(json.dumps(response).encode())

def run_server(port):
    """运行HTTP服务器"""
    server = HTTPServer(('localhost', port), RequestHandler)
    print(f"服务器在进程 {multiprocessing.current_process().name} 中启动,端口: {port}")
    server.serve_forever()

def health_checker(server_ports):
    """健康检查线程"""
    while True:
        time.sleep(5)
        print(f"健康检查: 服务器进程正常运行,监控端口: {server_ports}")

def server_process(port):
    """服务器进程函数"""
    # 创建服务器线程
    server_thread = threading.Thread(
        target=run_server,
        args=(port,),
        daemon=True
    )
    
    # 创建健康检查线程
    health_thread = threading.Thread(
        target=health_checker,
        args=([port],),
        daemon=True
    )
    
    server_thread.start()
    health_thread.start()
    
    # 等待服务器线程结束
    server_thread.join()

if __name__ == "__main__":
    # 启动多个服务器进程,每个进程在不同的端口上运行
    ports = [8000, 8001, 8002]
    processes = []
    
    for port in ports:
        p = multiprocessing.Process(
            target=server_process,
            args=(port,),
            name=f"ServerProcess-{port}"
        )
        processes.append(p)
        p.start()
    
    print(f"启动了 {len(processes)} 个服务器进程")
    
    try:
        # 主进程保持运行
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("正在关闭服务器...")
        for p in processes:
            p.terminate()
        for p in processes:
            p.join()
        print("所有服务器已关闭")

以上就是Python子进程中创建多线程的完整指南的详细内容,更多关于Python子进程创建多线程的资料请关注脚本之家其它相关文章!

相关文章

  • YOLOv5构建安全帽检测和识别系统使用详解

    YOLOv5构建安全帽检测和识别系统使用详解

    这篇文章主要为大家介绍了YOLOv5构建安全帽检测和识别系统使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-04-04
  • python 追踪except信息方式

    python 追踪except信息方式

    这篇文章主要介绍了python 追踪except信息方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-04-04
  • Python Django搭建文件下载服务器的实现

    Python Django搭建文件下载服务器的实现

    这篇文章主要介绍了Python Django搭建文件下载服务器的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-05-05
  • Python实现一个优先级队列的方法

    Python实现一个优先级队列的方法

    这篇文章主要介绍了Python实现一个优先级队列的方法,文中讲解非常细致,代码帮助大家更好的理解和学习,感兴趣的朋友可以了解下
    2020-07-07
  • python3的print()函数的用法图文讲解

    python3的print()函数的用法图文讲解

    在本篇内容里小编给各位分享的是关于python3的print()函数的用法知识点,对此有需要的朋友们跟着学习下吧。
    2019-07-07
  • Python+Tkinter制作股票数据抓取小程序

    Python+Tkinter制作股票数据抓取小程序

    这篇文章主要为大家详细介绍了如何实现一个Tkinter GUI程序,完成无代码股票抓取!文中的示例代码讲解详细,快跟小编一起动手试一试吧
    2022-08-08
  • 一文详解Python中常用的初等函数(内置函数)

    一文详解Python中常用的初等函数(内置函数)

    初等函数是由基本初等函数经过有限次的四则运算和复合运算所得到的函数,这篇文章主要介绍了Python中常用初等函数(内置函数)的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2025-06-06
  • python GUI多行输入文本Text的实现

    python GUI多行输入文本Text的实现

    这篇文章主要介绍了python GUI多行输入文本Text的实现方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • 详解pandas DataFrame的查询方法(loc,iloc,at,iat,ix的用法和区别)

    详解pandas DataFrame的查询方法(loc,iloc,at,iat,ix的用法和区别)

    这篇文章主要介绍了详解pandas DataFrame的查询方法(loc,iloc,at,iat,ix的用法和区别),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08
  • springboot整合单机缓存ehcache的实现

    springboot整合单机缓存ehcache的实现

    本文主要介绍了springboot整合单机缓存ehcache的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-02-02

最新评论