Python处理application/json错误的方法详解

 更新时间:2025年02月24日 09:22:50   作者:Maybe_9527  
这篇文章主要为大家详细介绍了python使用httpx_sse调用sse流式接口对响应格式为application/json的错误信息处理的相关知识,需要的可以了解下

问题描述

调用sse流式接口使用httpx_sse的方式

				import httpx
				from httpx_sse import connect_sse
				# 省略无关代码
				try:
                    with httpx.Client() as client:
                        with connect_sse(client, "GET", url, params=param) as event_source:
                            clear_textbox(response_textbox)
                            # 把 iter_sse() 迭代完, 就相当于处理完了一次流式调用
                            for sse in event_source.iter_sse():
                                # 流式响应中,每次响应体的处理逻辑
                                print(f"generated_answer的值是: '{sse.data}'")
                                response = sse.data
                                if response != '':
                                    # self.response = response
                                    append_text(response_textbox, response)

                except httpx.RequestError as e:
                    print(f"请求错误:{e}")
                except Exception as e:
                    print(f"发生了一个错误:{e}")

httpx_sse的connet_sse源码:

@contextmanager
def connect_sse(
    client: httpx.Client, method: str, url: str, **kwargs: Any
) -> Iterator[EventSource]:
    headers = kwargs.pop("headers", {})
    headers["Accept"] = "text/event-stream"
    headers["Cache-Control"] = "no-store"

    with client.stream(method, url, headers=headers, **kwargs) as response:
        yield EventSource(response)

可以看到connect_sse源码中的headers的"Accept"设置了只接受"text/event-stream"流式结果,正常这么调用是没错的。但是当后端的流式接口因为401权限问题等报错返回了"application/json"格式,如
{ “code”:401, “msg”:“登录过期,请重新登录”, “data”:null} 这样的json格式结果时,以上代码就会报错,因为他不是"text/event-stream"流式响应结果头。那么该怎么办呢?

方案

重新写一个自定义的connect_sse。

import httpx
from httpx_sse import EventSource
from typing import Any, Iterator
from contextlib import contextmanager
import json
# 自定义调用sse接口
    @contextmanager
    def custom_connect_sse(
            self, client: httpx.Client, method: str, url: str, **kwargs: Any
    ) -> Iterator[EventSource]:
        headers = kwargs.pop("headers", {})
        # 只有当没有指定Accept时才添加默认值
        headers["Accept"] = "*/*"
        headers["Cache-Control"] = "no-store"

        with client.stream(method, url, headers=headers, **kwargs) as response:
            content_type = response.headers.get('content-type', '').lower()
            json_flag = False
            if 'text/event-stream' in content_type:
                # 处理SSE流
                yield json_flag, EventSource(response)
            elif 'application/json' in content_type:
                # yield response  # 在这里你可以决定如何进一步处理这个JSON响应
                # 读取并合并所有文本块
                text_data = ''.join([chunk for chunk in response.iter_text()])
                # 解析整个响应体为JSON
                json_data = json.loads(text_data)
                json_flag = True
                yield json_flag, json_data

调用代码

# 使用自定义的connect_sse函数
                try:
                    with httpx.Client() as client:
                        with self.custom_connect_sse(client, "GET", url, params=param, headers=headers) as (json_flag, event_source):
                            if json_flag:
                                code = event_source.get("code")
                                msg = event_source.get("msg")
                                print(f"Code: [code], Message: {msg}")
                            else:
                                full_answer = ""
                                clear_textbox(response_textbox)
                                for sse in event_source.iter_sse():
                                    print(f"generated_answer的值是: '{sse.data}'")
                                    response = sse.data
                                    if response:
                                        append_text(response_textbox, response)
                                        full_answer += response
                                user_record += reply + full_answer + "\n"
                                print(f"user_record:{user_record}")

                except httpx.RequestError as e:
                    print(f"请求错误:{e}")
                except Exception as e:
                    print(f"发生了一个错误:{e}")

关键步骤:

1.设置headers[“Accept”] = “/”,所有响应头都可以接收

2.content_type = response.headers.get(‘content-type’, ‘’).lower() 判断响应头是流式还是json,并用json_flag记录是否json标识,返回不同的结果。如果是json,则循环合并处理chunk块,拼装完整json返回结果(实测第一次就返回完整json结构了,但是代码得这么写)。

3.使用自定义connect_sse方法时,根据json_flag来分别处理成功调用流式结果还是异常的json结果。

到此这篇关于Python处理application/json错误的方法详解的文章就介绍到这了,更多相关Python处理application/json错误内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • argparse 模块简介

    argparse 模块简介

    argparse是一个用来解析命令行参数的 Python 库,它是 Python 标准库的一部分,基于 python 2.7 的stdlib 代码,这篇文章主要介绍了argparse 模块详解,需要的朋友可以参考下
    2023-02-02
  • Python实现扩展内置类型的方法分析

    Python实现扩展内置类型的方法分析

    这篇文章主要介绍了Python实现扩展内置类型的方法,结合实例形式分析了Python嵌入内置类型扩展及子类方式扩展的具体实现技巧,需要的朋友可以参考下
    2017-10-10
  • pytorch permute维度转换方法

    pytorch permute维度转换方法

    今天小编就为大家分享一篇pytorch permute维度转换方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-12-12
  • python实现自动登录人人网并采集信息的方法

    python实现自动登录人人网并采集信息的方法

    这篇文章主要介绍了python实现自动登录人人网并采集信息的方法,涉及Python模拟登陆及正则匹配的相关技巧,需要的朋友可以参考下
    2015-06-06
  • python解压缩文件或文件夹两种常见方式(附代码)

    python解压缩文件或文件夹两种常见方式(附代码)

    这篇文章主要介绍了python解压缩文件或文件夹两种常见方式,Python的zipfile和shutil模块提供了强大的文件和文件夹压缩与解压缩功能,zipfile模块适合精细控制,而shutil模块则快速简单,文中提供了详细的代码示例,需要的朋友可以参考下
    2025-04-04
  • Python制作微信机器人教程详解

    Python制作微信机器人教程详解

    这篇文章主要介绍了Python如何实现微信机器人,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2021-12-12
  • 动态设置django的model field的默认值操作步骤

    动态设置django的model field的默认值操作步骤

    这篇文章主要介绍了动态设置django的model field的默认值操作步骤,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-03-03
  • Flask框架实现给视图函数增加装饰器操作示例

    Flask框架实现给视图函数增加装饰器操作示例

    这篇文章主要介绍了Flask框架实现给视图函数增加装饰器操作,结合实例形式分析了flask框架视图添加装饰器的具体操作方法及相关注意事项,需要的朋友可以参考下
    2018-07-07
  • Python 文件数据读写的具体实现

    Python 文件数据读写的具体实现

    这篇文章主要介绍了Python 文件数据读写的具体实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-01-01
  • python 返回列表中某个值的索引方法

    python 返回列表中某个值的索引方法

    今天小编就为大家分享一篇python 返回列表中某个值的索引方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-11-11

最新评论