Springboot使用SSE推送消息到客户端的实现

 更新时间:2026年01月23日 08:25:11   作者:爱吃的强哥  
文章浏览阅读246次,点赞4次,收藏4次,本文介绍了在SpringBoot中实现SSE(Server-Sent Events)服务端推送功能,用于Electron桌面应用的消息推送,感兴趣的可以了解一下

1、场景:

服务端主动推动消息到客户端(Electron 桌面应用)

普通的HTTP/HTTPS请求要先客户端发送请求,然后服务端才能返回结果

服务端主动推送数据到客户端,有两种方案:

SSE:只能从服务端主动推送数据到客户端(单向),客户端发送数据还是要使用HTTP/HTTPS请求

Socket 通信:客户端和服务端都可以发送数据给对方(双向)

先记录SSE的配置(我的服务端之前是用nodejs express 写的,已经使用 socket.io与客户端适配好了,现在用Springboot重构)

2、实现:

1、springboot 配置:

1、先实现Server层,方便其他控制器调用

package com.xxx.controller;
 
import com.qiang.service.SseService;
import com.qiang.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
@RestController
@RequestMapping("/sse")
public class SseController {
 
    // 注入 SSE 服务类
    @Autowired
    private SseService sseService;
    @Autowired
    private UserService userService;
 
    /**
     * 建立 SSE 连接(调用 Service 的注册方法)
     */
    @GetMapping("/connect")
    public SseEmitter connect(@RequestParam String clientId) {
        // 查询会话成员表,userId通过客户端id获取该用户所有的群聊
        String userId = clientId.split("_")[2].trim();
        List<String> groupIdList = userService.getUserGroupChat(userId);
        Set<String> groupIdSet = new HashSet<>();
        if (groupIdList != null && !groupIdList.isEmpty()) {
            groupIdSet = new HashSet<>(groupIdList); // List → Set,自动去重
        }
        return sseService.registerClient(clientId, groupIdSet);
    }
 
    /**
     * 手动推送(调用 Service 的推送方法)
     */
    @PostMapping("/push")
    public String push(@RequestBody Map<String, String> params) {
        String clientId = params.get("clientId");
        String message = params.get("message");
        return sseService.sendMessage(clientId, "business", message);
    }
 
    /**
     * 获取在线客户端数量
     */
    @GetMapping("/count")
    public String getClientCount() {
        return "当前在线客户端数量:" + sseService.getConnectedClientCount();
    }
}

2、再实现控制层:

package com.xxx.controller;
 
import com.qiang.service.SseService;
import com.qiang.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
@RestController
@RequestMapping("/sse")
public class SseController {
 
    // 注入 SSE 服务类
    @Autowired
    private SseService sseService;
    @Autowired
    private UserService userService;
 
    /**
     * 建立 SSE 连接(调用 Service 的注册方法)
     */
    @GetMapping("/connect")
    public SseEmitter connect(@RequestParam String clientId) {
        // 查询会话成员表,userId通过客户端id获取该用户所有的群聊
        String userId = clientId.split("_")[2].trim();
        List<String> groupIdList = userService.getUserGroupChat(userId);
        Set<String> groupIdSet = new HashSet<>();
        if (groupIdList != null && !groupIdList.isEmpty()) {
            groupIdSet = new HashSet<>(groupIdList); // List → Set,自动去重
        }
        return sseService.registerClient(clientId, groupIdSet);
    }
 
    /**
     * 手动推送(调用 Service 的推送方法)
     */
    @PostMapping("/push")
    public String push(@RequestBody Map<String, String> params) {
        String clientId = params.get("clientId");
        String message = params.get("message");
        return sseService.sendMessage(clientId, "business", message);
    }
 
    /**
     * 获取在线客户端数量
     */
    @GetMapping("/count")
    public String getClientCount() {
        return "当前在线客户端数量:" + sseService.getConnectedClientCount();
    }
}

2、客户端(Electron 配置):

eventsource 是浏览器对象

我是在客户端主进程中接收消息的,要下载依赖。如果是在渲染进程或者是普通前端使用则不需要下载依赖

"eventsource": "^2.0.2"

1、工具函数:

// src/util/sseClient.js(主进程专用)
const EventSource = require('eventsource');
 
class SseClient {
    constructor(clientId) {
        this.clientId = clientId;
        this.isConnected = false; // 标记是否真正连接成功
        this.initSse();
    }
 
