Golang中SSE协议实现方法

 更新时间:2026年01月13日 08:55:26   作者:shuix1ng  
Gin框架通过Server-SentEvents(SSE)实现流式返回,关键步骤包括设置正确的响应头、使用Flusher确保数据立即发送、处理客户端中断和错误,本文给大家介绍Golang中SSE协议实现方法,感兴趣的朋友跟随小编一起看看吧

Gin流式返回实现方法

🧑‍💻 User

gin框架如何做到流式返回

🤖 Assistant

在 Gin 框架中实现流式返回主要通过以下几个关键技术点:

1. 使用 Server-Sent Events (SSE)

SSE 是实现流式返回的常用方式:

package main
import (
    "fmt"
    "net/http"
    "time"
    "github.com/gin-gonic/gin"
)
func main() {
    r := gin.Default()
    r.GET("/stream", func(c *gin.Context) {
        // 设置响应头
        c.Header("Content-Type", "text/event-stream")
        c.Header("Cache-Control", "no-cache")
        c.Header("Connection", "keep-alive")
        c.Header("Access-Control-Allow-Origin", "*")
        // 创建通道用于发送数据
        messageChan := make(chan string)
        // 模拟数据生成
        go func() {
            defer close(messageChan)
            for i := 0; i < 10; i++ {
                message := fmt.Sprintf("消息 %d: 当前时间 %v", i, time.Now().Format("15:04:05"))
                messageChan <- message
                time.Sleep(1 * time.Second)
            }
        }()
        // 流式返回数据
        c.Stream(func(w io.Writer) bool {
            if message, ok := <-messageChan; ok {
                // SSE 格式: data: {message}\n\n
                c.SSEvent("message", message)
                return true
            }
            return false
        })
    })
    r.Run(":8080")
}

2. 手动控制响应流

更底层的方式是直接操作 ResponseWriter:

r.GET("/stream-data", func(c *gin.Context) {
    c.Header("Content-Type", "text/plain")
    c.Header("Transfer-Encoding", "chunked")
    flusher, ok := c.Writer.(http.Flusher)
    if !ok {
        c.String(http.StatusInternalServerError, "Streaming not supported")
        return
    }
    for i := 0; i < 10; i++ {
        fmt.Fprintf(c.Writer, "数据块 %d\n", i)
        flusher.Flush() // 立即发送到客户端
        time.Sleep(1 * time.Second)
    }
})

3. 结合上下文取消机制

处理客户端中断连接的情况:

r.GET("/stream-with-context", func(c *gin.Context) {
    c.Header("Content-Type", "text/event-stream")
    c.Header("Cache-Control", "no-cache")
    c.Header("Connection", "keep-alive")
    flusher, _ := c.Writer.(http.Flusher)
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-c.Request.Context().Done():
            // 客户端断开连接
            fmt.Println("客户端断开连接")
            return
        case t := <-ticker.C:
            data := fmt.Sprintf("时间: %s", t.Format("15:04:05"))
            c.SSEvent("update", data)
            flusher.Flush()
        }
    }
})

4. JSON 流式返回

对于结构化数据的流式返回:

r.GET("/stream-json", func(c *gin.Context) {
    c.Header("Content-Type", "application/x-ndjson") // 换行分隔的JSON
    flusher, _ := c.Writer.(http.Flusher)
    for i := 0; i < 5; i++ {
        data := gin.H{
            "id":   i,
            "time": time.Now().Unix(),
            "data": fmt.Sprintf("项目 %d", i),
        }
        c.Render(-1, gin.JSON{Data: data})
        fmt.Fprintln(c.Writer) // 添加换行符
        flusher.Flush()
        time.Sleep(1 * time.Second)
    }
})

5. 文件流式下载

大文件的流式下载:

