python处理多线程请求接口结果顺序的4种方案

 更新时间:2025年11月12日 08:40:15   作者:用户22176592792  
除了“无序收集+统一排序”的方案一,处理多线程请求接口结果顺序的核心思路是 “确保结果与请求提交顺序对齐” ,以下是 4 种实用方案,大家可以根据需要进行选择

除了“无序收集+统一排序”的方案一,处理多线程请求接口结果顺序的核心思路是 “确保结果与请求提交顺序对齐” ,以下是 4 种实用方案(含进阶优化和第三方库方案),覆盖不同场景需求,且均保证线程安全和并发效率

一、方案一:固定位置存储(无排序,高效实时)

核心逻辑

提前创建一个与请求总数长度一致的结果列表,每个线程携带唯一的“请求索引”,执行完成后直接将结果写入列表的对应索引位置(如任务 5 的结果写入 ​​results[5]​​)。由于索引与提交顺序一一对应,所有线程完成后,列表自然是有序的。

关键优势

  • 无需后续排序,效率最高(省去排序开销);
  • 可实时查看每个任务的执行状态(通过列表非 ​​None​​ 的位置判断);
  • 仅需对“列表写入”加锁,锁粒度极小,不影响并发。

完整代码

import requests
import threading
import time
from typing import List

API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5

# 1. 初始化:固定长度的结果列表(与请求顺序对应)+ 互斥锁(保护列表写入)
results: List[tuple] = [None] * TOTAL_REQUESTS  # 初始值为 None,完成后写入结果
lock = threading.Lock()  # 线程安全:避免多线程同时修改同一列表位置

def request_api(index: int):
    """线程执行函数:按索引写入结果到固定位置"""
    url = API_URL.format(index % 10 + 1)
    try:
        response = requests.get(url, timeout=TIMEOUT)
        response.raise_for_status()
        result = (index, True, f"响应:{response.json()['title'][:20]}...")
    except Exception as e:
        result = (index, False, f"失败:{str(e)[:30]}")

    # 2. 加锁写入结果(仅锁定写入操作,不影响请求并发)
    with lock:
        results[index] = result  # 关键:按请求索引写入对应位置

if __name__ == "__main__":
    start_time = time.time()
    threads = []

    # 3. 创建并启动线程(控制线程池大小)
    for i in range(TOTAL_REQUESTS):
        # 限制同时运行的线程数,避免创建过多线程
        if len(threads) >= THREAD_NUM:
            # 等待任意线程完成后再创建新线程
            threading.Thread.join(threading.Thread.wait(threads))
            threads = [t for t in threads if t.is_alive()]
        
        t = threading.Thread(target=request_api, args=(i,), name=f"Thread-{i}")
        t.start()
        threads.append(t)

    # 4. 等待所有线程完成
    for t in threads:
        t.join()

    # 5. 直接按列表顺序输出(已与提交顺序一致)
    print("固定位置存储 - 按请求顺序输出:")
    for idx, is_success, msg in results:
        print(f"任务[{idx}]:{'✅' if is_success else '❌'} {msg}")

    print(f"\n总耗时:{round(time.time() - start_time, 3)}s")

二、方案二:队列(Queue)流式有序处理

核心逻辑

用两个线程安全的队列:

  • 任务队列:按提交顺序存入所有请求索引(0、1、2...);
  • 结果队列:线程执行完成后,将(索引+结果)存入队列;
  • 主线程按“期望索引”(从 0 开始)循环检查结果队列,只输出当前期望的索引结果,非期望结果放回队列,直到所有任务完成。

关键优势

  • 支持流式输出:无需等待所有任务完成,可实时按顺序打印结果;
  • 队列自带线程安全(​​queue.Queue​​ 内部已实现锁),无需手动加锁;
  • 适合实时展示进度(如批量操作时实时打印日志)。

完整代码

import requests
import threading
from queue import Queue
import time

API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5

def worker(task_queue: Queue, result_queue: Queue):
    """工作线程:从任务队列取任务,执行后存入结果队列"""
    while not task_queue.empty():
        try:
            index = task_queue.get(timeout=1)  # 非阻塞取任务(超时1秒退出)
            url = API_URL.format(index % 10 + 1)
            try:
                response = requests.get(url, timeout=TIMEOUT)
                response.raise_for_status()
                result = (index, True, f"响应:{response.json()['title'][:20]}...")
            except Exception as e:
                result = (index, False, f"失败:{str(e)[:30]}")
            result_queue.put(result)  # 存入结果队列(无序)
        except Exception:
            break

