Golang SSE 服务器端推送事件

 更新时间:2025年07月24日 09:56:03   作者:Rocc_  
本文主要介绍了Golang SSE 服务器端推送事件,SSE基于HTTP实现,允许服务器单向推送数据,客户端通过EventSource接收,下面就来详细的介绍一下,感兴趣的可以了解一下

写在前面(愚蠢的我犯的错误)

本应该在EventStream的怎么都在响应这里出现

后面通过查找问题知道EventSream有特殊的回复格式为:
data: [返回的内容]\n\n

示例:
data: success\n\n

返回success字符串

原因

我做了一个在线点赞的实时更新的小玩意,我想着实时更新WS全双工用不着。

SSE介绍

SSE(Server-Sent Event)是一种用于客户端与服务器端实时通讯的技术。它允许服务器端发送事件到客户端,客户端可以通过 EventSource 接口来接收这些事件。通常情况下,SSE 是基于 HTTP 协议实现的,它不需要建立和维护长连接,但服务器可以长时间向客户端推送数据,而客户端只需要等待并处理收到的数据即可。

Golang实现方式

SSE核心代码

	//sse Server-Sent-Events 服务事件流
	http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {

		// 设置响应头,表明这是一个 SSE 连接
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		//设置为刷新模式
		flush, ok := w.(http.Flusher)
		flush.Flush()

		if !ok {
			//判断是否转换成功,不成功则返回错误信息
			responseInfo(http.StatusOK, "response cannot convert http.Flusher", w)
			return
		}
		
		//这里因为我创建了一个Map用来存储响应IO和Flush刷新,
		//我在其他地方可以使用遍历进行给各个通信端进行发送信息
		respFlushMap[&w] = &flush
		
		select {
		case <-r.Context().Done():
			delete(respFlushMap, &w)
			return
		}
	})

发送事件请求

func main(){
	//....
	//点赞评论
	http.HandleFunc("/favorite", favorite(client))
	//...
}

// 点赞
func favorite(client *redis.Client) func(w http.ResponseWriter, r *http.Request) {
	var lock = sync.RWMutex{}
	return func(w http.ResponseWriter, r *http.Request) {
	
		/* 业务处理逻辑
			......
		*/
		
		//核心代码 将点赞信息发送到各个SSE
		for writer, flusher := range respFlushMap {
			//一定要是这个格式“data: [数据内容]\n\n”不然前端不会体现在ServeEvent中而出现在response中
			fmt.Fprintf(*writer, "data: %s@%s\n\n", member, method)
			(*flusher).Flush()
		}
	}
}

全部代码,包含了一些处理逻辑,可能比较混乱建议还是看看之前的

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/go-redis/redis/v8"
	"html/template"
	"log"
	"math/rand"
	"net"
	"net/http"
	"strconv"
	"sync"
	"time"
)

var commentNodeHashRedisKey = "commentNodeHashRedisKey"
var commentNodeSorterSetRedisKey = "commentNodeSorterSetRedisKey"

var respFlushMap = make(map[*http.ResponseWriter]*http.Flusher)

type CommentNode struct {
	Content    string  `json:"content"`    //内容
	Score      float64 `json:"score"`      //点赞数
	IP         string  `json:"ip"`         //IP
	NickName   string  `json:"nickName"`   //昵称
	IsFavorite bool    `json:"isFavorite"` //是否点赞
	Member     string  `json:"member"`     //唯一值
}

