SpringBoot 2.x 接入非标准SSE格式大模型流式响应的实战解决方案

 更新时间:2025年02月25日 08:51:25   作者:fengzeng  
本文介绍了在SpringBoot2.7.3环境中接入非标准SSE格式大模型流式响应的实战解决方案,通过自定义实现,解决了大模型返回数据格式不符合标准SSE规范的问题,关键步骤包括引入Gradle依赖、配置WebClient、处理粘包、格式兼容和双重过滤机制,感兴趣的朋友跟随小编一起看看吧

近期DeepSeek等国产大模型热度持续攀升,其关注度甚至超过了OpenAI(被戏称为CloseAI)。在SpringBoot3.x环境中,可以使用官方的Spring AI轻松接入,但对于仍在使用JDK8SpringBoot2.7.3的企业级应用来说,往往需要自定义实现。特别是当大模型团队返回的数据格式不符合标准SSE规范时,更需要灵活处理。本文将分享我们的实战解决方案。

📦 引入Gradle依赖

核心依赖说明:

  • spring-boot-starter-web:基础Web支持
  • spring-boot-starter-webflux:响应式编程支持(WebClient所在模块)
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'

🌐 WebClient配置要点

初始化时特别注意Header配置:

@Bean
public WebClient init() {
    return WebClient.builder()
            .baseUrl(baseUrl)
            .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAi)
            // ⚠️ 必须设置为JSON格式
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
}

🚨 关键踩坑点:初始设置MediaType.TEXT_EVENT_STREAM_VALUE会导致请求失败,必须使用APPLICATION_JSON_VALUE

🧠 核心处理逻辑

流式请求入口

@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
    // 请求体构建
    String requestBody = String.format("""
        {
            "model": "%s",
            "messages": [{"role": "user", "content": "%s"}],
            "stream": true
        }
        """, model, prompt);
    return webClient.post()
            // 请求配置
            .uri("/v1/chat/completions")
            .bodyValue(requestBody)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(DataBuffer.class)  // 🔑 关键配置点
            .transform(this::processStream)
            // 重试和超时配置
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            .timeout(Duration.ofSeconds(180));
            // 错误处理
            .doOnError(e -> log.error("Stream error", e))
            .doFinally(signal -> log.info("Stream completed: {}", signal));
}

技术原理说明

当使用bodyToFlux(DataBuffer.class)时:

  • ✅ 获得原始字节流控制权
  • ❌ 避免自动SSE格式解析(适用于非标准响应)
  • 📡 动态数据流处理:类似Java Stream,但数据持续追加

🔧 非标准SSE数据处理

核心处理流程

private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
    return dataBufferFlux
            .transform(DataBufferUtils::join)          // 字节流合并
            .map(buffer -> {                          // 字节转字符串
                String content = buffer.toString(StandardCharsets.UTF_8);
                DataBufferUtils.release(buffer);
                return content;
            })
            .flatMap(content ->                       // 处理粘包问题
                Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
            .filter(event -> !event.trim().isEmpty()) // 过滤空事件
            .map(event -> {                           // 格式标准化处理
                String trimmed = event.trim();
                if (trimmed.startsWith("data:")) {
                    String substring = trimmed.substring(5);
                    return substring.startsWith(" ") ? substring.substring(1) : substring;
                }
                return trimmed;
            })
            .filter(event -> !event.startsWith("data:")); // 二次过滤
}

三大关键技术点

  • 粘包处理通过split("\\r?\\n\\r?\\n")解决网络传输中的消息边界问题,示例原始数据:

    data:{response1}\n\ndata:{response2}\n\n
  • 格式兼容处理自动去除服务端可能返回的data:前缀,同时保留Spring自动添加SSE前缀的能力

  • 双重过滤机制确保最终输出不包含任何残留的SSE格式标识

⚠️ 特别注意

当接口设置produces = MediaType.TEXT_EVENT_STREAM_VALUE时:

  • Spring WebFlux会自动添加data: 前缀

  • 前端收到的格式示例:

    data: {实际内容}
  • 若手动添加

    data: 

    前缀会导致重复:

    data: data: {错误内容}  // ❌ 错误格式
    

🛠️ 完整实现代码