if __name__ == "__main__":
    start_time = time.time()

    # 1. 初始化队列
    task_queue = Queue()  # 按顺序存入请求索引(0~19)
    result_queue = Queue()  # 存储无序的结果

    # 2. 提交任务(按顺序入队)
    for i in range(TOTAL_REQUESTS):
        task_queue.put(i)

    # 3. 启动工作线程
    threads = [threading.Thread(target=worker, args=(task_queue, result_queue)) for _ in range(THREAD_NUM)]
    for t in threads:
        t.start()

    # 4. 主线程按顺序提取结果(流式输出)
    print("队列流式处理 - 按请求顺序实时输出:")
    expected_index = 0  # 期望的下一个任务索引(从0开始)
    completed = 0  # 已完成的任务数

    while completed < TOTAL_REQUESTS:
        if not result_queue.empty():
            index, is_success, msg = result_queue.get()
            # 匹配期望索引则输出,否则放回队列
            if index == expected_index:
                print(f"任务[{index}]:{'✅' if is_success else '❌'} {msg}")
                expected_index += 1
                completed += 1
            else:
                result_queue.put((index, is_success, msg))  # 未匹配则放回
        else:
            time.sleep(0.01)  # 避免空循环占用CPU

    # 5. 等待所有线程结束
    for t in threads:
        t.join()

    print(f"\n总耗时:{round(time.time() - start_time, 3)}s")

三、方案三:使用 ​​concurrent.futures​​ + 有序结果收集

核心逻辑

​ThreadPoolExecutor​​ 提交任务后会返回一个 ​​Future​​ 列表,该列表的顺序与提交顺序一致(即使任务完成顺序无序)。通过遍历 ​​Future​​ 列表(而非 ​​as_completed​​),直接调用 ​​future.result()​​,会按提交顺序阻塞等待每个任务完成,从而自然得到有序结果。

关键优势

  • 基于 Python 标准库,代码简洁(无需手动管理线程/锁/队列);
  • 本质是“按提交顺序等待结果”,无需排序或额外存储;
  • 适合需要“逐个按顺序处理结果”且不想写复杂逻辑的场景。

完整代码

import requests
from concurrent.futures import ThreadPoolExecutor
import time

API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5

def request_api(index: int) -> tuple:
    """单个请求函数:返回(索引,是否成功,结果信息)"""
    url = API_URL.format(index % 10 + 1)
    try:
        response = requests.get(url, timeout=TIMEOUT)
        response.raise_for_status()
        return (index, True, f"响应:{response.json()['title'][:20]}...")
    except Exception as e:
        return (index, False, f"失败:{str(e)[:30]}")

if __name__ == "__main__":
    start_time = time.time()

    # 1. 提交任务并获取 Future 列表(顺序与提交一致)
    with ThreadPoolExecutor(max_workers=THREAD_NUM) as executor:
        future_list = [executor.submit(request_api, i) for i in range(TOTAL_REQUESTS)]

        # 2. 按 Future 列表顺序获取结果(阻塞等待,顺序与提交一致)
        print("ThreadPoolExecutor 有序收集 - 按请求顺序输出:")
        for future in future_list:
            index, is_success, msg = future.result()  # 按提交顺序阻塞等待
            print(f"任务[{index}]:{'✅' if is_success else '❌'} {msg}")

    total_cost = round(time.time() - start_time, 3)
    print(f"\n总耗时:{total_cost}s")

注意

  • 该方案的“有序”是通过“按提交顺序等待每个任务完成”实现的,不会降低并发效率(任务仍在后台并行执行,只是结果获取顺序固定);
  • 若前一个任务未完成,会阻塞等待,直到其完成后再获取下一个任务的结果,适合需要“逐个处理结果”的场景(如按顺序写入数据库)。

四、方案四:第三方库 ​​aiohttp​​(异步并发+有序结果)

核心逻辑

虽然是“异步”而非“多线程”,但 ​​aiohttp​​ 是 IO 密集型接口请求的更优选择(单线程异步并发,无 GIL 影响,效率更高),且天然支持有序结果——异步任务的提交顺序与结果返回顺序一致。

关键优势

  • 并发效率高于多线程(无线程切换开销);
  • 代码简洁,无需处理线程安全问题;
  • 适合高并发接口请求场景(如批量爬取、接口压测)。

完整代码

import aiohttp
import asyncio
import time

API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
TOTAL_REQUESTS = 20
TIMEOUT = 5

async def request_api(session: aiohttp.ClientSession, index: int) -> tuple:
    """异步请求函数"""
    url = API_URL.format(index % 10 + 1)
    try:
        async with session.get(url, timeout=TIMEOUT) as response:
            response.raise_for_status()
            data = await response.json()
            return (index, True, f"响应:{data['title'][:20]}...")
    except Exception as e:
        return (index, False, f"失败:{str(e)[:30]}")