r.GET("/download-large-file", func(c *gin.Context) {
    filePath := "./large-file.txt"
    c.Header("Content-Type", "application/octet-stream")
    c.Header("Content-Disposition", "attachment; filename=large-file.txt")
    file, err := os.Open(filePath)
    if err != nil {
        c.String(http.StatusNotFound, "文件不存在")
        return
    }
    defer file.Close()
    // 获取文件信息
    fileInfo, _ := file.Stat()
    c.Header("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
    // 流式传输文件内容
    io.Copy(c.Writer, file)
})

6. 前端示例

对应的前端代码:

<!DOCTYPE html>
<html>
<head>
    <title>SSE 示例</title>
</head>
<body>
    <div id="messages"></div>
    <script>
        const eventSource = new EventSource('/stream');
        eventSource.onmessage = function(event) {
            const messageDiv = document.createElement('div');
            messageDiv.textContent = event.data;
            document.getElementById('messages').appendChild(messageDiv);
        };
        eventSource.onerror = function(event) {
            console.error('SSE 错误:', event);
        };
    </script>
</body>
</html>

关键要点

  • 设置正确的响应头text/event-streamno-cachekeep-alive
  • 使用 Flusher:确保数据立即发送到客户端
  • 处理连接中断:通过上下文监听客户端断开
  • 错误处理:妥善处理流式传输中的错误
  • 资源清理:确保 goroutine 和连接正确关闭

这种方式特别适合实时数据推送、大文件下载、长时间运行的任务进度更新等场景。

🧑‍💻 User

这个服务是如何返回的:

data: {"id":"019b84c169efb22b959b99a7914e4712","object":"chat.completion.chunk","created":1767458826,"model":"deepseek-ai/DeepSeek-V3.2","choices":[{"index":0,"delta":{"content":"","reasoning_content":null,"role":"assistant"},"finish_reason":"stop"}],"system_fingerprint":"","usage":{"prompt_tokens":23,"completion_tokens":19,"total_tokens":42,"completion_tokens_details":{"reasoning_tokens":0}}}

data: [DONE]

🤖 Assistant

这是一个典型的 Server-Sent Events (SSE) 流式响应格式。让我详细解析这种返回方式的工作原理:

响应格式解析

// 每条数据都以 "data: " 开头,后跟 JSON 数据
data: {"id":"019b84c169efb22b959b99a7914e4712","object":"chat.completion.chunk","created":1767458826,"model":"deepseek-ai/DeepSeek-V3.2","choices":[{"index":0,"delta":{"content":"服务。","reasoning_content":null,"role":"assistant"},"finish_reason":null}],"usage":{...}}
// 最后以 [DONE] 结束
data: [DONE]

Gin 框架实现代码示例

package main
import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
    "github.com/gin-gonic/gin"
)
func main() {
    r := gin.Default()
    r.GET("/chat/stream", func(c *gin.Context) {
        // 设置 SSE 响应头
        c.Header("Content-Type", "text/event-stream")
        c.Header("Cache-Control", "no-cache")
        c.Header("Connection", "keep-alive")
        c.Header("Access-Control-Allow-Origin", "*")
        // 检查是否支持流式传输
        flusher, ok := c.Writer.(http.Flusher)
        if !ok {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Streaming not supported"})
            return
        }
        // 模拟 AI 对话的逐词返回
        messages := []string{"服务。", "请问有什么", "可以帮", "您的吗", "?那我", "先退", "下了。"}
        for i, message := range messages {
            // 构建响应数据
            response := gin.H{
                "id":      "019b84c169efb22b959b99a7914e4712",
                "object":  "chat.completion.chunk",
                "created": time.Now().Unix(),
                "model":   "deepseek-ai/DeepSeek-V3.2",
                "choices": []gin.H{
                    {
                        "index": 0,
                        "delta": gin.H{
                            "content":         message,
                            "reasoning_content": nil,
                            "role":           "assistant",
                        },
                        "finish_reason": nil,
                    },
                },
                "usage": gin.H{
                    "prompt_tokens":     23,
                    "completion_tokens": 5 + i*2, // 模拟token计数
                    "total_tokens":      28 + i*2,
                    "completion_tokens_details": gin.H{
                        "reasoning_tokens": 0,
                    },
                },
            }
            // 转换为 JSON
            jsonData, _ := json.Marshal(response)
            // 按照 SSE 格式写入:data: {json}\n\n
            fmt.Fprintf(c.Writer, "data: %s\n\n", string(jsonData))
            flusher.Flush() // 立即刷新到客户端
            time.Sleep(200 * time.Millisecond) // 模拟处理延迟
        }
        // 发送结束标志
        fmt.Fprintf(c.Writer, "data: [DONE]\n\n")
        flusher.Flush()
    })
    r.Run(":8080")
}

更完整的实现(包含错误处理)

