基于SpringBoot和Dify实现流式响应输出

 更新时间:2025年03月18日 11:06:35   作者:ZKM!!  
这篇文章主要为大家详细介绍了如何基于SpringBoot和Dify实现流式响应输出效果,文中的示例代码讲解详细,感兴趣的小伙伴可以参考一下

在使用 Dify(假设为某种生成式 AI 模型或服务)结合 Spring Boot 和 WebClient 实现流式输出时,我们需要确保技术栈的版本兼容性,并理解流式输出的核心概念。以下是详细讲解:

1. 技术栈版本要求

Spring Boot 版本要求

最低推荐版本:2.7.x 或 3.x

如果需要支持 HTTP/2 或更高级别的异步处理能力,建议使用 Spring Boot 3.x。

Spring Boot 3.x 基于 Spring Framework 6.x 和 Java 17+,提供了更好的反应式编程支持。

JDK 版本要求

最低推荐版本:Java 11

Spring Boot 2.7.x 支持 Java 8 及以上,但推荐使用 Java 11 或更高版本。

如果使用 Spring Boot 3.x,则必须使用 Java 17 或更高版本,因为 Spring Boot 3.x 已经停止支持 Java 11 以下的版本。

2. 核心概念:流式输出

流式输出(Streaming Output)是指服务器以分块的方式逐步将数据发送到客户端,而不是一次性返回完整的结果。这种方式特别适合处理大文件传输、实时数据流或生成式模型的逐词输出。

在 Spring Boot 中,可以通过以下方式实现流式输出:

  • 使用 ResponseEntity<Flux<?>> 或 ResponseBodyEmitter(适用于同步场景)。
  • 使用 WebClient 的反应式编程模型来处理流式请求和响应。

3. 实现步骤

3.1 添加依赖

确保在 pom.xml 中添加以下依赖项:

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
 </dependency>

spring-boot-starter-webflux 提供了反应式 Web 编程的支持。

3.2 配置 WebClient

创建一个 WebClient 实例,主要用于设置跨域资源共享(CORS, Cross-Origin Resource Sharing)。它的作用是解决前端和后端在不同域名或端口下通信时的跨域问题。

@Configuration
public class WebConfig implements WebMvcConfigurer {

    static final List<String> ORIGIN_LIST = Arrays.asList(
            // 本地
            "http://localhost:8080",
            "http://127.0.0.1:8080",
            "http://localhost:8888",
            "http://127.0.0.1:8888",
            "http://localhost:8803",
            "http://127.0.0.1:8803"
    );

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        // 配置全局跨域规则
        registry.addMapping("/**") // 允许所有路径的请求
                .allowedOrigins(ORIGIN_LIST.toArray(new String[0])) // 允许的源
                .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS") // 允许的HTTP方法
                .allowedHeaders("Content-Type", "Authorization") // 允许的请求头
                .allowCredentials(true); // 是否允许发送Cookie等凭证信息
    }
}

3.3 实现流式输出控制器

