Java使用WebFlux调用大模型实现智能对话

 更新时间:2025年06月26日 10:26:51   作者:ciku  
这篇文章主要为大家详细介绍了Java如何使用WebFlux调用大模型实现智能对话效果,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

1.引入依赖

如果使用了tomcat作为容器需要排除tomcat,webflux使用Netty作为容器

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>

        </dependency>
         <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

2.定义请求类和接收类

AiPromptDto 用于接收用户输入信息

@Data
public class AiPromptDto {
    /**
     * 大模型id
     */
    private String serviceId;
    /**
     * 用户输入
     */
    private String userInput;
    /**
     * sessionId
     */
    private String sessionId;
    /**
     * 请求id
     */
    private String requestId;
    /**
     * 获取token
     */
    private String token;
    }

答案接收对象

@Data
@AllArgsConstructor
public class AnswerChunk {
    /**
     * 返回的内容
     */
    private String content;
    private String sessionId;
}

3.修改application.yml

此处配置response没有缓存,否则可能会阻塞,不会实时返回

reactor:
  netty:
    response:
      buffer-size: 0

4.测试大模型获取数据格式

1.欢迎词
userinput:你好?
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"messageId":"d47cce80-bcf0-49fe-8e23-06bb5ab79af3","messageContent":"消息1:我是一个聊天机器人,这里是我的消息"}
id:[DONE]
data:[DONE]

2.问答
userinput:物料00NY681的库存有多少个?
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"库"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"存"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"中"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"物"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"料"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"0"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"0"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"N"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"Y"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"6"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"8"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"1"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"的"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"数"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"量"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"为"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"1"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"个"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"。"}

id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"inquiryList":"[\"物料00NY681的库存是否充足?\",\"物料00NY681的库存位置在哪里?\",\"如何补充物料00NY681的库存?\"]"}

id:[DONE]
data:[DONE]

5.定义Service接口和实现类

webflux返回Mono或者Flux

public interface AiService {
    /**
     * 根据请求获取流式返回的答案
     * @param request
     * @return
     */
    Flux<AnswerChunk>  processStream(AiPromptDto request);
    }

实现类AIServiceImpl

import org.springframework.web.reactive.function.client.WebClient;
@Service
public class AIServiceImpl implements AiService {
    private final WebClient webClient;
    //初始化webClient,并ssl校验,生产环境不要跳过
    public AIServiceImpl(WebClient.Builder webClientBuilder) {
        // 使用InsecureTrustManagerFactory来信任所有证书
        SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE);
        HttpClient httpClient = HttpClient.create()
                .secure(sslContextSpec -> sslContextSpec.sslContext(sslContextBuilder))
                .responseTimeout(Duration.ofMinutes(timeout));
        this.webClient = webClientBuilder.clientConnector(
                new ReactorClientHttpConnector(httpClient)
        ).baseUrl(aiForceUrl).build();
    }
  @Override
    public Flux<AnswerChunk> processStream(AiPromptDto request) {
   String body = JSONUtil.toJsonStr(request);//参数都转化为json字符串
   return webClient.post()
                    .uri(aiForceUrl + "/aiforceplatformapi/openapi/llm/debugSse")//大模型地址
                    .bodyValue(body)//body参数
                    .header("token", request.getToken())//设置请求头
                    .header("Content-Type", "application/json")
                    .retrieve()//retrieve 方法会从服务器响应中提取数据
                    .bodyToFlux(String.class)//响应体解析为一个流式的 String 类型序列
                    .map(chunk -> {//解析数据以供存储
                        //System.out.println("chunk = " + chunk);
                        String content = "";
                        // 解析大模型返回数据
                        if (!chunk.contains("[DONE]")) {//结束标志
                            if (chunk.contains("inquiryList")) {//处理返回的关联查询列表
                                content = parseChunk(chunk);
                                finalAnswer[0].setQueryList(content);
                            }else if (chunk.contains("messageId")&&chunk.contains("messageContent")) {//处理返回提示message
                                parseMessage(chunk, messageMap);
                            }  else if (chunk.contains("data")) {//处理返回的问题答案
                                content = parseChunk(chunk);
                                redisTemplate.opsForValue().append(request.getRequestId() + "_result", content);
                            } else if (chunk.contains("question")) {//处理返回question
                                //先删除
                                questionService.deleteQuestionsByPreviousIdAndRequest(questionId, requestId);
                                //保存ai返回的question
                            } else if (chunk.contains("image")) {//处理图片
                                parseImages(chunk, imagesUrl);
                            } else if (chunk.contains("referenceInfo")) {//处理参照信息
                                parseReference(chunk, aiAnswerReferenceList);
                            }
                        } else {
                            // 处理结束
                            end.set("[DONE]");
                            finalAnswer[0].setState("DONE");
                        }
                        if (StringUtils.isEmpty(chunk)) {
                            chunk = "";
                        }
                        return new AnswerChunk(chunk, request.getRequestId());
                    })
                    .doOnComplete(() -> {//答案都完成后存储对应数据到数据库中
                        String finalContent = redisTemplate.opsForValue().get(request.getRequestId() + "_result");
                        redisTemplate.delete(request.getRequestId());
                        //保存答案
                        String returnAnswer = "";
                        JSONObject answer1 = new JSONObject().putOnce("data", finalContent);
                        //具体实现
                    })
                    .onErrorResume(e -> {//错误情况处理
                        finalAnswer[0].setState("FAILED");
                        answerService.saveOrUpdate(finalAnswer[0]);
                        return Flux.error(e);
                    });
}
}