func main() {

	//静态资源文件
	staticServer := http.FileServer(http.Dir("./template"))

	//处理静态资源文件
	http.Handle("/static/", http.StripPrefix("/static/", staticServer))

	//创建客户端
	client := redis.NewClient(&redis.Options{
		Addr:     "192.168.192.170:6379",
		Password: "",
		DB:       0,
	})

	//判断时候连接成功
	err := client.Ping(context.Background()).Err()
	if err != nil {
		log.Println("连接错误: ", err.Error())
	}
	log.Println("连接成功")

	//添加评论
	http.HandleFunc("/addComment", addComment(client))

	//点赞评论
	http.HandleFunc("/favorite", favorite(client))

	//sse Server-Sent-Events 服务事件流
	http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {

		// 设置响应头,表明这是一个 SSE 连接
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		//设置为刷新模式
		flush, ok := w.(http.Flusher)
		flush.Flush()

		if !ok {
			responseInfo(http.StatusOK, "response cannot convert http.Flusher", w)
			return
		}

		respFlushMap[&w] = &flush
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		select {
		case <-r.Context().Done():
			delete(respFlushMap, &w)
			return
		}
	})

	http.HandleFunc("/commentList", commentList(client))

	//主页
	http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		//获取模板
		indexFile, err := template.ParseFiles("./template/index.html")
		if err != nil {
			log.Println(err.Error())
			resp.Write([]byte("./template/index.html not found"))
			return
		}
		//将内容输出
		indexFile.Execute(resp, nil)
	})

	//启动服务
	if err := http.ListenAndServe(":80", nil); err != nil {
		log.Println("启动服务失败!" + err.Error())
	}

}

// 添加评论
func addComment(client *redis.Client) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {

		query := r.URL.Query()
		nickname := query.Get("nickName")
		if nickname == "" {
			nickname = "逸士"
		}

		//判断内容是否为空
		content := query.Get("content")
		if content == "" {
			responseInfo(http.StatusBadRequest, "your comment content is empty", w)
			return
		}
		host, _, _ := net.SplitHostPort(r.RemoteAddr)

		//使用时间戳
		member := fmt.Sprint(time.Now().UnixMilli() ^ rand.Int63())
		//序列化
		comment, _ := json.Marshal(CommentNode{
			Member:   member,
			IP:       host,
			NickName: nickname,
			Content:  content,
		})

		//添加到队列中
		client.HSet(r.Context(), commentNodeHashRedisKey, member, string(comment))

		//更新排行
		client.ZAdd(r.Context(), commentNodeSorterSetRedisKey, &redis.Z{
			Score:  0,
			Member: member,
		})

		responseInfo(http.StatusOK, "add comment success", w)
	}
}

// 点赞
func favorite(client *redis.Client) func(w http.ResponseWriter, r *http.Request) {
	var lock = sync.RWMutex{}
	return func(w http.ResponseWriter, r *http.Request) {
		lock.Lock()
		defer lock.Unlock()

		//查询成员(member)是否存在
		query := r.URL.Query()
		member := query.Get("member")
		method := r.Method
		if member == "" {
			responseInfo(http.StatusBadRequest, "member cannot be empty", w)
			lock.Unlock()
			return
		}

		//获取分数
		score, err := client.ZScore(r.Context(), commentNodeSorterSetRedisKey, member).Result()
		//点赞减少
		if method == http.MethodDelete {
			score -= 2
		}
		score++

		//更新排行
		client.ZAdd(r.Context(), commentNodeSorterSetRedisKey, &redis.Z{
			Score:  score,
			Member: member,
		})

		if err != nil {
			//不存在返回错误
			responseInfo(http.StatusBadRequest, "member does not exists", w)
			return
		}

		//更新分数
		var commentNode CommentNode
		commentNodeStr, err := client.HGet(r.Context(), commentNodeHashRedisKey, member).Result()
		if err != nil {
			responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
			return
		}
		err = json.Unmarshal([]byte(commentNodeStr), &commentNode)
		if err != nil {
			responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
			return
		}

		commentNode.Score = score
		data, _ := json.Marshal(commentNode)
		if err = client.HSet(r.Context(), commentNodeHashRedisKey, member, data).Err(); err != nil {
			responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
			return
		}

		//返回成功
		responseInfo(http.StatusOK, "favorite comment success", w)

		//将点赞信息发送到各个SSE
		for writer, flusher := range respFlushMap {
			fmt.Fprintf(*writer, "data: %s@%s\n\n", member, method)
			(*flusher).Flush()
		}
	}
}

