python 实现流式接口返回响应处理示例

 更新时间:2025年12月05日 10:49:55   作者:宥钧  
流式接口设计是一种强大的编程范式,它能够提高代码的可读性、灵活性和可维护性,特别是在现在大模型的应用中很常见,下面就来详解一下python 实现流式接口返回响应处理示例,感兴趣的可以了解一下

一. 前言

在Python中实现流式处理通常涉及到处理大量的数据,这些数据可能来自于网络、磁盘或其他数据源。流式处理的一个关键点是数据不需要一次性加载到内存中,而是逐块处理。

二、设计原则

  • 可读性:流式接口的设计应着重提高代码的可读性。这要求开发者在命名和构造代码时,要考虑到如何使代码更加直观和易于理解。
  • 流畅性:通过链式调用,流式接口可以实现一系列操作的流畅组合,从而提高代码的编写效率和可读性。
  • 灵活性:流式接口允许开发者根据需要动态地添加或修改操作序列,从而提高了代码的灵活性。

三、实现方式

  • 返回对象本身:在流式接口的方法中,通常通过返回对象本身(例如使用this关键字)来维持链式调用的连续性。
  • 泛型与类型安全:在需要处理不同类型数据时,可以使用泛型来保持类型安全,同时提供灵活的操作序列。
  • 上下文管理:流式接口可能需要管理复杂的上下文信息,例如数据库连接、事务状态等。在设计时,应考虑到如何有效地管理这些上下文信息,以确保代码的正确性和稳定性。

四. 代码设计

1.python后端代码:

1.1 Flask服务

import json

import requests
from flask import Flask, Response, stream_with_context, request
import time

from flask_cors import CORS
from gevent.pywsgi import WSGIServer

app = Flask(__name__)

CORS(app, resources={r"/*": {"origins": "*"}})  # 允许所有源访问,但请注意生产环境的安全性


@app.route('/ret_json_data', methods=['GET'])
def ret_json_data():
    print('start request!')
    return {'hello': 'hello flask'}


@app.route('/stream_json_data',  methods=["GET", "POST"])
def stream_json_data():
    json_data = {
        'state': 0,
        'data': [],
        'text': '',
        'message': ''
    }
    ret_str = "大模型通常指的是那些规模庞大、参数数量极多的人工智能模型,它们在训练时使用了海量的数据,能够处理复杂的任务并展现出强大的语言理解和生成能力。"

    def generate():
        for i in range(len(ret_str)):
            json_data["text"] = ret_str[i]
            time.sleep(0.1)  # 暂停一秒,模拟数据产生的时间间隔
            print(f"Data: {json_data}\n")
            yield json.dumps(json_data, ensure_ascii=False)

    print('stream-data')
    return Response(stream_with_context(generate()), content_type='application/json; charset=utf-8')
    # return stream_data(generate())


@app.route('/stream_data_to_frontend')
def stream_data_to_frontend():
    # 假设你有一个外部 URL,该 URL 提供流式数据
    external_url = 'http://127.0.0.1:7000/stream_json_data'

    # 使用 requests 库的流式响应来接收数据
    headers = {"Content-Type": "application/json"}
    response = requests.get(external_url, stream=True, headers=headers)
    # with requests.get(external_url, stream=True, headers=headers) as r:
    #     # print('r========', r.json())
    #     # 检查外部请求是否成功
    if response.status_code == 200:
        # 创建一个生成器来流式传输响应内容
        def generate():
            for chunk in response.iter_content(chunk_size=1024):  # 你可以设置合适的 chunk_size
                if chunk:
                    print('chunk: ', chunk)
                    # yield chunk
                    # yield chunk.decode("utf-8", "ignore")
                    chunk = json.loads(chunk)
                    chunk['tt'] = 'ttttt'
                    yield json.dumps(chunk, ensure_ascii=False)

                    # 创建一个 Flask 响应对象,并使用 stream_with_context 来发送流式响应

        return Response(stream_with_context(generate()), content_type='application/json; charset=utf-8')
    else:
        # 如果外部请求失败,返回错误信息
        return 'Failed to retrieve data from external source', 500


@app.route('/stream_data_to_frontend_test')
def stream_data_to_frontend_test():
    url = "http://ip:port/llm"
    params = {
        "messages": [
            {
                "role": "user",
                "content": "储能公司在该项目攻关中取得了哪些关键性突破?"
            }
        ]
    }
    headers = {"Content-Type": "application/json"}
    response = requests.post(url=url, json=params, stream=True, headers=headers)

    def generate():
        for chunk in response.iter_content(1024):
            print(chunk.decode("utf-8", "ignore"))
            if chunk:
                yield chunk.decode("utf-8", "ignore")

    return Response(stream_with_context(generate()), content_type='application/json; charset=utf-8')


@app.route('/stream_faq_data_to_frontend_test')
def stream_faq_data_to_frontend_test():
    url = 'http://ip:port/url'
    # 请求的数据
    data = {
        "query": "测试"
    }
    headers = {"Content-Type": "application/json"}
    response = requests.post(url=url, json=data, stream=True, headers=headers)

    def generate():
        for chunk in response.iter_content(1024):
            print('chunk', chunk.decode("utf-8", "ignore"))
            if chunk:
                # chunk = json.loads(chunk)
                # chunk = json.dumps(chunk)
                yield chunk
                # yield chunk.decode("utf-8", "ignore")

    return stream_data(generate())