func streamingChatHandler(c *gin.Context) {
    // SSE 头设置
    c.Header("Content-Type", "text/event-stream")
    c.Header("Cache-Control", "no-cache")
    c.Header("Connection", "keep-alive")
    c.Header("Access-Control-Allow-Origin", "*")
    flusher, ok := c.Writer.(http.Flusher)
    if !ok {
        c.JSON(http.StatusInternalServerError, gin.H{"error": "Streaming not supported"})
        return
    }
    // 监听客户端断开连接
    ctx := c.Request.Context()
    // 使用 Gin 的 Stream 方法(推荐)
    c.Stream(func(w io.Writer) bool {
        select {
        case <-ctx.Done():
            return false // 客户端断开,停止流式传输
        default:
            // 生成聊天数据
            chunk := generateChatChunk()
            if chunk == nil {
                // 发送结束信号
                c.SSEvent("", "[DONE]")
                return false
            }
            // 发送数据块
            c.SSEvent("", chunk)
            return true
        }
    })
}
func generateChatChunk() interface{} {
    // 这里可以连接真实的 AI 模型
    // 返回数据块结构
    return map[string]interface{}{
        "id":      generateID(),
        "object":  "chat.completion.chunk",
        "choices": []map[string]interface{}{
            {
                "delta": map[string]interface{}{
                    "content": getNextWord(),
                },
            },
        },
    }
}

前端接收示例

<!DOCTYPE html>
<html>
<head>
    <title>SSE Chat</title>
</head>
<body>
    <div id="chat-output"></div>
    <script>
        const chatOutput = document.getElementById('chat-output');
        const eventSource = new EventSource('/chat/stream');
        eventSource.onmessage = function(event) {
            if (event.data === '[DONE]') {
                eventSource.close();
                return;
            }
            try {
                const data = JSON.parse(event.data);
                if (data.choices && data.choices[0].delta.content) {
                    chatOutput.innerHTML += data.choices[0].delta.content;
                }
            } catch (error) {
                console.error('解析错误:', error);
            }
        };
        eventSource.onerror = function(event) {
            console.error('SSE 错误:', event);
            eventSource.close();
        };
    </script>
</body>
</html>

关键技术点

  • SSE 格式:每条消息以 data: 开头,以 \n\n 结束
  • 流式刷新:使用 Flusher 立即发送数据到客户端
  • 连接管理:监听上下文取消以处理客户端断开
  • 数据分块:将完整响应拆分为多个小块逐步发送
  • 结束标志:使用 [DONE] 标记流式传输结束

这种模式广泛应用于 ChatGPT、Claude 等 AI 服务的流式对话功能。

🧑‍💻 User

golang发起请求是流式输出的时候可不可以支持流式print出来,而不是等待全部结束才输出

🤖 Assistant

当然可以!Golang 发起请求时也可以实现流式输出,而不是等待全部结束才输出。以下是几种实现方式:

1. 使用 HTTP 标准库实现流式读取

package main
import (
    "bufio"
    "fmt"
    "io"
    "net/http"
    "time"
)
func main() {
    // 创建请求
    req, err := http.NewRequest("GET", "http://localhost:8080/chat/stream", nil)
    if err != nil {
        panic(err)
    }
    // 设置接受流式响应
    req.Header.Set("Accept", "text/event-stream")
    req.Header.Set("Cache-Control", "no-cache")
    // 发送请求
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    // 创建读取器
    reader := bufio.NewReader(resp.Body)
    fmt.Println("开始接收流式响应:")
    for {
        // 逐行读取
        line, err := reader.ReadBytes('\n')
        if err != nil {
            if err == io.EOF {
                break
            }
            fmt.Printf("读取错误: %v\n", err)
            break
        }
        // 处理 SSE 格式的数据行
        lineStr := string(line)
        if len(lineStr) > 6 && lineStr[:6] == "data: " {
            data := lineStr[6:]
            if data == "[DONE]\n" {
                fmt.Println("\n流式传输结束")
                break
            }
            // 这里可以解析 JSON 并提取内容
            fmt.Printf("收到数据块: %s", data)
        }
    }
}

2. 完整的 SSE 客户端实现