// 包声明和导入...
@Service
@Slf4j
public class OpenAiService {
    // 配置项和初始化
    private String openAiApiKey = "sk-xxxxxx";
    private String baseUrl = "https://openai.com/xxxx";
    private String model = "gpt-4o";
    private WebClient webClient;
    @PostConstruct
    public void init() {
        webClient = WebClient.builder()
                .baseUrl(baseUrl)
                .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }
    @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
        // 构建请求体
        String requestBody = String.format("""
                {
                    "model": "gpt-4o-mini",
                    "messages": [{"role": "user", "content": "%s"}],
                    "stream": true
                }
                """, prompt);
        // 发送流式请求
        return webClient.post()
            .uri("/v1/chat/completions")
            .bodyValue(requestBody)
            .retrieve()
            .onStatus(HttpStatusCode::isError, response ->
                    response.bodyToMono(String.class)
                            .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error)))
            )
            .bodyToFlux(DataBuffer.class)
            .transform(this::processStream)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            .timeout(Duration.ofSeconds(180))
            .doOnError(e -> log.error("Stream error", e))
            .doFinally(signal -> log.info("Stream completed: {}", signal));
    }
    private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
        return dataBufferFlux
                // 使用字节流处理
                .transform(DataBufferUtils::join)
                .map(buffer -> {
                    String content = buffer.toString(StandardCharsets.UTF_8);
                    DataBufferUtils.release(buffer);
                    return content;
                })
                // 按 SSE 事件边界,防止粘包的问题
                .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
                // 过滤空事件
                .filter(event -> !event.trim().isEmpty())
                // 规范 SSE 事件格式
                .map(event -> {
                    String trimmed = event.trim();
                    // 由于webflux设置了"produces = MediaType.TEXT_EVENT_STREAM_VALUE",
                    // 所以在返回数据时会自动添加“data:”,因此如果返回的格式带了“data:”需要手动去除
                    if (trimmed.startsWith("data:")) {
                        trimmed = trimmed.replaceFirst("data:","").trim();
                    }
                    return trimmed;
                })
                .filter(event -> !event.startsWith("data:"));
    }
}

到此这篇关于SpringBoot 2.x 接入非标准SSE格式大模型流式响应实践的文章就介绍到这了,更多相关SpringBoot 2.x 接入非标准SSE格式大模型流式响应内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring远程加载配置的实现方法详解

    Spring远程加载配置的实现方法详解

    这篇文章主要介绍了Spring远程加载配置的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
    2023-03-03
  • Spring Boot 集成PageHelper的使用方法

    Spring Boot 集成PageHelper的使用方法

    这篇文章主要介绍了Spring Boot 集成PageHelper的使用方法,文章内容围绕主题展开详细介绍,需要的小伙伴可以参考一下,希望对你的学习有所帮助
    2022-04-04
  • Java基础之枚举Enum类案例详解

    Java基础之枚举Enum类案例详解

    这篇文章主要介绍了Java基础之枚举Enum类案例详解,文中有非常详细的代码示例,对正在学习java基础的小伙伴们有很好的帮助,需要的朋友可以参考下
    2021-05-05
  • idea热部署插件jrebel正式版及破解版安装详细图文教程

    idea热部署插件jrebel正式版及破解版安装详细图文教程

    这篇文章主要介绍了idea热部署插件jrebel正式版及破解版安装详细教程,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • EasyExcel实现读写Excel文件的示例代码

    EasyExcel实现读写Excel文件的示例代码

    EasyExcel是阿里巴巴开源的一个excel处理框架,以使用简单、节省内存著称。它可以在尽可能节约内存的情况下支持读写百M的Excel,所以本文就将利用它实现读写Excel文件,感兴趣的可以了解一下
    2022-08-08
  • IDEA之MyBatisX使用的图文步骤

    IDEA之MyBatisX使用的图文步骤

    本文主要介绍了IDEA之MyBatisX使用,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-05-05
  • SpringBoot实现缓存预热的几种常用方案

    SpringBoot实现缓存预热的几种常用方案

    缓存预热是指在 Spring Boot 项目启动时,预先将数据加载到缓存系统(如 Redis)中的一种机制,本文给大家介绍了SpringBoot实现缓存预热的几种常用方案,并通过代码示例讲解的非常详细,需要的朋友可以参考下
    2024-02-02
  • Java 将list集合数据按照时间字段排序的方法

    Java 将list集合数据按照时间字段排序的方法

    这篇文章主要介绍了Java 将list集合数据按照时间字段排序,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-03-03
  • Springboot实现Java阿里短信发送代码实例

    Springboot实现Java阿里短信发送代码实例

    这篇文章主要介绍了springboot实现Java阿里短信发送代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02
  • Java实现发送短信验证码功能

    Java实现发送短信验证码功能

    这篇文章主要为大家详细介绍了Java实现发送短信验证码功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-11-11

最新评论