def stream_data(generate):
    """
    :param generate: generator object
    :return:
    """
    return Response(stream_with_context(generate), content_type='application/json; charset=utf-8')


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7000)
    # server = WSGIServer(('0.0.0.0', 7000), app)
    # server.serve_forever()

1.2 FastAPI服务

基础流式文本输出

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def data_generator():
    for i in range(5):
        # 模拟耗时操作(如 AI 推理、实时数据采集)
        await asyncio.sleep(1)
        yield f"数据块 {i}\n"

@app.get("/stream")
async def stream_data():
    return StreamingResponse(
        data_generator(),
        media_type="text/plain",  # 或 "text/event-stream"(SSE)
    )

流式传输大文件

@app.get("/stream-file")
def stream_large_file():
    def file_iterator():
        with open("large_file.zip", "rb") as f:
            while chunk := f.read(4096):  # 每次读取 4KB
                yield chunk

    return StreamingResponse(
        file_iterator(),
        media_type="application/octet-stream",
        headers={"Content-Disposition": "attachment; filename=large_file.zip"}
    )

Server-Sent Events (SSE)

@app.get("/sse")
async def sse():
    async def event_generator():
        for i in range(5):
            await asyncio.sleep(1)
            # SSE 格式要求
            yield f"data: 事件 {i}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

2.前端代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Single Request Demo</title>
</head>
<body>

<h1>Single Request Demo</h1>
<pre id="single-output"></pre>

<script>
    fetch('http://127.0.0.1:7000/stream_json_data')
        .then(response => {
            if (!response.ok) {
                throw new Error('Network response was not ok');
            }
            return response.text(); // 或者使用 response.json() 如果后端返回的是JSON
        })
        .then(data => {
            // 将接收到的数据添加到页面上
            var output = document.getElementById('single-output');
            output.textContent += data; // 注意这里使用了 = 而不是 +=,因为我们只想要一次性的内容
        })
        .catch(error => {
            console.error('There has been a problem with your fetch operation:', error);
        });
</script>

</body>
</html>

五. 总结

流式接口设计是一种强大的编程范式,它能够提高代码的可读性、灵活性和可维护性。特别是在现在大模型的应用中很常见。

到此这篇关于python 实现流式接口返回响应处理示例的文章就介绍到这了,更多相关python 流式接口返回响应内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • django数据模型(Model)的字段类型解析

    django数据模型(Model)的字段类型解析

    这篇文章主要介绍了django数据模型(Model)的字段类型,文中给大家提到了django数据模型on_delete, db_constraint的使用,需要的朋友可以参考下
    2019-12-12
  • Python实现Appium端口检测与释放的实现

    Python实现Appium端口检测与释放的实现

    这篇文章主要介绍了Python实现Appium端口检测与释放的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • python 实现客户端与服务端的通信

    python 实现客户端与服务端的通信

    这篇文章主要介绍了python 实现客户端与服务端的通信的方法,帮助大家更好的理解和使用python,感兴趣的朋友可以了解下
    2020-12-12
  • python递归函数使用详解

    python递归函数使用详解

    递归函数是一种在函数内部调用自身的编程技巧。在Python中,我们可以使用递归函数来解决一些需要重复执行相同操作的问题。递归函数通常包含两个部分:基本情况和递归情况。基本情况是指函数停止调用自身的条件,而递归情况是指函数调用自身来解决更小规模的问题。
    2023-09-09
  • Python如何获取免费高匿代理IP及验证

    Python如何获取免费高匿代理IP及验证

    这篇文章主要介绍了Python如何获取免费高匿代理IP及验证问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-06-06
  • python定时任务schedule库用法详细讲解

    python定时任务schedule库用法详细讲解

    python中有一个轻量级的定时任务调度的库schedule,下面这篇文章主要给大家介绍了关于python定时任务schedule库用法的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2023-01-01
  • 详解Python如何使用并发模型编程

    详解Python如何使用并发模型编程

    这篇文章主要为大家详细介绍了如何让 Python 能够同时处理多个任务,即如何使用并发模型编程,文中的示例代码讲解详细,需要的可以参考一下
    2023-05-05
  • langchain Prompt大语言模型使用技巧详解

    langchain Prompt大语言模型使用技巧详解

    这篇文章主要为大家介绍了langchain Prompt大语言模型使用技巧详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07
  • python读写csv文件方法详细总结

    python读写csv文件方法详细总结

    在本文中小编给各位分享的是关于python读写csv文件方法的详细内容,对此有需要的朋友们跟着学习参考下。
    2019-07-07
  • Python 类,property属性(简化属性的操作),@property,property()用法示例

    Python 类,property属性(简化属性的操作),@property,property()用法示例

    这篇文章主要介绍了Python 类,property属性(简化属性的操作),@property,property()用法,结合实例形式分析了Python类的定义、属性、方法及相关使用技巧,需要的朋友可以参考下
    2019-10-10

最新评论