package main
import (
    "bufio"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "strings"
)
type StreamResponse struct {
    ID      string `json:"id"`
    Object  string `json:"object"`
    Choices []struct {
        Delta struct {
            Content string `json:"content"`
        } `json:"delta"`
    } `json:"choices"`
}
func main() {
    streamSSE("http://localhost:8080/chat/stream")
}
func streamSSE(url string) {
    req, _ := http.NewRequest("GET", url, nil)
    req.Header.Set("Accept", "text/event-stream")
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        fmt.Printf("请求错误: %v\n", err)
        return
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        fmt.Printf("HTTP错误: %s\n", resp.Status)
        return
    }
    scanner := bufio.NewScanner(resp.Body)
    var fullResponse strings.Builder
    fmt.Print("AI回复: ")
    for scanner.Scan() {
        line := scanner.Text()
        if line == "" {
            continue // 跳过空行
        }
        if strings.HasPrefix(line, "data: ") {
            data := line[6:]
            if data == "[DONE]" {
                fmt.Println("\n\n=== 流式传输完成 ===")
                break
            }
            // 解析 JSON
            var chunk StreamResponse
            if err := json.Unmarshal([]byte(data), &chunk); err != nil {
                fmt.Printf("JSON解析错误: %v\n", err)
                continue
            }
            // 输出内容
            if len(chunk.Choices) > 0 && chunk.Choices[0].Delta.Content != "" {
                content := chunk.Choices[0].Delta.Content
                fmt.Print(content)
                fullResponse.WriteString(content)
            }
        }
    }
    if err := scanner.Err(); err != nil {
        fmt.Printf("读取流错误: %v\n", err)
    }
    fmt.Printf("\n完整回复: %s\n", fullResponse.String())
}

3. 使用 Context 支持取消操作

package main
import (
    "bufio"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"
)
func main() {
    // 创建可取消的上下文
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    // 监听中断信号
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-sigCh
        fmt.Println("\n接收到中断信号,停止流式接收...")
        cancel()
    }()
    // 启动流式接收
    if err := streamWithContext(ctx, "http://localhost:8080/chat/stream"); err != nil {
        fmt.Printf("错误: %v\n", err)
    }
}
func streamWithContext(ctx context.Context, url string) error {
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return err
    }
    req.Header.Set("Accept", "text/event-stream")
    client := &http.Client{
        Timeout: 0, // 无超时限制
    }
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    // 创建带缓冲的读取器
    reader := bufio.NewReader(resp.Body)
    buffer := make([]byte, 1024)
    fmt.Println("开始流式接收 (按Ctrl+C停止):")
    fmt.Print("> ")
    for {
        select {
        case <-ctx.Done():
            fmt.Println("\n用户主动停止")
            return nil
        default:
            // 非阻塞读取
            n, err := reader.Read(buffer)
            if err != nil {
                if err == io.EOF {
                    fmt.Println("\n流结束")
                    return nil
                }
                return err
            }
            if n > 0 {
                data := string(buffer[:n])
                processSSEData(data)
            }
            time.Sleep(50 * time.Millisecond) // 避免CPU占用过高
        }
    }
}
func processSSEData(data string) {
    lines := strings.Split(data, "\n")
    for _, line := range lines {
        line = strings.TrimSpace(line)
        if strings.HasPrefix(line, "data: ") {
            content := line[6:]
            if content == "[DONE]" {
                fmt.Println("\n[传输完成]")
                return
            }
            // 简单的文本输出(可根据需要解析JSON)
            fmt.Print(content)
        }
    }
}

4. 使用第三方库简化操作

package main
import (
    "fmt"
    "github.com/r3labs/sse"
)
func main() {
    client := sse.NewClient("http://localhost:8080/chat/stream")
    fmt.Println("使用SSE库接收流式响应:")
    // 订阅事件
    err := client.Subscribe("message", func(msg *sse.Event) {
        // 收到消息时立即处理
        fmt.Printf("收到消息: %s\n", string(msg.Data))
    })
    if err != nil {
        fmt.Printf("订阅错误: %v\n", err)
    }
    // 保持运行
    select {}
}

5. 简单的命令行聊天客户端示例

