Java使用WebFlux调用大模型实现智能对话

 更新时间:2025年06月26日 10:26:51   作者:ciku  
这篇文章主要为大家详细介绍了Java如何使用WebFlux调用大模型实现智能对话效果,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

1.引入依赖

如果使用了tomcat作为容器需要排除tomcat,webflux使用Netty作为容器

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>

        </dependency>
         <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

2.定义请求类和接收类

AiPromptDto 用于接收用户输入信息

@Data
public class AiPromptDto {
    /**
     * 大模型id
     */
    private String serviceId;
    /**
     * 用户输入
     */
    private String userInput;
    /**
     * sessionId
     */
    private String sessionId;
    /**
     * 请求id
     */
    private String requestId;
    /**
     * 获取token
     */
    private String token;
    }

答案接收对象

@Data
@AllArgsConstructor
public class AnswerChunk {
    /**
     * 返回的内容
     */
    private String content;
    private String sessionId;
}

3.修改application.yml

此处配置response没有缓存,否则可能会阻塞,不会实时返回

reactor:
  netty:
    response:
      buffer-size: 0

4.测试大模型获取数据格式

1.欢迎词
userinput:你好?
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"messageId":"d47cce80-bcf0-49fe-8e23-06bb5ab79af3","messageContent":"消息1:我是一个聊天机器人,这里是我的消息"}
id:[DONE]
data:[DONE]

2.问答
userinput:物料00NY681的库存有多少个?
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"库"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"存"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"中"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"物"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"料"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"0"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"0"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"N"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"Y"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"6"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"8"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"1"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"的"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"数"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"量"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"为"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"1"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"个"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"。"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"inquiryList":"[\"物料00NY681的库存是否充足?\",\"物料00NY681的库存位置在哪里?\",\"如何补充物料00NY681的库存?\"]"}

id:[DONE]
data:[DONE]

5.定义Service接口和实现类

webflux返回Mono或者Flux

public interface AiService {
    /**
     * 根据请求获取流式返回的答案
     * @param request
     * @return
     */
    Flux<AnswerChunk>  processStream(AiPromptDto request);
    }

实现类AIServiceImpl

import org.springframework.web.reactive.function.client.WebClient;
@Service
public class AIServiceImpl implements AiService {
    private final WebClient webClient;
    //初始化webClient,并ssl校验,生产环境不要跳过
    public AIServiceImpl(WebClient.Builder webClientBuilder) {
        // 使用InsecureTrustManagerFactory来信任所有证书
        SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE);
        HttpClient httpClient = HttpClient.create()
                .secure(sslContextSpec -> sslContextSpec.sslContext(sslContextBuilder))
                .responseTimeout(Duration.ofMinutes(timeout));
        this.webClient = webClientBuilder.clientConnector(
                new ReactorClientHttpConnector(httpClient)
        ).baseUrl(aiForceUrl).build();
    }
  @Override
    public Flux<AnswerChunk> processStream(AiPromptDto request) {
   String body = JSONUtil.toJsonStr(request);//参数都转化为json字符串
   return webClient.post()
                    .uri(aiForceUrl + "/aiforceplatformapi/openapi/llm/debugSse")//大模型地址
                    .bodyValue(body)//body参数
                    .header("token", request.getToken())//设置请求头
                    .header("Content-Type", "application/json")
                    .retrieve()//retrieve 方法会从服务器响应中提取数据
                    .bodyToFlux(String.class)//响应体解析为一个流式的 String 类型序列
                    .map(chunk -> {//解析数据以供存储
                        //System.out.println("chunk = " + chunk);
                        String content = "";
                        // 解析大模型返回数据
                        if (!chunk.contains("[DONE]")) {//结束标志
                            if (chunk.contains("inquiryList")) {//处理返回的关联查询列表
                                content = parseChunk(chunk);
                                finalAnswer[0].setQueryList(content);
                            }else if (chunk.contains("messageId")&&chunk.contains("messageContent")) {//处理返回提示message
                                parseMessage(chunk, messageMap);
                            }  else if (chunk.contains("data")) {//处理返回的问题答案
                                content = parseChunk(chunk);
                                redisTemplate.opsForValue().append(request.getRequestId() + "_result", content);
                            } else if (chunk.contains("question")) {//处理返回question
                                //先删除
                                questionService.deleteQuestionsByPreviousIdAndRequest(questionId, requestId);
                                //保存ai返回的question
                            } else if (chunk.contains("image")) {//处理图片
                                parseImages(chunk, imagesUrl);
                            } else if (chunk.contains("referenceInfo")) {//处理参照信息
                                parseReference(chunk, aiAnswerReferenceList);
                            }
                        } else {
                            // 处理结束
                            end.set("[DONE]");
                            finalAnswer[0].setState("DONE");
                        }
                        if (StringUtils.isEmpty(chunk)) {
                            chunk = "";
                        }
                        return new AnswerChunk(chunk, request.getRequestId());
                    })
                    .doOnComplete(() -> {//答案都完成后存储对应数据到数据库中
                        String finalContent = redisTemplate.opsForValue().get(request.getRequestId() + "_result");
                        redisTemplate.delete(request.getRequestId());
                        //保存答案
                        String returnAnswer = "";
                        JSONObject answer1 = new JSONObject().putOnce("data", finalContent);
                        //具体实现
                    })
                    .onErrorResume(e -> {//错误情况处理
                        finalAnswer[0].setState("FAILED");
                        answerService.saveOrUpdate(finalAnswer[0]);
                        return Flux.error(e);
                    });
}
}