6.定义Controller

@RestController
@RequestMapping("/aiAgent")
public class AiForceController {
   /**
     * 获取内容
     *
     * @param request MediaType.TEXT_EVENT_STREAM_VALUE 流式输出,否则会一次返回
     *                charset=UTF-8 字符集,不设置会乱码
     *                注意:使用get会中文乱码
     * @return
     */
    @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE + ";charset=UTF-8")
    public Flux<ServerSentEvent<String>> streamResponse(@RequestBody AiForcePromptDto request) {
        return aiService.processStream(request)
                .limitRate(100) // 限制每秒最大请求数
                .onBackpressureBuffer(100,//背压策略:缓冲区大小为 100
                buffer -> {
                    logger.warn("Backpressure buffer overflow, dropping {} items", buffer);
                }).publishOn(Schedulers.boundedElastic(),1) // 单线程调度确保顺序
                .flatMap(chunk -> { // 使用 flatMap 将一个异步流中的每个元素映射为另一个流,并将这些流合并为一个单一的流
                    String content = chunk.getContent();
                    if (StringUtils.isNotBlank(content)) {
                        String processedContent = content.replaceAll("`{3}", "\n```"); // 规范代码块格式
                        return Flux.just(ServerSentEvent.<String>builder()
                                .id(request.getRequestId())
                                .data(processedContent)
                                .build());
                    }
                    return Flux.empty();//如果内容为空,就返回空的flux
                }, 1) // 设置并发度为 1,确保逐条发送
                .doOnNext(event -> logger.info("Streaming chunk: {}", event.data())); // 日志记录每次发送的数据
    }
}
// Flux<ServerSentEvent<String>> 实现 SSE(Server-Sent Events),以便客户端可以实时接收服务器推送的消息

7.调用结果

注意:在部署时,如果使用到了nginx需要配置

  • chunked_transfer_encoding off 关闭分块传输,会发送完整的数据
  • proxy_buffering off #禁用代理缓冲,适用于流式传输
  • gzip off ##关闭压缩,数据以未压缩的方式传输
  • add_header Cache-Control “no-cache” header定义无缓存
  • add_header X-Accel-Buffering no;##禁用 Nginx 的缓冲功能,确保数据实时传输

到此这篇关于Java使用WebFlux调用大模型实现智能对话的文章就介绍到这了,更多相关Java智能对话内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring mvc整合mybatis(crud+分页插件)操作mysql

    Spring mvc整合mybatis(crud+分页插件)操作mysql

    这篇文章主要介绍了Spring mvc整合mybatis(crud+分页插件)操作mysql的步骤详解,需要的朋友可以参考下
    2017-04-04
  • Java值传递之swap()方法不能交换的解决

    Java值传递之swap()方法不能交换的解决

    这篇文章主要介绍了Java值传递之swap()方法不能交换的解决,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • Java对象深复制与浅复制实例详解

    Java对象深复制与浅复制实例详解

    这篇文章主要介绍了 Java对象深复制与浅复制实例详解的相关资料,需要的朋友可以参考下
    2017-05-05
  • java ThreadPool线程池的使用,线程池工具类用法说明

    java ThreadPool线程池的使用,线程池工具类用法说明

    这篇文章主要介绍了java ThreadPool线程池的使用,线程池工具类用法说明,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-10-10
  • Java 十大排序算法之计数排序刨析

    Java 十大排序算法之计数排序刨析

    计数排序是一个非基于比较的排序算法,该算法于1954年由 Harold H. Seward 提出。它的优势在于在对一定范围内的整数排序时,它的复杂度为Ο(n+k)(其中k是整数的范围),快于任何比较排序算法
    2021-11-11
  • 浅谈spring中isolation和propagation的用法

    浅谈spring中isolation和propagation的用法

    这篇文章主要介绍了浅谈spring中isolation 和propagation的用法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • java web过滤器处理乱码

    java web过滤器处理乱码

    本文主要介绍了java web过滤器处理乱码的方法解析。具有很好的参考价值。下面跟着小编一起来看下吧
    2017-04-04
  • Java内存结构和数据类型

    Java内存结构和数据类型

    本文重点给大家介绍java内存结构和数据类型知识,非常不错,具有参考借鉴价值,需要的朋友参考下
    2016-12-12
  • Java实现较大二进制文件的读、写方法

    Java实现较大二进制文件的读、写方法

    本篇文章主要介绍了Java实现较大二进制文件的读、写方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-02-02
  • Java 的可变参数方法详述

    Java 的可变参数方法详述

    这篇文章主要介绍了Java 的可变参数方法,可变参数只能作为函数的最后一个参数,在其前面可以有也可以没有任何其他参数,由于可变参数必须是最后一个参数,所以一个函数最多只能有一个可变参数,下面我们一起进入文章了解更多关于可变参数的内容吧
    2022-02-02

最新评论