Python异步请求实战之高效实现批量接口健康检查

 更新时间:2026年06月28日 09:14:22   作者:detayun  
在后端开发、运维工作中,接口健康检查是刚需日常,本文将从零落地一套高可用、可配置、带异常捕获、有限并发的异步接口健康检查工具,可直接用于项目监控、定时巡检、自动化运维场景,有需要的可以了解下

前言:为什么需要异步接口健康检查?

在后端开发、运维工作中,接口健康检查是刚需日常。微服务架构下,一个项目往往包含几十甚至上百个接口,需要定时检测接口是否存活、响应是否正常、状态码是否合规,及时发现服务宕机、超时、报错等问题。

传统的同步检查方案(requests循环请求)存在致命短板:串行阻塞执行。假设单个接口请求超时3秒,100个接口就需要300秒,检查效率极低,无法满足高频、实时的监控需求。

而基于 asyncio + aiohttp 的异步方案,可以实现毫秒级批量并发检测,100个接口的检查耗时等同于单个接口的最大响应时间,效率提升数十倍。

本文从零落地一套高可用、可配置、带异常捕获、有限并发的异步接口健康检查工具,可直接用于项目监控、定时巡检、自动化运维场景。

运行环境:Python3.7+

核心依赖:aiohttp(异步网络请求库)

安装依赖:

pip install aiohttp

一、核心原理:异步为何适合健康检查?

接口健康检查属于纯IO密集型场景,程序绝大部分时间都在等待服务器响应,无需占用CPU资源,完美契合异步IO的工作特性。

1. 同步检查弊端

逐个请求接口,必须等待上一个接口请求完成/超时,才会执行下一个,接口数量越多,总耗时越长,巡检时效性极差。

2. 异步检查优势

利用事件循环调度,IO等待期间切换执行其他接口请求,多接口并发执行;同时可通过信号量限制并发数,避免瞬间大量请求压垮服务器,兼顾效率与服务稳定性。

二、极简版:异步接口健康检查入门

先实现基础功能:批量并发请求接口,记录接口状态、响应时间、异常信息。

import asyncio
import aiohttp
import time

# 待检测的接口列表(可替换为自己的业务接口)
API_LIST = [
    {"name": "百度首页", "url": "https://www.baidu.com", "method": "GET"},
    {"name": "HTTPBIN GET", "url": "https://httpbin.org/get", "method": "GET"},
    {"name": "HTTPBIN POST", "url": "https://httpbin.org/post", "method": "POST"},
    {"name": "无效接口", "url": "https://httpbin.org/error", "method": "GET"},
]

# 全局超时配置(单个接口最大请求时间)
TIMEOUT = aiohttp.ClientTimeout(total=5)


async def check_api(session: aiohttp.ClientSession, api: dict):
    """单个接口健康检测函数"""
    start_time = time.time()
    name = api["name"]
    url = api["url"]
    method = api["method"]

    try:
        if method.upper() == "GET":
            async with session.get(url, timeout=TIMEOUT) as resp:
                response_time = round((time.time() - start_time) * 1000, 2)
                return {
                    "接口名称": name,
                    "接口地址": url,
                    "请求方式": method,
                    "状态": "正常",
                    "状态码": resp.status,
                    "响应耗时(ms)": response_time,
                    "异常信息": ""
                }
        elif method.upper() == "POST":
            async with session.post(url, timeout=TIMEOUT) as resp:
                response_time = round((time.time() - start_time) * 1000, 2)
                return {
                    "接口名称": name,
                    "接口地址": url,
                    "请求方式": method,
                    "状态": "正常",
                    "状态码": resp.status,
                    "响应耗时(ms)": response_time,
                    "异常信息": ""
                }
    except Exception as e:
        response_time = round((time.time() - start_time) * 1000, 2)
        return {
            "接口名称": name,
            "接口地址": url,
            "请求方式": method,
            "状态": "异常",
            "状态码": None,
            "响应耗时(ms)": response_time,
            "异常信息": str(e)
        }


