Golang SSE 服务器端推送事件

 更新时间:2025年07月24日 09:56:03   作者:Rocc_  
本文主要介绍了Golang SSE 服务器端推送事件,SSE基于HTTP实现,允许服务器单向推送数据,客户端通过EventSource接收,下面就来详细的介绍一下,感兴趣的可以了解一下

写在前面(愚蠢的我犯的错误)

本应该在EventStream的怎么都在响应这里出现

后面通过查找问题知道EventSream有特殊的回复格式为:
data: [返回的内容]\n\n

示例:
data: success\n\n

返回success字符串

原因

我做了一个在线点赞的实时更新的小玩意,我想着实时更新WS全双工用不着。

SSE介绍

SSE(Server-Sent Event)是一种用于客户端与服务器端实时通讯的技术。它允许服务器端发送事件到客户端,客户端可以通过 EventSource 接口来接收这些事件。通常情况下,SSE 是基于 HTTP 协议实现的,它不需要建立和维护长连接,但服务器可以长时间向客户端推送数据,而客户端只需要等待并处理收到的数据即可。

Golang实现方式

SSE核心代码

	//sse Server-Sent-Events 服务事件流
	http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {

		// 设置响应头,表明这是一个 SSE 连接
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		//设置为刷新模式
		flush, ok := w.(http.Flusher)
		flush.Flush()

		if !ok {
			//判断是否转换成功,不成功则返回错误信息
			responseInfo(http.StatusOK, "response cannot convert http.Flusher", w)
			return
		}
		
		//这里因为我创建了一个Map用来存储响应IO和Flush刷新,
		//我在其他地方可以使用遍历进行给各个通信端进行发送信息
		respFlushMap[&w] = &flush
		
		select {
		case <-r.Context().Done():
			delete(respFlushMap, &w)
			return
		}
	})

发送事件请求

func main(){
	//....
	//点赞评论
	http.HandleFunc("/favorite", favorite(client))
	//...
}

// 点赞
func favorite(client *redis.Client) func(w http.ResponseWriter, r *http.Request) {
	var lock = sync.RWMutex{}
	return func(w http.ResponseWriter, r *http.Request) {
	
		/* 业务处理逻辑
			......
		*/
		
		//核心代码 将点赞信息发送到各个SSE
		for writer, flusher := range respFlushMap {
			//一定要是这个格式“data: [数据内容]\n\n”不然前端不会体现在ServeEvent中而出现在response中
			fmt.Fprintf(*writer, "data: %s@%s\n\n", member, method)
			(*flusher).Flush()
		}
	}
}

全部代码,包含了一些处理逻辑,可能比较混乱建议还是看看之前的

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/go-redis/redis/v8"
	"html/template"
	"log"
	"math/rand"
	"net"
	"net/http"
	"strconv"
	"sync"
	"time"
)

var commentNodeHashRedisKey = "commentNodeHashRedisKey"
var commentNodeSorterSetRedisKey = "commentNodeSorterSetRedisKey"

var respFlushMap = make(map[*http.ResponseWriter]*http.Flusher)

type CommentNode struct {
	Content    string  `json:"content"`    //内容
	Score      float64 `json:"score"`      //点赞数
	IP         string  `json:"ip"`         //IP
	NickName   string  `json:"nickName"`   //昵称
	IsFavorite bool    `json:"isFavorite"` //是否点赞
	Member     string  `json:"member"`     //唯一值
}