    initSse() {
        this.close(); // 关闭旧连接
 
        const sseUrl = `http://localhost:8088/sse/connect?clientId=${this.clientId}`;
        console.log(`[SSE] 尝试连接:${sseUrl}`);
 
        this.es = new EventSource(sseUrl);
 
        // 1. 连接成功(标记真正的连接状态)
        this.es.onopen = () => {
            this.isConnected = true;
            console.log('[SSE] 连接成功');
        };
 
        // 2. 优化错误处理(过滤无害错误)
        this.es.onerror = (e) => {
            // 过滤:连接成功前的无消息错误(无害)
            if (!this.isConnected && e.message === undefined) {
                console.log('[SSE] 初始化阶段临时错误(无害):', e.type);
                return; // 不打印错误,避免干扰
            }
 
            // 真正的错误(连接断开/失败)
            this.isConnected = false;
            console.error('[SSE] 真正的连接错误:', {
                type: e.type,
                message: e.message || '未知错误',
                readyState: this.es.readyState // 0:连接中, 1:已连接, 2:已关闭
            });
 
            // 仅在连接关闭时重连
            if (this.es.readyState === EventSource.CLOSED) {
                console.log(`[SSE] 3秒后尝试重连...`);
                setTimeout(() => this.initSse(), 3000);
            }
        };
 
        // 3. 正常接收消息
        this.es.onmessage = (e) => {
            try {
                const cleanData = e.data.replace(/^data: /, '').trim();
                const messageObj = JSON.parse(cleanData);
                // 打印解析结果(验证)
                console.log('[SSE] 解析后的完整对象:', messageObj);
            } catch (err) {
                // 解析失败时的容错
                console.warn('[SSE] 解析失败,原始数据:', e.data);
                console.error('[SSE] 解析错误详情:', err);
            }
        };
 
        // 4. 监听自定义事件(如 notification/business)
        this.es.addEventListener('notification', (e) => {
            const data = JSON.parse(e.data);
            console.log('[SSE] 通知消息:', data);
        });
    }
 
    close() {
        if (this.es) {
            this.es.close();
            this.es = null;
            this.isConnected = false;
        }
    }
 
    // 手动推送(修复后的 POST 版本)
    async triggerPush(message) {
        if (!this.isConnected) {
            console.warn('[SSE] 未连接,无法推送');
            return null;
        }
        try {
            const response = await fetch(`${this.serverUrl || 'http://localhost:8088'}/sse/push`, {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ clientId: this.clientId, message })
            });
            const result = await response.text();
            console.log('[SSE] 手动推送结果:', result);
            return result;
        } catch (err) {
            console.error('[SSE] 手动推送失败:', err);
            return null;
        }
    }
}
 
module.exports = SseClient;

2、调用:

// clientId 要唯一,否则服务端推送消息时会有影响 
new SseClient("自定义的clientId");

到此这篇关于Springboot使用SSE推送消息到客户端的实现的文章就介绍到这了,更多相关Springboot SSE推送内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • springboot+dubbo+validation 进行rpc参数校验的实现方法

    springboot+dubbo+validation 进行rpc参数校验的实现方法

    这篇文章主要介绍了springboot+dubbo+validation 进行rpc参数校验的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-09-09
  • 四步轻松搞定java web每天定时执行任务

    四步轻松搞定java web每天定时执行任务

    本篇文章主要介绍了四步轻松搞定java web每天定时执行任务,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-01-01
  • SpringBoot中的application.properties无法加载问题定位技巧

    SpringBoot中的application.properties无法加载问题定位技巧

    这篇文章主要介绍了SpringBoot中的application.properties无法加载问题定位技巧,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-05-05
  • SpringBoot Cache 二级缓存的使用

    SpringBoot Cache 二级缓存的使用

    本文主要介绍了SpringBoot Cache 二级缓存的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-06-06
  • IDEA中设置Tab健为4个空格的方法

    IDEA中设置Tab健为4个空格的方法

    这篇文章给大家介绍了代码缩进用空格还是Tab?(IDEA中设置Tab健为4个空格)的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2021-03-03
  • Mybatis的动态Sql组合模式详情

    Mybatis的动态Sql组合模式详情

    这篇文章主要介绍了Mybatis的动态Sql组合模式详情,这篇文章从组合模式的角度分析了Mybatis动态sql的部分,SqlNode是组合模式的Component接口,更多相关内容需要的小伙伴可以参考一下
    2022-08-08
  • 基于SpringBoot实现多文件批量下载并打包为ZIP压缩包的完整解决方案

    基于SpringBoot实现多文件批量下载并打包为ZIP压缩包的完整解决方案

    在日常的 Web 开发中,文件下载是非常常见的功能需求,而多文件批量下载并打包为 ZIP 压缩包 更是高频场景(比如批量下载合同、图片、报表等),本文将基于 SpringBoot 框架,手把手教你实现这一功能,从核心思路到完整代码,让你快速掌握,需要的朋友可以参考下
    2026-02-02
  • java实现批量导入Excel表格数据到数据库

    java实现批量导入Excel表格数据到数据库

    这篇文章主要为大家详细介绍了java实现批量导入Excel表格数据到数据库,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-08-08
  • Java中常用的6种排序算法详细分解

    Java中常用的6种排序算法详细分解

    这篇文章主要介绍了Java中常用的6种排序算法详细分解,着重说明每个算法的计算过程分解,是探究实现原理级的文章,对于深入理解这些算法有很大帮助,需要的朋友可以参考下
    2014-07-07
  • 详解SpringBoot如何自定义自己的Starter组件

    详解SpringBoot如何自定义自己的Starter组件

    这篇文章主要为大家详细介绍了在SpringBoot中如何自定义自己的Starter组件,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2024-03-03

最新评论