async def main():
    # 创建全局会话(复用连接池,提升效率)
    async with aiohttp.ClientSession(timeout=TIMEOUT) as session:
        # 批量创建异步任务
        tasks = [check_api(session, api) for api in API_LIST]
        # 等待所有任务执行完成,收集结果
        results = await asyncio.gather(*tasks)
    
    # 打印巡检结果
    print("=" * 80)
    print("接口健康巡检结果")
    print("=" * 80)
    for res in results:
        print(f"接口:{res['接口名称']:10} | 状态:{res['状态']} | 状态码:{res['状态码']} | 耗时:{res['响应耗时(ms)']}ms | 异常:{res['异常信息']}")


if __name__ == "__main__":
    start = time.time()
    asyncio.run(main())
    total_time = round(time.time() - start, 2)
    print(f"\n本次巡检总耗时:{total_time}s")

代码核心亮点

  1. 全局会话复用:仅创建一个 ClientSession,复用TCP连接,避免频繁创建销毁连接造成的性能损耗;
  2. 全异常捕获:覆盖超时、连接失败、接口404/500等各类异常,不会因单个接口报错导致整体巡检终止;
  3. 数据可视化:精准记录每个接口的状态、响应耗时、异常原因,结果清晰直观。

三、进阶优化:限制并发,避免压垮服务

上面的基础版本是无限制并发,如果一次性检测几百个接口,瞬间发起大量请求,容易触发服务器限流、防火墙拦截,甚至压垮后端服务。

我们通过 信号量(Semaphore) 限制最大并发数,可控、安全地批量巡检。

import asyncio
import aiohttp
import time

API_LIST = [
    {"name": "百度首页", "url": "https://www.baidu.com", "method": "GET"},
    {"name": "HTTPBIN GET", "url": "https://httpbin.org/get", "method": "GET"},
    {"name": "HTTPBIN POST", "url": "https://httpbin.org/post", "method": "POST"},
    {"name": "无效接口", "url": "https://httpbin.org/error", "method": "GET"},
    {"name": "测试接口1", "url": "https://httpbin.org/delay/1", "method": "GET"},
    {"name": "测试接口2", "url": "https://httpbin.org/delay/2", "method": "GET"},
]

# 核心配置
MAX_CONCURRENT = 3  # 最大并发数,同时最多3个接口请求
TIMEOUT = aiohttp.ClientTimeout(total=5)
# 创建信号量
semaphore = asyncio.Semaphore(MAX_CONCURRENT)


async def check_api(session: aiohttp.ClientSession, api: dict):
    # 限制并发:同一时间最多执行MAX_CONCURRENT个任务
    async with semaphore:
        start_time = time.time()
        name = api["name"]
        url = api["url"]
        method = api["method"]

        try:
            if method.upper() == "GET":
                async with session.get(url, timeout=TIMEOUT) as resp:
                    response_time = round((time.time() - start_time) * 1000, 2)
                    return {
                        "接口名称": name,
                        "接口地址": url,
                        "状态": "正常" if resp.status in [200, 201] else "异常",
                        "状态码": resp.status,
                        "响应耗时(ms)": response_time,
                        "异常信息": ""
                    }
            elif method.upper() == "POST":
                async with session.post(url, timeout=TIMEOUT) as resp:
                    response_time = round((time.time() - start_time) * 1000, 2)
                    return {
                        "接口名称": name,
                        "接口地址": url,
                        "状态": "正常" if resp.status in [200, 201] else "异常",
                        "状态码": resp.status,
                        "响应耗时(ms)": response_time,
                        "异常信息": ""
                    }
        except Exception as e:
            response_time = round((time.time() - start_time) * 1000, 2)
            return {
                "接口名称": name,
                "接口地址": url,
                "状态": "异常",
                "状态码": None,
                "响应耗时(ms)": response_time,
                "异常信息": str(e)
            }


