Golang中SSE协议实现方法
Gin流式返回实现方法
🧑💻 User
gin框架如何做到流式返回
🤖 Assistant
在 Gin 框架中实现流式返回主要通过以下几个关键技术点:
1. 使用 Server-Sent Events (SSE)
SSE 是实现流式返回的常用方式:
package main
import (
"fmt"
"net/http"
"time"
"github.com/gin-gonic/gin"
)
func main() {
r := gin.Default()
r.GET("/stream", func(c *gin.Context) {
// 设置响应头
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
// 创建通道用于发送数据
messageChan := make(chan string)
// 模拟数据生成
go func() {
defer close(messageChan)
for i := 0; i < 10; i++ {
message := fmt.Sprintf("消息 %d: 当前时间 %v", i, time.Now().Format("15:04:05"))
messageChan <- message
time.Sleep(1 * time.Second)
}
}()
// 流式返回数据
c.Stream(func(w io.Writer) bool {
if message, ok := <-messageChan; ok {
// SSE 格式: data: {message}\n\n
c.SSEvent("message", message)
return true
}
return false
})
})
r.Run(":8080")
}2. 手动控制响应流
更底层的方式是直接操作 ResponseWriter:
r.GET("/stream-data", func(c *gin.Context) {
c.Header("Content-Type", "text/plain")
c.Header("Transfer-Encoding", "chunked")
flusher, ok := c.Writer.(http.Flusher)
if !ok {
c.String(http.StatusInternalServerError, "Streaming not supported")
return
}
for i := 0; i < 10; i++ {
fmt.Fprintf(c.Writer, "数据块 %d\n", i)
flusher.Flush() // 立即发送到客户端
time.Sleep(1 * time.Second)
}
})3. 结合上下文取消机制
处理客户端中断连接的情况:
r.GET("/stream-with-context", func(c *gin.Context) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
flusher, _ := c.Writer.(http.Flusher)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.Request.Context().Done():
// 客户端断开连接
fmt.Println("客户端断开连接")
return
case t := <-ticker.C:
data := fmt.Sprintf("时间: %s", t.Format("15:04:05"))
c.SSEvent("update", data)
flusher.Flush()
}
}
})4. JSON 流式返回
对于结构化数据的流式返回:
r.GET("/stream-json", func(c *gin.Context) {
c.Header("Content-Type", "application/x-ndjson") // 换行分隔的JSON
flusher, _ := c.Writer.(http.Flusher)
for i := 0; i < 5; i++ {
data := gin.H{
"id": i,
"time": time.Now().Unix(),
"data": fmt.Sprintf("项目 %d", i),
}
c.Render(-1, gin.JSON{Data: data})
fmt.Fprintln(c.Writer) // 添加换行符
flusher.Flush()
time.Sleep(1 * time.Second)
}
})5. 文件流式下载
大文件的流式下载:
r.GET("/download-large-file", func(c *gin.Context) {
filePath := "./large-file.txt"
c.Header("Content-Type", "application/octet-stream")
c.Header("Content-Disposition", "attachment; filename=large-file.txt")
file, err := os.Open(filePath)
if err != nil {
c.String(http.StatusNotFound, "文件不存在")
return
}
defer file.Close()
// 获取文件信息
fileInfo, _ := file.Stat()
c.Header("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
// 流式传输文件内容
io.Copy(c.Writer, file)
})6. 前端示例
对应的前端代码:
<!DOCTYPE html>
<html>
<head>
<title>SSE 示例</title>
</head>
<body>
<div id="messages"></div>
<script>
const eventSource = new EventSource('/stream');
eventSource.onmessage = function(event) {
const messageDiv = document.createElement('div');
messageDiv.textContent = event.data;
document.getElementById('messages').appendChild(messageDiv);
};
eventSource.onerror = function(event) {
console.error('SSE 错误:', event);
};
</script>
</body>
</html>关键要点
- 设置正确的响应头:
text/event-stream、no-cache、keep-alive - 使用 Flusher:确保数据立即发送到客户端
- 处理连接中断:通过上下文监听客户端断开
- 错误处理:妥善处理流式传输中的错误
- 资源清理:确保 goroutine 和连接正确关闭
这种方式特别适合实时数据推送、大文件下载、长时间运行的任务进度更新等场景。
🧑💻 User
这个服务是如何返回的:
data: {"id":"019b84c169efb22b959b99a7914e4712","object":"chat.completion.chunk","created":1767458826,"model":"deepseek-ai/DeepSeek-V3.2","choices":[{"index":0,"delta":{"content":"","reasoning_content":null,"role":"assistant"},"finish_reason":"stop"}],"system_fingerprint":"","usage":{"prompt_tokens":23,"completion_tokens":19,"total_tokens":42,"completion_tokens_details":{"reasoning_tokens":0}}}
data: [DONE]
🤖 Assistant
这是一个典型的 Server-Sent Events (SSE) 流式响应格式。让我详细解析这种返回方式的工作原理:
响应格式解析
// 每条数据都以 "data: " 开头,后跟 JSON 数据
data: {"id":"019b84c169efb22b959b99a7914e4712","object":"chat.completion.chunk","created":1767458826,"model":"deepseek-ai/DeepSeek-V3.2","choices":[{"index":0,"delta":{"content":"服务。","reasoning_content":null,"role":"assistant"},"finish_reason":null}],"usage":{...}}
// 最后以 [DONE] 结束
data: [DONE]Gin 框架实现代码示例
package main
import (
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/gin-gonic/gin"
)
func main() {
r := gin.Default()
r.GET("/chat/stream", func(c *gin.Context) {
// 设置 SSE 响应头
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
// 检查是否支持流式传输
flusher, ok := c.Writer.(http.Flusher)
if !ok {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Streaming not supported"})
return
}
// 模拟 AI 对话的逐词返回
messages := []string{"服务。", "请问有什么", "可以帮", "您的吗", "?那我", "先退", "下了。"}
for i, message := range messages {
// 构建响应数据
response := gin.H{
"id": "019b84c169efb22b959b99a7914e4712",
"object": "chat.completion.chunk",
"created": time.Now().Unix(),
"model": "deepseek-ai/DeepSeek-V3.2",
"choices": []gin.H{
{
"index": 0,
"delta": gin.H{
"content": message,
"reasoning_content": nil,
"role": "assistant",
},
"finish_reason": nil,
},
},
"usage": gin.H{
"prompt_tokens": 23,
"completion_tokens": 5 + i*2, // 模拟token计数
"total_tokens": 28 + i*2,
"completion_tokens_details": gin.H{
"reasoning_tokens": 0,
},
},
}
// 转换为 JSON
jsonData, _ := json.Marshal(response)
// 按照 SSE 格式写入:data: {json}\n\n
fmt.Fprintf(c.Writer, "data: %s\n\n", string(jsonData))
flusher.Flush() // 立即刷新到客户端
time.Sleep(200 * time.Millisecond) // 模拟处理延迟
}
// 发送结束标志
fmt.Fprintf(c.Writer, "data: [DONE]\n\n")
flusher.Flush()
})
r.Run(":8080")
}更完整的实现(包含错误处理)
func streamingChatHandler(c *gin.Context) {
// SSE 头设置
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
flusher, ok := c.Writer.(http.Flusher)
if !ok {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Streaming not supported"})
return
}
// 监听客户端断开连接
ctx := c.Request.Context()
// 使用 Gin 的 Stream 方法(推荐)
c.Stream(func(w io.Writer) bool {
select {
case <-ctx.Done():
return false // 客户端断开,停止流式传输
default:
// 生成聊天数据
chunk := generateChatChunk()
if chunk == nil {
// 发送结束信号
c.SSEvent("", "[DONE]")
return false
}
// 发送数据块
c.SSEvent("", chunk)
return true
}
})
}
func generateChatChunk() interface{} {
// 这里可以连接真实的 AI 模型
// 返回数据块结构
return map[string]interface{}{
"id": generateID(),
"object": "chat.completion.chunk",
"choices": []map[string]interface{}{
{
"delta": map[string]interface{}{
"content": getNextWord(),
},
},
},
}
}前端接收示例
<!DOCTYPE html>
<html>
<head>
<title>SSE Chat</title>
</head>
<body>
<div id="chat-output"></div>
<script>
const chatOutput = document.getElementById('chat-output');
const eventSource = new EventSource('/chat/stream');
eventSource.onmessage = function(event) {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
try {
const data = JSON.parse(event.data);
if (data.choices && data.choices[0].delta.content) {
chatOutput.innerHTML += data.choices[0].delta.content;
}
} catch (error) {
console.error('解析错误:', error);
}
};
eventSource.onerror = function(event) {
console.error('SSE 错误:', event);
eventSource.close();
};
</script>
</body>
</html>关键技术点
- SSE 格式:每条消息以
data:开头,以\n\n结束 - 流式刷新:使用
Flusher立即发送数据到客户端 - 连接管理:监听上下文取消以处理客户端断开
- 数据分块:将完整响应拆分为多个小块逐步发送
- 结束标志:使用
[DONE]标记流式传输结束
这种模式广泛应用于 ChatGPT、Claude 等 AI 服务的流式对话功能。
🧑💻 User
golang发起请求是流式输出的时候可不可以支持流式print出来,而不是等待全部结束才输出
🤖 Assistant
当然可以!Golang 发起请求时也可以实现流式输出,而不是等待全部结束才输出。以下是几种实现方式:
1. 使用 HTTP 标准库实现流式读取
package main
import (
"bufio"
"fmt"
"io"
"net/http"
"time"
)
func main() {
// 创建请求
req, err := http.NewRequest("GET", "http://localhost:8080/chat/stream", nil)
if err != nil {
panic(err)
}
// 设置接受流式响应
req.Header.Set("Accept", "text/event-stream")
req.Header.Set("Cache-Control", "no-cache")
// 发送请求
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
// 创建读取器
reader := bufio.NewReader(resp.Body)
fmt.Println("开始接收流式响应:")
for {
// 逐行读取
line, err := reader.ReadBytes('\n')
if err != nil {
if err == io.EOF {
break
}
fmt.Printf("读取错误: %v\n", err)
break
}
// 处理 SSE 格式的数据行
lineStr := string(line)
if len(lineStr) > 6 && lineStr[:6] == "data: " {
data := lineStr[6:]
if data == "[DONE]\n" {
fmt.Println("\n流式传输结束")
break
}
// 这里可以解析 JSON 并提取内容
fmt.Printf("收到数据块: %s", data)
}
}
}2. 完整的 SSE 客户端实现
package main
import (
"bufio"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
)
type StreamResponse struct {
ID string `json:"id"`
Object string `json:"object"`
Choices []struct {
Delta struct {
Content string `json:"content"`
} `json:"delta"`
} `json:"choices"`
}
func main() {
streamSSE("http://localhost:8080/chat/stream")
}
func streamSSE(url string) {
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("请求错误: %v\n", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Printf("HTTP错误: %s\n", resp.Status)
return
}
scanner := bufio.NewScanner(resp.Body)
var fullResponse strings.Builder
fmt.Print("AI回复: ")
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue // 跳过空行
}
if strings.HasPrefix(line, "data: ") {
data := line[6:]
if data == "[DONE]" {
fmt.Println("\n\n=== 流式传输完成 ===")
break
}
// 解析 JSON
var chunk StreamResponse
if err := json.Unmarshal([]byte(data), &chunk); err != nil {
fmt.Printf("JSON解析错误: %v\n", err)
continue
}
// 输出内容
if len(chunk.Choices) > 0 && chunk.Choices[0].Delta.Content != "" {
content := chunk.Choices[0].Delta.Content
fmt.Print(content)
fullResponse.WriteString(content)
}
}
}
if err := scanner.Err(); err != nil {
fmt.Printf("读取流错误: %v\n", err)
}
fmt.Printf("\n完整回复: %s\n", fullResponse.String())
}3. 使用 Context 支持取消操作
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
func main() {
// 创建可取消的上下文
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 监听中断信号
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigCh
fmt.Println("\n接收到中断信号,停止流式接收...")
cancel()
}()
// 启动流式接收
if err := streamWithContext(ctx, "http://localhost:8080/chat/stream"); err != nil {
fmt.Printf("错误: %v\n", err)
}
}
func streamWithContext(ctx context.Context, url string) error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{
Timeout: 0, // 无超时限制
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// 创建带缓冲的读取器
reader := bufio.NewReader(resp.Body)
buffer := make([]byte, 1024)
fmt.Println("开始流式接收 (按Ctrl+C停止):")
fmt.Print("> ")
for {
select {
case <-ctx.Done():
fmt.Println("\n用户主动停止")
return nil
default:
// 非阻塞读取
n, err := reader.Read(buffer)
if err != nil {
if err == io.EOF {
fmt.Println("\n流结束")
return nil
}
return err
}
if n > 0 {
data := string(buffer[:n])
processSSEData(data)
}
time.Sleep(50 * time.Millisecond) // 避免CPU占用过高
}
}
}
func processSSEData(data string) {
lines := strings.Split(data, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "data: ") {
content := line[6:]
if content == "[DONE]" {
fmt.Println("\n[传输完成]")
return
}
// 简单的文本输出(可根据需要解析JSON)
fmt.Print(content)
}
}
}4. 使用第三方库简化操作
package main
import (
"fmt"
"github.com/r3labs/sse"
)
func main() {
client := sse.NewClient("http://localhost:8080/chat/stream")
fmt.Println("使用SSE库接收流式响应:")
// 订阅事件
err := client.Subscribe("message", func(msg *sse.Event) {
// 收到消息时立即处理
fmt.Printf("收到消息: %s\n", string(msg.Data))
})
if err != nil {
fmt.Printf("订阅错误: %v\n", err)
}
// 保持运行
select {}
}5. 简单的命令行聊天客户端示例
package main
import (
"bufio"
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
)
func main() {
reader := bufio.NewReader(os.Stdin)
for {
fmt.Print("你: ")
input, _ := reader.ReadString('\n')
input = strings.TrimSpace(input)
if input == "exit" {
break
}
// 发送请求并流式接收回复
streamChatResponse(input)
fmt.Println()
}
}
func streamChatResponse(prompt string) {
url := fmt.Sprintf("http://localhost:8080/chat?message=%s", prompt)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("请求错误: %v\n", err)
return
}
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
fmt.Print("AI: ")
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "data: ") {
data := line[6:]
if data == "[DONE]" {
break
}
var result map[string]interface{}
if json.Unmarshal([]byte(data), &result) == nil {
if choices, ok := result["choices"].([]interface{}); ok && len(choices) > 0 {
if choice, ok := choices[0].(map[string]interface{}); ok {
if delta, ok := choice["delta"].(map[string]interface{}); ok {
if content, ok := delta["content"].(string); ok {
fmt.Print(content)
}
}
}
}
}
}
}
}关键要点
- 使用
bufio.Scanner或bufio.Reader逐行读取响应 - 识别 SSE 格式:
data:前缀和[DONE]结束标记 - 实时刷新输出:使用
fmt.Print而不是缓冲的打印 - 处理上下文取消:支持用户中断流式接收
- 错误处理:妥善处理网络中断和解析错误
这样就能实现类似 ChatGPT 的流式输出效果,每个数据块到达时立即显示,而不是等待全部完成。
到此这篇关于Golang中SSE协议实现的文章就介绍到这了,更多相关Golang中SSE协议实现内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!


最新评论