func main() {

	//静态资源文件
	staticServer := http.FileServer(http.Dir("./template"))

	//处理静态资源文件
	http.Handle("/static/", http.StripPrefix("/static/", staticServer))

	//创建客户端
	client := redis.NewClient(&redis.Options{
		Addr:     "192.168.192.170:6379",
		Password: "",
		DB:       0,
	})

	//判断时候连接成功
	err := client.Ping(context.Background()).Err()
	if err != nil {
		log.Println("连接错误: ", err.Error())
	}
	log.Println("连接成功")

	//添加评论
	http.HandleFunc("/addComment", addComment(client))

	//点赞评论
	http.HandleFunc("/favorite", favorite(client))

	//sse Server-Sent-Events 服务事件流
	http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {

		// 设置响应头,表明这是一个 SSE 连接
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		//设置为刷新模式
		flush, ok := w.(http.Flusher)
		flush.Flush()

		if !ok {
			responseInfo(http.StatusOK, "response cannot convert http.Flusher", w)
			return
		}

		respFlushMap[&w] = &flush
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		select {
		case <-r.Context().Done():
			delete(respFlushMap, &w)
			return
		}
	})

	http.HandleFunc("/commentList", commentList(client))

	//主页
	http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		//获取模板
		indexFile, err := template.ParseFiles("./template/index.html")
		if err != nil {
			log.Println(err.Error())
			resp.Write([]byte("./template/index.html not found"))
			return
		}
		//将内容输出
		indexFile.Execute(resp, nil)
	})

	//启动服务
	if err := http.ListenAndServe(":80", nil); err != nil {
		log.Println("启动服务失败!" + err.Error())
	}

}

// 添加评论
func addComment(client *redis.Client) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {

		query := r.URL.Query()
		nickname := query.Get("nickName")
		if nickname == "" {
			nickname = "逸士"
		}

		//判断内容是否为空
		content := query.Get("content")
		if content == "" {
			responseInfo(http.StatusBadRequest, "your comment content is empty", w)
			return
		}
		host, _, _ := net.SplitHostPort(r.RemoteAddr)

		//使用时间戳
		member := fmt.Sprint(time.Now().UnixMilli() ^ rand.Int63())
		//序列化
		comment, _ := json.Marshal(CommentNode{
			Member:   member,
			IP:       host,
			NickName: nickname,
			Content:  content,
		})

		//添加到队列中
		client.HSet(r.Context(), commentNodeHashRedisKey, member, string(comment))

		//更新排行
		client.ZAdd(r.Context(), commentNodeSorterSetRedisKey, &redis.Z{
			Score:  0,
			Member: member,
		})

		responseInfo(http.StatusOK, "add comment success", w)
	}
}

// 点赞
func favorite(client *redis.Client) func(w http.ResponseWriter, r *http.Request) {
	var lock = sync.RWMutex{}
	return func(w http.ResponseWriter, r *http.Request) {
		lock.Lock()
		defer lock.Unlock()

		//查询成员(member)是否存在
		query := r.URL.Query()
		member := query.Get("member")
		method := r.Method
		if member == "" {
			responseInfo(http.StatusBadRequest, "member cannot be empty", w)
			lock.Unlock()
			return
		}

		//获取分数
		score, err := client.ZScore(r.Context(), commentNodeSorterSetRedisKey, member).Result()
		//点赞减少
		if method == http.MethodDelete {
			score -= 2
		}
		score++

		//更新排行
		client.ZAdd(r.Context(), commentNodeSorterSetRedisKey, &redis.Z{
			Score:  score,
			Member: member,
		})

		if err != nil {
			//不存在返回错误
			responseInfo(http.StatusBadRequest, "member does not exists", w)
			return
		}

		//更新分数
		var commentNode CommentNode
		commentNodeStr, err := client.HGet(r.Context(), commentNodeHashRedisKey, member).Result()
		if err != nil {
			responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
			return
		}
		err = json.Unmarshal([]byte(commentNodeStr), &commentNode)
		if err != nil {
			responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
			return
		}

		commentNode.Score = score
		data, _ := json.Marshal(commentNode)
		if err = client.HSet(r.Context(), commentNodeHashRedisKey, member, data).Err(); err != nil {
			responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
			return
		}

		//返回成功
		responseInfo(http.StatusOK, "favorite comment success", w)

		//将点赞信息发送到各个SSE
		for writer, flusher := range respFlushMap {
			fmt.Fprintf(*writer, "data: %s@%s\n\n", member, method)
			(*flusher).Flush()
		}
	}
}