async def main():
    # 自定义连接池,优化并发性能
    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT)
    async with aiohttp.ClientSession(connector=connector, timeout=TIMEOUT) as session:
        tasks = [check_api(session, api) for api in API_LIST]
        results = await asyncio.gather(*tasks)

    # 统计巡检数据
    normal_count = len([res for res in results if res["状态"] == "正常"])
    error_count = len(results) - normal_count

    # 输出结果
    print("=" * 90)
    print("接口健康巡检报告")
    print("=" * 90)
    for res in results:
        print(f"接口:{res['接口名称']:10} | 状态:{res['状态']} | 状态码:{res['状态码']} | 耗时:{res['响应耗时(ms)']}ms | 异常:{res['异常信息']}")
    
    print(f"\n✅ 正常接口:{normal_count} 个 | ❌ 异常接口:{error_count} 个")


if __name__ == "__main__":
    start = time.time()
    asyncio.run(main())
    print(f"本次巡检总耗时:{round(time.time() - start, 2)}s")

优化点说明

  • 信号量限流:通过 asyncio.Semaphore 控制最大并发数,防止请求泛滥;
  • 状态码校验:自定义正常状态码规则(200/201),精准判断接口健康状态;
  • 连接池优化TCPConnector 适配并发数,最大化复用连接,提升巡检速度;
  • 数据统计:自动统计正常/异常接口数量,生成极简巡检报告。

四、企业级扩展:支持请求头、请求参数、POST传参

实际业务中,大部分接口需要Token 鉴权、JSON参数、表单传参。我们对工具进行扩展,适配复杂业务接口。

import asyncio
import aiohttp
import time

# 支持请求头、请求参数的接口配置
API_LIST = [
    {
        "name": "带鉴权GET接口",
        "url": "https://httpbin.org/headers",
        "method": "GET",
        "headers": {"Authorization": "Bearer test123456", "User-Agent": "Mozilla/5.0"},
        "data": None
    },
    {
        "name": "带参数POST接口",
        "url": "https://httpbin.org/post",
        "method": "POST",
        "headers": {"Content-Type": "application/json"},
        "data": {"username": "admin", "password": "123456"}
    }
]

MAX_CONCURRENT = 3
TIMEOUT = aiohttp.ClientTimeout(total=5)
semaphore = asyncio.Semaphore(MAX_CONCURRENT)


async def check_api(session: aiohttp.ClientSession, api: dict):
    async with semaphore:
        start_time = time.time()
        name = api["name"]
        url = api["url"]
        method = api["method"]
        headers = api.get("headers", {})
        data = api.get("data")

        try:
            if method.upper() == "GET":
                async with session.get(url, headers=headers, timeout=TIMEOUT) as resp:
                    response_time = round((time.time() - start_time) * 1000, 2)
                    return {
                        "接口名称": name, "状态": "正常" if resp.status == 200 else "异常",
                        "状态码": resp.status, "耗时(ms)": response_time, "异常": ""
                    }
            elif method.upper() == "POST":
                async with session.post(url, headers=headers, json=data, timeout=TIMEOUT) as resp:
                    response_time = round((time.time() - start_time) * 1000, 2)
                    return {
                        "接口名称": name, "状态": "正常" if resp.status == 200 else "异常",
                        "状态码": resp.status, "耗时(ms)": response_time, "异常": ""
                    }
        except Exception as e:
            return {
                "接口名称": name, "状态": "异常", "状态码": None,
                "耗时(ms)": round((time.time() - start_time) * 1000, 2), "异常": str(e)
            }


async def main():
    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT)
    async with aiohttp.ClientSession(connector=connector, timeout=TIMEOUT) as session:
        tasks = [check_api(session, api) for api in API_LIST]
        results = await asyncio.gather(*tasks)

    for res in results:
        print(res)


if __name__ == "__main__":
    asyncio.run(main())

五、定时巡检部署(进阶落地)

接口健康检查的核心价值是持续监控,搭配定时任务即可实现无人值守巡检。可以结合APScheduler 实现秒级/分钟级定时检测,异常接口可对接钉钉/企业微信机器人推送告警消息,快速发现线上问题。

六、常见问题避坑

1. 为什么不能用requests做异步?

requests 是同步阻塞库,不兼容asyncio事件循环,强行使用会堵塞整个异步程序,完全失去并发效果。异步网络请求必须使用 aiohttp

2. 并发数设置多少合适?