package main
import (
    "bufio"
    "encoding/json"
    "fmt"
    "net/http"
    "os"
    "strings"
)
func main() {
    reader := bufio.NewReader(os.Stdin)
    for {
        fmt.Print("你: ")
        input, _ := reader.ReadString('\n')
        input = strings.TrimSpace(input)
        if input == "exit" {
            break
        }
        // 发送请求并流式接收回复
        streamChatResponse(input)
        fmt.Println()
    }
}
func streamChatResponse(prompt string) {
    url := fmt.Sprintf("http://localhost:8080/chat?message=%s", prompt)
    req, _ := http.NewRequest("GET", url, nil)
    req.Header.Set("Accept", "text/event-stream")
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        fmt.Printf("请求错误: %v\n", err)
        return
    }
    defer resp.Body.Close()
    scanner := bufio.NewScanner(resp.Body)
    fmt.Print("AI: ")
    for scanner.Scan() {
        line := scanner.Text()
        if strings.HasPrefix(line, "data: ") {
            data := line[6:]
            if data == "[DONE]" {
                break
            }
            var result map[string]interface{}
            if json.Unmarshal([]byte(data), &result) == nil {
                if choices, ok := result["choices"].([]interface{}); ok && len(choices) > 0 {
                    if choice, ok := choices[0].(map[string]interface{}); ok {
                        if delta, ok := choice["delta"].(map[string]interface{}); ok {
                            if content, ok := delta["content"].(string); ok {
                                fmt.Print(content)
                            }
                        }
                    }
                }
            }
        }
    }
}

关键要点

  • 使用 bufio.Scanner 或 bufio.Reader 逐行读取响应
  • 识别 SSE 格式data: 前缀和 [DONE] 结束标记
  • 实时刷新输出:使用 fmt.Print 而不是缓冲的打印
  • 处理上下文取消:支持用户中断流式接收
  • 错误处理:妥善处理网络中断和解析错误

这样就能实现类似 ChatGPT 的流式输出效果,每个数据块到达时立即显示,而不是等待全部完成。

到此这篇关于Golang中SSE协议实现的文章就介绍到这了,更多相关Golang中SSE协议实现内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 使用gin框架搭建简易服务的实现方法

    使用gin框架搭建简易服务的实现方法

    go语言web框架挺多的,本文就介绍了一下如何使用gin框架搭建简易服务的实现方法,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-12-12
  • Golang 内存模型详解(一)

    Golang 内存模型详解(一)

    这篇文章主要介绍了Golang 内存模型详解(一),本文讲解了Go内存模型interface、,需要的朋友可以参考下
    2014-10-10
  • Golang errors包快速上手

    Golang errors包快速上手

    errors 包是用于处理错误的标准库, errors 包提供的功能比较简单,使用起来非常方便,下面就来介绍一下,感兴趣的可以了解一下
    2025-05-05
  • GO语言中的常量

    GO语言中的常量

    go语言支持的常量有字符型,字符串型,布尔型和数字型。本文实例讲述了Go语言中常量定义方法。分享给大家供大家参考。
    2015-04-04
  • Golang中web参数校验的实现

    Golang中web参数校验的实现

    本文介绍了使用Gin框架进行参数校验的几种方法,包括JSON、URL查询、表单数据的校验,常用校验规则,自定义错误信息和自定义校验规则,具有一定的参考价值,感兴趣的可以了解一下
    2025-11-11
  • go语言使用scp的方法实例分析

    go语言使用scp的方法实例分析

    这篇文章主要介绍了go语言使用scp的方法,实例分析了go语言调用scp命令的使用技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-03-03
  • Go语言Telnet回音服务器的实现

    Go语言Telnet回音服务器的实现

    这篇文章主要介绍了Go语言Telnet回音服务器的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-01-01
  • Go interface{} 转切片类型的实现方法

    Go interface{} 转切片类型的实现方法

    本文主要介绍了Go interface{} 转切片类型的实现方法,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-02-02
  • Go 如何使用原始套接字捕获网卡流量

    Go 如何使用原始套接字捕获网卡流量

    为了减少对环境的依赖可以使用原始套接字捕获网卡流量,然后使用 gopacket 的协议解析功能,这样就省去了解析这部分的工作量,正确性也可以得到保证,同时 CGO 也可以关闭,这篇文章主要介绍了Go 使用原始套接字捕获网卡流量,需要的朋友可以参考下
    2024-07-07
  • 一文详解Golang的中间件设计模式

    一文详解Golang的中间件设计模式

    最近在看一些rpc框架的使用原理和源码的时候,对中间件的实现非常感兴趣,所以这篇文章就来和大家聊聊Golang的中间件设计模式,希望对大家有所帮助
    2023-03-03

最新评论