6.定义Controller

@RestController
@RequestMapping("/aiAgent")
public class AiForceController {
   /**
     * 获取内容
     *
     * @param request MediaType.TEXT_EVENT_STREAM_VALUE 流式输出,否则会一次返回
     *                charset=UTF-8 字符集,不设置会乱码
     *                注意:使用get会中文乱码
     * @return
     */
    @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE + ";charset=UTF-8")
    public Flux<ServerSentEvent<String>> streamResponse(@RequestBody AiForcePromptDto request) {
        return aiService.processStream(request)
                .limitRate(100) // 限制每秒最大请求数
                .onBackpressureBuffer(100,//背压策略:缓冲区大小为 100
                buffer -> {
                    logger.warn("Backpressure buffer overflow, dropping {} items", buffer);
                }).publishOn(Schedulers.boundedElastic(),1) // 单线程调度确保顺序
                .flatMap(chunk -> { // 使用 flatMap 将一个异步流中的每个元素映射为另一个流,并将这些流合并为一个单一的流
                    String content = chunk.getContent();
                    if (StringUtils.isNotBlank(content)) {
                        String processedContent = content.replaceAll("`{3}", "\n```"); // 规范代码块格式
                        return Flux.just(ServerSentEvent.<String>builder()
                                .id(request.getRequestId())
                                .data(processedContent)
                                .build());
                    }
                    return Flux.empty();//如果内容为空,就返回空的flux
                }, 1) // 设置并发度为 1,确保逐条发送
                .doOnNext(event -> logger.info("Streaming chunk: {}", event.data())); // 日志记录每次发送的数据
    }
}
// Flux<ServerSentEvent<String>> 实现 SSE(Server-Sent Events),以便客户端可以实时接收服务器推送的消息

7.调用结果

注意:在部署时,如果使用到了nginx需要配置

  • chunked_transfer_encoding off 关闭分块传输,会发送完整的数据
  • proxy_buffering off #禁用代理缓冲,适用于流式传输
  • gzip off ##关闭压缩,数据以未压缩的方式传输
  • add_header Cache-Control “no-cache” header定义无缓存
  • add_header X-Accel-Buffering no;##禁用 Nginx 的缓冲功能,确保数据实时传输

到此这篇关于Java使用WebFlux调用大模型实现智能对话的文章就介绍到这了,更多相关Java智能对话内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • FastJson时间格式化问题避坑经验分享

    FastJson时间格式化问题避坑经验分享

    这篇文章主要为大家介绍了FastJson时间格式化问题避坑经验分享,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • java环境变量的配置方法图文详解【win10环境为例】

    java环境变量的配置方法图文详解【win10环境为例】

    这篇文章主要介绍了java环境变量的配置方法,结合图文形式详细分析了win10环境下java环境变量的配置方法与相关操作注意事项,需要的朋友可以参考下
    2020-04-04
  • Java的idea连接mongodb数据库的详细教程

    Java的idea连接mongodb数据库的详细教程

    这篇文章主要介绍了Java的idea连接mongodb数据库的详细教程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • Java基础之并发相关知识总结

    Java基础之并发相关知识总结

    随着摩尔定律逐步失效,cpu单核性能达到瓶颈,并发逐渐逐渐得到广泛应用,因而学习了解以及使用并发就显得十分重要,但并发相关的知识比较琐碎,不易系统学习,因而本篇文章参照王宝令老师《Java并发编程》来勾勒出一张“并发全景图”,需要的朋友可以参考下
    2021-05-05
  • Jackson序列化和反序列化忽略字段操作

    Jackson序列化和反序列化忽略字段操作

    这篇文章主要介绍了Jackson序列化和反序列化忽略字段操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • Spring JPA 增加字段执行异常问题及解决

    Spring JPA 增加字段执行异常问题及解决

    这篇文章主要介绍了Spring JPA 增加字段执行异常问题及解决,具有很好的参考价值,
    2022-06-06
  • Java CPU性能分析工具代码实例

    Java CPU性能分析工具代码实例

    这篇文章主要介绍了Java CPU性能分析工具代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-01-01
  • Java lombok中@Accessors注解三个属性的作用

    Java lombok中@Accessors注解三个属性的作用

    这篇文章主要介绍了Java lombok的@Accessors注解属性解析,该注解主要作用是:当属性字段在生成 getter 和 setter 方法时,做一些相关的设置,需要的朋友可以参考下
    2023-05-05
  • java中的SpringBoot框架

    java中的SpringBoot框架

    这篇文章主要介绍了java学习之SpringBoot框架,文章基于Java的相关资料展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-04-04
  • Springboot配置suffix指定mvc视图的后缀方法

    Springboot配置suffix指定mvc视图的后缀方法

    这篇文章主要介绍了Springboot配置suffix指定mvc视图的后缀方法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07

最新评论