@Slf4j
@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class DifyController {
    

    @Value("${portal.chatMessages}")
    private String chatMessages;
    
    private final DifyService difyService;
    
    @GetMapping(value = "/chatMessagesStreaming", produces = "text/event-stream")
    public Flux<StreamResponse> chatMessagesStreaming(HttpServletRequest request,
                                                  @RequestParam(value = "query", required = true) String query,
                                                  @RequestParam(value = "userName", required = true) String userName,
                                                  @RequestParam(value = "conversationId", required = false) String conversationId) throws Exception {
   
	   return difyService.streamingMessage(query, conversationId, userName).doOnNext(response -> {
	        log.info("流式结果:" + response.toString());
	        //workflow_finished节点可以获取完整答案,进行你的逻辑处理
	        if (response.getEvent().equals("workflow_finished")) {
	            log.info("进入workflow_finished阶段");
	            String answer = response.getData().getOutputs().getAnswer();//完整答案
	        }
	         //message_end结束节点,进行你的逻辑处理
	        if (response.getEvent().equals("message_end")) {
	            log.info("进入message_end");
	        }
	
	    });
}

3.4 实现流式输出服务层

java
@Slf4j
@Service
@RequiredArgsConstructor
public class DifyService {

	@Value("${dify.url}")
    private String url;

    @Value("${dify.key}")
    private String apiKey;
	/**
     * 流式调用dify.
     *
     * @param query 查询文本
     * @param conversationId id
     * @param userName  用户名
     * @return Flux 响应流
     */
    public Flux<StreamResponse> streamingMessage(String query, String conversationId, String userName) {
        //1.设置请求体
        DifyRequestBody body = new DifyRequestBody();
        body.setInputs(new HashMap<>());
        body.setQuery(query);
        body.setResponseMode("streaming");
        body.setConversationId("");
        body.setUser(userName);
        if (StringUtils.isNotEmpty(conversationId)) {
            body.setConversationId(conversationId);
        }
        //如果存在自定义入参可以加到如下Map中
        //Map<String, Object> commoninputs = new HashMap<>();
        //commoninputs.put("search_type", searchType);
        //body.setInputs(commoninputs);

        //2.使用webclient发送post请求
        return webClient.post()
                .uri(url)
                .headers(httpHeaders -> {
                    httpHeaders.setContentType(MediaType.APPLICATION_JSON);
                    httpHeaders.setBearerAuth(apiKey);
                })
                .bodyValue(JSON.toJSONString(body))
                .retrieve()
                .bodyToFlux(StreamResponse.class);//实体转换
                .filter(this::shouldInclude) // 过滤掉不需要的数据【根据需求增加】
                //.map(this::convertToCustomResponseAsync) // 异步转换【如果返回格式自定义则通过异步转换实现】
                .onErrorResume(throwable -> {
                	log.info("异常输出:"+throwable.getMessage())
                })
                //.concatWith(Mono.just(createCustomFinalMessage())); // 添加自定义的最终消息【根据需求增加】
    }

    private boolean shouldInclude(StreamResponse streamResponse) {
        // 示例:只要message节点的数据和message_end节点的数据
        if (streamResponse.getEvent().equals("message")
                || streamResponse.getEvent().equals("message_end")) {
            return true;
        }
        return false;
    }

3.4 实现流式输出数据访问层

和dify返回流式输出格式一致

@Data
public class StreamResponse implements Serializable {

    /**
     * 不同模式下的事件类型.
     */
    private String event;

    /**
     * agent_thought id.
     */
    private String id;

    /**
     * 任务ID.
     */
    private String task_id;

    /**
     * 消息唯一ID.
     */
    private String message_id;

    /**
     * LLM 返回文本块内容.
     */
    private String answer;

    /**
     * 创建时间戳.
     */
    private Long created_at;

    /**
     * 会话 ID.
     */
    private String conversation_id;
    
    private StreamResponseData data;
}

@Data
public class StreamResponseData implements Serializable {
    private String id;
    private String workflow_id;
    private String status;
    private Long created_at;
    private Long finished_at;
    private OutputsData outputs;
}

@Data
public class OutputsData implements Serializable {
    private String answer;
}

4. 关键点说明

1.MediaType.TEXT_EVENT_STREAM_VALUE

表示使用 Server-Sent Events (SSE) 协议进行流式传输。

客户端可以通过浏览器或支持 SSE 的工具(如 Postman)接收流式数据。

2.Flux

Flux 是 Reactor 库中的核心类型,表示一个可以包含零个或多个元素的异步序列。

在这里,Flux 表示从 Dify 接收到的逐词或逐句生成的文本流。

3.WebClient 的反应式特性

WebClient 是 Spring 提供的反应式 HTTP 客户端,能够高效处理流式数据。

它不会阻塞线程,而是通过事件驱动的方式逐步处理数据

总结

通过上述步骤,我们可以使用 Spring Boot 和 WebClient 实现流式输出功能。关键在于利用反应式编程模型(Reactor 的 Flux 和 WebClient),以及正确配置流式传输协议(如 SSE)。根据需求选择合适的 Spring Boot 和 JDK 版本,可以确保项目的性能和稳定性。

到此这篇关于基于SpringBoot和Dify实现流式响应输出的文章就介绍到这了,更多相关SpringBoot Dify流式响应输出内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 解决idea 中 SpringBoot 点击运行没反应按钮成灰色的问题

    解决idea 中 SpringBoot 点击运行没反应按钮成灰色的问题

    在使用 Spring Boot 开发项目时,可能会遇到一个问题:点击运行按钮后,控制台没有任何输出,项目界面也没有显示,这种情况可能是由多种原因导致的,本文将介绍一些常见的解决方法,需要的朋友可以参考下
    2023-08-08
  • Java 将PPT幻灯片转为HTML文件的实现思路

    Java 将PPT幻灯片转为HTML文件的实现思路

    本文以Java程序代码为例展示如何通过格式转换的方式将PPT幻灯片文档转为HTML文件,本文通过实例代码图文相结合给大家分享实现思路,需要的朋友参考下吧
    2021-06-06
  • Java 随机取字符串的工具类

    Java 随机取字符串的工具类

    随机数在实际中使用很广泛,比如要随即生成一个固定长度的字符串、数字。或者随即生成一个不定长度的数字、或者进行一个模拟的随机选择等等。Java提供了最基本的工具,可以帮助开发者来实现这一切
    2014-01-01
  • java取出list中某几个属性组成一个新集合的几种方式

    java取出list中某几个属性组成一个新集合的几种方式

    在Java开发中经常需要对List中的对象进行一些操作,例如对某个字段进行过滤、排序等,这篇文章主要给大家介绍了关于java取出list中某几个属性组成一个新集合的几种方式,需要的朋友可以参考下
    2024-03-03
  • Java面向对象之成员隐藏与属性封装操作示例

    Java面向对象之成员隐藏与属性封装操作示例

    这篇文章主要介绍了Java面向对象之成员隐藏与属性封装操作,结合实例形式分析了Java面向对象程序设计中成员的隐藏及属性封装相关实现与使用操作技巧,需要的朋友可以参考下
    2018-06-06
  • Java设计模式之桥模式(Bridge模式)介绍

    Java设计模式之桥模式(Bridge模式)介绍

    这篇文章主要介绍了Java设计模式之桥模式(Bridge模式)介绍,本文讲解了为什么使用桥模式、如何实现桥模式、Bridge模式在EJB中的应用等内容,需要的朋友可以参考下
    2015-03-03
  • SpringBoot中application.properties、application.yaml、application.yml区别

    SpringBoot中application.properties、application.yaml、applicati

    本文主要介绍了SpringBoot中application.properties、application.yaml、application.yml区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-04-04
  • Java中ArrayList 和 HashMap 自动扩容机制详解

    Java中ArrayList 和 HashMap 自动扩容机制详解

    本文深入分析了Java集合框架中ArrayList和HashMap的自动扩容机制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2026-03-03
  • Java 深入学习static关键字和静态属性及方法

    Java 深入学习static关键字和静态属性及方法

    这篇文章主要介绍了Java 深入学习static关键字和静态属性及方法,文章通过围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-09-09
  • 关于fastjson的常见API详解

    关于fastjson的常见API详解

    这篇文章主要介绍了关于fastjson的常见API详解,Fastjson是一个Java库,可用于将Java对象转换为其JSON表示,它还可用于将JSON字符串转换为等效的Java对象,Fastjson可以处理任意Java对象,包括您没有源代码的预先存在的对象,需要的朋友可以参考下
    2023-07-07

最新评论