内网接口:可设置10-20;公网接口:建议3-5,避免被IP封禁;根据服务器性能灵活调整。

3. 程序偶尔报错连接超时?

属于正常网络波动,代码已捕获异常,不会影响整体巡检;可适当调大timeout超时时间。

七、总结

基于Python异步IO实现接口健康检查,是运维、后端开发的实用高效方案,核心优势总结:

  • 效率极高:IO并发执行,批量巡检耗时大幅缩短;
  • 安全可控:信号量限流,避免请求泛滥压垮服务;
  • 扩展性强:支持鉴权、自定义参数、定时巡检、消息告警;
  • 稳定可靠:全局异常捕获,单接口故障不影响整体任务。

本文提供的代码可直接落地到个人项目、企业运维监控系统,快速搭建轻量化的接口健康监控体系。

以上就是Python异步请求实战之高效实现批量接口健康检查的详细内容,更多关于Python异步接口健康检查的资料请关注脚本之家其它相关文章!

相关文章

  • 使用Python来批量检测并删除Word文档中的宏

    使用Python来批量检测并删除Word文档中的宏

    Word文档作为最常用的电子文档格式之一,经常被用来作为内容分享工具,在网络中或设备之间进行传输,其安全性也需要受到关注,宏是可嵌入Word文档中的一种VBA迷你程序,本文将介绍如何使用Python来批量检测并删除Word文档中的宏,保护计算机的安全,需要的朋友可以参考下
    2024-07-07
  • Python 3.6 中使用pdfminer解析pdf文件的实现

    Python 3.6 中使用pdfminer解析pdf文件的实现

    这篇文章主要介绍了Python 3.6 中使用pdfminer解析pdf文件的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-09-09
  • Windows系统下Chromedriver.exe安装及配置详细教程

    Windows系统下Chromedriver.exe安装及配置详细教程

    ChromeDriver.exe是一款实用的chrome浏览器驱动工具,能够用于自动化测试、网络爬虫和操作浏览器,其主要作用是模拟浏览器操作,下面这篇文章主要给大家介绍了关于Windows系统下Chromedriver.exe安装及配置的相关资料,需要的朋友可以参考下
    2023-11-11
  • 解决pymongo连接数据库报错certificate verify failed:certificate has expired

    解决pymongo连接数据库报错certificate verify failed:certific

    这篇文章主要介绍了解决pymongo连接数据库报错certificate verify failed:certificate has expired问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • python 一篇文章搞懂装饰器所有用法(建议收藏)

    python 一篇文章搞懂装饰器所有用法(建议收藏)

    这篇文章主要介绍了python 一篇文章搞懂装饰器所有用法(建议收藏),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08
  • 通过Folium在地图上展示数据Python地理可视化的入门示例详解

    通过Folium在地图上展示数据Python地理可视化的入门示例详解

    这篇文章主要介绍了通过Folium在地图上展示数据Python地理可视化的入门,在本文中,我们介绍了如何使用Python中的Folium库进行地理可视化,通过Folium,我们可以轻松地创建交互式地图,并在地图上展示数据、绘制形状、添加图例和文本标签等,需要的朋友可以参考下
    2024-05-05
  • Python sklearn对文本数据进行特征化提取

    Python sklearn对文本数据进行特征化提取

    这篇文章主要介绍了Python sklearn对文本数据进行特征化提取,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
    2023-04-04
  • 利用Python实现批量转换图片格式

    利用Python实现批量转换图片格式

    本文重点介绍普通图片格式怎么相互转换,如jpg格式图片怎么批量转化为png格式,在深度学习项目中,有时我们收集到的数据集图片格式不统一,有的代码支持多种格式图片输入,有的则只支持个别格式,所以这时,我们需要通过脚本来转换图片格式,不说废话,直接上代码
    2025-08-08
  • python 修改本地网络配置的方法

    python 修改本地网络配置的方法

    今天小编就为大家分享一篇python 修改本地网络配置的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-08-08
  • pytest多重断言的实现

    pytest多重断言的实现

    本文主要介绍了pytest多重断言的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-02-02

最新评论