// 评论列表
func commentList(client *redis.Client) func(resp http.ResponseWriter, req *http.Request) {
	return func(resp http.ResponseWriter, req *http.Request) {

		query := req.URL.Query()
		offset, err := strconv.Atoi(query.Get("offset"))
		if err != nil {
			offset = 100
		}

		//连接人地址
		connectionAddr := req.RemoteAddr
		log.Printf("连接人地址: %s\n", connectionAddr)

		//获取offset偏移量的排行
		result, err := client.ZRevRangeWithScores(req.Context(), commentNodeSorterSetRedisKey, 0, int64(offset-1)).Result()

		if err != nil || result == nil {
			responseInfo(http.StatusOK, fmt.Sprint("错误:", err), resp)
			return
		}

		//获取评论详细信息
		members := make([]string, 0)
		scopeMap := make(map[string]float64)
		for _, item := range result {
			members = append(members, item.Member.(string))
			scopeMap[item.Member.(string)] = item.Score
		}
		rlt, err := client.HMGet(req.Context(), commentNodeHashRedisKey, members...).Result()
		if err != nil {
			responseInfo(http.StatusInternalServerError, err.Error(), resp)
			return
		}
		data, _ := json.Marshal(rlt)

		responseInfo(http.StatusOK, string(data), resp)
	}
}

func responseInfo(code int, info string, w http.ResponseWriter) {
	w.WriteHeader(code)
	w.Write([]byte(info))
}

到此这篇关于Golang SSE 服务器端推送事件的文章就介绍到这了,更多相关Golang SSE推送内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • go语言变量定义用法实例

    go语言变量定义用法实例

    这篇文章主要介绍了go语言变量定义用法,实例分析了go语言变量的定义及使用技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-02-02
  • 详解Golang利用反射reflect动态调用方法

    详解Golang利用反射reflect动态调用方法

    这篇文章主要介绍了详解Golang利用反射reflect动态调用方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-11-11
  • go格式“占位符”输入输出 类似python的input

    go格式“占位符”输入输出 类似python的input

    这篇文章主要介绍了go格式“占位符”, 输入输出,类似python的input,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-04-04
  • 浅谈用Go构建不可变的数据结构的方法

    浅谈用Go构建不可变的数据结构的方法

    这篇文章主要介绍了用Go构建不可变的数据结构的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-09-09
  • 一步步教你在Linux上安装Go语言环境

    一步步教你在Linux上安装Go语言环境

    本文将介绍如何在Linux操作系统下搭建Go语言环境,Go语言是一种开源的编程语言,具有高效、简洁和并发性强的特点,适用于开发各种类型的应用程序,搭建Go语言环境是开始学习和开发Go语言项目的第一步,本文将详细介绍安装Go语言、配置环境变量以及验证安装是否成功的步骤
    2023-10-10
  • golang JSON序列化和反序列化示例详解

    golang JSON序列化和反序列化示例详解

    通过使用Go语言的encoding/json包,你可以轻松地处理JSON数据,无论是在客户端应用、服务器端应用还是其他类型的Go程序中,这篇文章主要介绍了golang JSON序列化和反序列化,需要的朋友可以参考下
    2024-04-04
  • Golang技巧之重试机制详解

    Golang技巧之重试机制详解

    重试机制是一种在程序执行过程中出现错误后重新尝试执行程序的一种机制,可以减少程序运行过程中出现的错误,从而提高程序的可靠性,本文就来讲讲Golang中是如何实现重试机制的吧
    2023-05-05
  • Go语言实现一个Http Server框架(二) Server的抽象

    Go语言实现一个Http Server框架(二) Server的抽象

    上一篇文章对http库的基本使用做了说明,这篇文章主要介绍了如何实现一个简单地httpServer,文中代码示例非常详细,感兴趣的朋友可以参考下
    2023-04-04
  • Go语言实现类似c++中的多态功能实例

    Go语言实现类似c++中的多态功能实例

    Go本身不具有多态的特性,不能够像Java、C++那样编写多态类、多态方法。但是,使用Go可以编写具有多态功能的类绑定的方法。下面来一起看看吧
    2016-09-09
  • 详解golang中Context超时控制与原理

    详解golang中Context超时控制与原理

    Context本身的含义是上下文,我们可以理解为它内部携带了超时信息、退出信号,以及其他一些上下文相关的值,本文给大家详细介绍了golang中Context超时控制与原理,文中有相关的代码示例供大家参考,需要的朋友可以参考下
    2024-01-01

最新评论