// 评论列表
func commentList(client *redis.Client) func(resp http.ResponseWriter, req *http.Request) {
	return func(resp http.ResponseWriter, req *http.Request) {

		query := req.URL.Query()
		offset, err := strconv.Atoi(query.Get("offset"))
		if err != nil {
			offset = 100
		}

		//连接人地址
		connectionAddr := req.RemoteAddr
		log.Printf("连接人地址: %s\n", connectionAddr)

		//获取offset偏移量的排行
		result, err := client.ZRevRangeWithScores(req.Context(), commentNodeSorterSetRedisKey, 0, int64(offset-1)).Result()

		if err != nil || result == nil {
			responseInfo(http.StatusOK, fmt.Sprint("错误:", err), resp)
			return
		}

		//获取评论详细信息
		members := make([]string, 0)
		scopeMap := make(map[string]float64)
		for _, item := range result {
			members = append(members, item.Member.(string))
			scopeMap[item.Member.(string)] = item.Score
		}
		rlt, err := client.HMGet(req.Context(), commentNodeHashRedisKey, members...).Result()
		if err != nil {
			responseInfo(http.StatusInternalServerError, err.Error(), resp)
			return
		}
		data, _ := json.Marshal(rlt)

		responseInfo(http.StatusOK, string(data), resp)
	}
}

func responseInfo(code int, info string, w http.ResponseWriter) {
	w.WriteHeader(code)
	w.Write([]byte(info))
}

到此这篇关于Golang SSE 服务器端推送事件的文章就介绍到这了,更多相关Golang SSE推送内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • go语言Pflag Viper Cobra 核心功能使用介绍

    go语言Pflag Viper Cobra 核心功能使用介绍

    这篇文章主要为大家介绍了go语言Pflag Viper Cobra 核心功能使用介绍,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-09-09
  • Golang RPC的原理与简单调用详解

    Golang RPC的原理与简单调用详解

    RPC(Remote Procedure Call),主要是帮助我们屏蔽网络编程细节 ,使我们更专注于业务逻辑,所以本文主要来和大家聊聊RPC的原理与简单调用,希望对大家有所帮助
    2023-05-05
  • Go语言实现的可读性更高的并发神库详解

    Go语言实现的可读性更高的并发神库详解

    这篇文章主要为大家介绍了Go语言实现的可读性更高的并发神库详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • go语言中函数与方法介绍

    go语言中函数与方法介绍

    这篇文章介绍了go语言中的函数与方法,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-07-07
  • go语言中的json与map相互转换实现

    go语言中的json与map相互转换实现

    本文主要介绍了go语言中的json与map相互转换实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-08-08
  • golang中日期操作之日期格式化及日期转换

    golang中日期操作之日期格式化及日期转换

    在编程中,程序员会经常使用到日期相关操作,下面这篇文章主要给大家介绍了关于golang中日期操作之日期格式化及日期转换的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-11-11
  • 详解golang各种类型是如何进行比较的

    详解golang各种类型是如何进行比较的

    在日常开发中,比较操作是最常用的基本操作之一,可以用来判断变量之间是否相等或者对应的大小关系,比较操作对于排序、查找和集合数据结构的实现至关重要,本文将深入解析golang各种类型是如何进行比较的,需要的朋友可以参考下
    2024-01-01
  • Go语言题解LeetCode463岛屿的周长示例详解

    Go语言题解LeetCode463岛屿的周长示例详解

    这篇文章主要为大家介绍了Go语言题解LeetCode463岛屿的周长示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-12-12
  • Golang使用Apache PLC4X连接modbus的示例代码

    Golang使用Apache PLC4X连接modbus的示例代码

    Modbus是一种串行通信协议,是Modicon公司于1979年为使用可编程逻辑控制器(PLC)通信而发表,这篇文章主要介绍了Golang使用Apache PLC4X连接modbus的示例代码,需要的朋友可以参考下
    2024-07-07
  • Go语言学习教程之指针的示例详解

    Go语言学习教程之指针的示例详解

    这篇文章主要通过简单的练习来让大家对Go语言中的指针有所了解,文中的示例代码讲解详细,对我们学习Go语言有一定帮助,需要的可以参考一下
    2022-09-09

最新评论