async def main():
    start_time = time.time()

    # 1. 创建异步会话(复用连接,提升效率)
    async with aiohttp.ClientSession() as session:
        # 2. 创建所有异步任务(顺序与提交一致)
        tasks = [request_api(session, i) for i in range(TOTAL_REQUESTS)]
        # 3. 并发执行任务,按提交顺序获取结果
        results = await asyncio.gather(*tasks)  # gather 保证结果顺序与任务顺序一致

    # 4. 输出结果(已有序)
    print("aiohttp 异步并发 - 按请求顺序输出:")
    for idx, is_success, msg in results:
        print(f"任务[{idx}]:{'✅' if is_success else '❌'} {msg}")

    total_cost = round(time.time() - start_time, 3)
    print(f"\n总耗时:{total_cost}s")

if __name__ == "__main__":
    # 兼容 Windows 系统
    if __name__ == "__main__":
        if sys.platform == "win32":
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        asyncio.run(main())

依赖安装

pip install aiohttp

各方案对比&选型建议

方案技术依赖核心优势适用场景
固定位置存储threading + lock效率最高、可实时查进度高并发、需跟踪任务状态
队列流式处理threading + Queue流式输出、无需等待所有任务实时展示进度、边请求边处理
ThreadPoolExecutor 有序收集concurrent.futures代码最简单、标准库支持无需实时输出、按顺序处理结果
aiohttp 异步并发aiohttp + asyncio并发效率最高、无线程安全问题高并发接口请求、爬取、压测

面试&实战关键要点

线程安全是前提:修改共享数据(如列表)必须加锁,或使用线程安全的数据结构(如 ​​queue.Queue​​);

有序的核心是“索引绑定” :无论哪种方案,都需要通过“请求索引”关联任务和结果,确保顺序对齐;

IO 密集型优先选异步:​​aiohttp​​ 异步并发效率高于多线程,且天然有序,是接口请求的最优解;

避免过度设计:简单场景用 ​​ThreadPoolExecutor​​ 有序收集(方案四),复杂场景用队列或固定位置存储,无需追求复杂逻辑。

到此这篇关于python处理多线程请求接口结果顺序的4种方案的文章就介绍到这了,更多相关python处理多线程请求接口结果顺序内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Python calendar模块详情

    Python calendar模块详情

    这篇文章主要介绍了 Python calendar模块,Python 专门为了处理日历提供了calendar日历模块,下面文章基于time模块和datetime模块展开,具有一定的参考价值,需要的朋友可以参考一下
    2021-11-11
  • Python的图像处理库Pillow安装与使用教程

    Python的图像处理库Pillow安装与使用教程

    Pillow库是Python中用于图像处理的开源库,提供了丰富的图像处理功能,如图像读取、保存、裁剪、调整大小、旋转、添加文字等,这篇文章主要给大家介绍了关于Python的图像处理库Pillow安装与使用的相关资料,需要的朋友可以参考下
    2024-04-04
  • python使用OpenCV获取高动态范围成像HDR

    python使用OpenCV获取高动态范围成像HDR

    这篇文章主要介绍了python使用OpenCV获取高动态范围成像HDR,如何使用不同曝光设置拍摄的多张图像创建高动态范围图像HDR,下文吗更详细的内容介绍,需要的小伙伴可以参考一下
    2022-04-04
  • Python实现一键自动分类管理文件

    Python实现一键自动分类管理文件

    经常杂乱无章的文件夹会让我们找不到所想要的文件,所以本文小编特意为大家介绍了如何制作一个可视化GUI界面,通过输入路径一键点击实现文件分门别类的归档,希望对大家有所帮助<BR>
    2024-01-01
  • 在django view中给form传入参数的例子

    在django view中给form传入参数的例子

    今天小编就为大家分享一篇在django view中给form传入参数的例子,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-07-07
  • python判断端口是否打开的实现代码

    python判断端口是否打开的实现代码

    python判断端口是否打开的代码,有需要的朋友可以参考下
    2013-02-02
  • python pandas 时间日期的处理实现

    python pandas 时间日期的处理实现

    这篇文章主要介绍了python pandas 时间日期的处理实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-07-07
  • 使用Python实现计算DICOM图像两点真实距离

    使用Python实现计算DICOM图像两点真实距离

    这篇文章主要为大家详细介绍了如何使用Python实现计算DICOM图像两点真实距离,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-11-11
  • Python+OpenCV图片局部区域像素值处理详解

    Python+OpenCV图片局部区域像素值处理详解

    这篇文章主要为大家详细介绍了Python+OpenCV图片局部区域像素值处理,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-01-01
  • python client使用http post 到server端的代码

    python client使用http post 到server端的代码

    python client使用 http post 到server端的代码,供大家学习参考
    2013-02-02

最新评论