SpringBoot使用SSE进行实时通知前端的实现代码

 更新时间:2023年06月06日 10:17:41   作者:谁不想飞舞青春  
这篇文章主要介绍了SpringBoot使用SSE进行实时通知前端,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

说明

项目有个需求是要实时通知前端,告诉前端这个任务加载好了。然后想了2个方案,一种是用websocket进行长连接,一种是使用SSE(Sever Send Event),是HTTP协议中的一种,Content-Type为text/event-stream,能够保持长连接。
websocket是前端既能向后端发送消息,后端也能向前端发送消息。
SSE是只能后端向前端发送消息。
因为只需要后端通知,所以我这里选择了使用SSE实现。
这里先做个笔记,怕以后忘记怎么使用。

maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.project</groupId>
    <artifactId>test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>test</name>
    <description>test</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--web依赖,内嵌入tomcat,SSE依赖于该jar包,只要有该依赖就能使用SSE-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--lombok依赖,用来对象省略写set、get方法-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

SSE工具类代码

package com.etone.project.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@Slf4j
public class SseEmitterServer {
    /**
     * 当前连接数
     */
    private static AtomicInteger count = new AtomicInteger(0);
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
    public static SseEmitter connect(String userId){
        //设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常
        SseEmitter sseemitter = new SseEmitter(0L);
        //注册回调
        sseemitter.onCompletion(completionCallBack(userId));
        //这个onError在springbooot低版本没有这个方法,公司springboot1.4.2版本,没有这个方法,可以进行注释。
        sseemitter.onError(errorCallBack(userId));
        sseemitter.onTimeout(timeoutCallBack(userId));
        sseEmitterMap.put(userId,sseemitter);
        //数量+1
        count.getAndIncrement();
        log.info("create new sse connect ,current user:{}",userId);
        return sseemitter;
    }
    /**
     * 给指定用户发消息
     */
    public static void sendMessage(String userId, String message){
        if(sseEmitterMap.containsKey(userId)){
            try{
                sseEmitterMap.get(userId).send(message);
            }catch (IOException e){
                log.error("user id:{}, send message error:{}",userId,e.getMessage());
                e.printStackTrace();
            }
        }
    }
    /**
     * 想多人发送消息,组播
     */
    public static void groupSendMessage(String groupId, String message){
        if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){
            sseEmitterMap.forEach((k,v) -> {
                try{
                    if(k.startsWith(groupId)){
                        v.send(message, MediaType.APPLICATION_JSON);
                    }
                }catch (IOException e){
                    log.error("user id:{}, send message error:{}",groupId,message);
                    removeUser(k);
                }
            });
        }
    }
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k,v)->{
            try{
                v.send(message,MediaType.APPLICATION_JSON);
            }catch (IOException e){
                log.error("user id:{}, send message error:{}",k,e.getMessage());
                removeUser(k);
            }
        });
    }
    /**
     * 群发消息
     */
    public static void batchSendMessage(String message, Set<String> userIds){
        userIds.forEach(userid->sendMessage(userid,message));
    }
    //移除用户
    public static void removeUser(String userid){
        sseEmitterMap.remove(userid);
        //数量-1
        count.getAndDecrement();
        log.info("remove user id:{}",userid);
    }
    public static List<String> getIds(){
        return new ArrayList<>(sseEmitterMap.keySet());
    }
    public static int getUserCount(){
        return count.intValue();
    }
    private static Runnable completionCallBack(String userId) {
        return () -> {
            log.info("结束连接,{}",userId);
            removeUser(userId);
        };
    }
    private static Runnable timeoutCallBack(String userId){
        return ()->{
            log.info("连接超时,{}",userId);
            removeUser(userId);
        };
    }
    private static Consumer<Throwable> errorCallBack(String userId){
        return throwable -> {
            log.error("连接异常,{}",userId);
            removeUser(userId);
        };
    }
}

Controller测试代码

package com.project.test.controller;
import com.hjl.test.util.SseEmitterServer;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping(value = "/test")
public class TestController {
    //sse连接接口
    @GetMapping (value = "/sse/connect/{id}")
    public SseEmitter connect(@PathVariable String id){
        return SseEmitterServer.connect(id);
    }
    //sse向指定用户发送消息接口
    @GetMapping (value = "/sse/send/{id}")
    public Map<String,Object> send(@PathVariable String id,@RequestParam(value = "message", required = false) String message){
        Map<String,Object> returnMap = new HashMap<>();
        //向指定用户发送信息
        SseEmitterServer.sendMessage(id,message);
        returnMap.put("message","向id为"+id+"的用户发送:"+message+"成功!");
        returnMap.put("status","200");
        returnMap.put("result",null);
        return returnMap;
    }
    //sse向所有已连接用户发送消息接口
    @GetMapping (value = "/sse/batchSend")
    public Map<String,Object> batchSend(@RequestParam(value = "message", required = false) String message){
        Map<String,Object> returnMap = new HashMap<>();
        //向指定用户发送信息
        SseEmitterServer.batchSendMessage(message);
        returnMap.put("message",message+"消息发送成功!");
        returnMap.put("status","200");
        returnMap.put("result",null);
        return returnMap;
    }
    //sse关闭接口
    @GetMapping (value = "/sse/close/{id}")
    public Map<String,Object> close(@PathVariable String id){
        Map<String,Object> returnMap = new HashMap<>();
        //移除id
        SseEmitterServer.removeUser(id);
        System.out.println("当前连接用户id:"+SseEmitterServer.getIds());
        returnMap.put("message","连接关闭成功!");
        returnMap.put("status","200");
        returnMap.put("result",null);
        return returnMap;
    }
}

测试结果如下:

这里测试SSE连接,就像正常接口那样请求就行。
本地调用接口/sse/connect/1如下:
这里我连接2个用户,用来模拟向指定用户id发送信息和批量向已连接的用户发送消。

后端服务打印如下:

本地调用接口/sse/send/1如下:

用户1的结果如下,发现它收到了消息:

用户2没有收到结果,如下:

本地调用接口/sse/batchSend如下:
批量向所有已经连接的用户发送消息。

用户1结果如下,发现接收到了消息:

用户2结果如下,发现也接收到了消息:

测试结果都符合预期。
点击postman的close按钮,关闭连接:

发现前端连接虽然关闭了,但是后端实际还在连接中,根本没有移除用户的提示:

所以这里还需要自己手动写关闭接口测试。
本地调用接口/sse/close/1如下:

可以看到把用户id为1的给移除了,只剩用户2还在连接中。

这里所有测试完成,结果符合预期。

注意

将超时时间由原来的0改为默认的30秒,会报错。

测试结果如下:

这里直接出现了一个异常:org.springframework.web.context.request.async.AsyncRequestTimeoutException
甚至连接都断开了。

将springboot降为低版本如1.4.2.RELEASE。

使用postman进行测试的时候,发现它不是一直在请求中:如下:

将Springboot降为1.4.2.RELEASE

springboot的1.4.2.RELEASE版本没有onError方法,需要注释掉。

postman测试如下:
低版本测试的时候发现它有一个这个连接可以直接看到,而使用springboot版本2.x版本就发现它一直处于发送请求的状态,什么时候后端向前端发送了消息,它就显示这个。
springboot的1.4.2.RELEASE版本结果:

springboot的2.7.3版本结果:

这里先将这种情况先记录下来先,等后面有时间再研究。怎么高版本就不能向低版本那样返回这个连接信息呢?所以SpringBoot高版本使用SSE连接的时候一直处于Sending request这种情况,这种情况是正常的吗?

到此这篇关于SpringBoot使用SSE进行实时通知前端的文章就介绍到这了,更多相关SpringBoot实时通知前端内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • springboot 集成easy-captcha实现图像验证码显示和登录

    springboot 集成easy-captcha实现图像验证码显示和登录

    本文主要介绍了springboot 集成easy-captcha实现图像验证码显示和登录,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-04-04
  • Java线程池ThreadPoolExecutor的使用及其原理详细解读

    Java线程池ThreadPoolExecutor的使用及其原理详细解读

    这篇文章主要介绍了Java线程池ThreadPoolExecutor的使用及其原理详细解读,线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务,线程池线程都是后台线程,需要的朋友可以参考下
    2023-12-12
  • 一文教你如何更改IDEA已有项目的路径/名称

    一文教你如何更改IDEA已有项目的路径/名称

    由于IDEA项目路径中有中文、空格等特殊符号,影响正常使用,想要修改路径名称,怎么正确修改IDEA项目名称,使其正常运行呢?所以本文小编讲给大家详细的介绍了更改IDEA已有项目的路径/名称解决方案,需要的朋友可以参考下
    2023-11-11
  • MyBatis详细执行流程的全纪录

    MyBatis详细执行流程的全纪录

    这篇文章主要给大家介绍了关于MyBatis详细执行流程的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-04-04
  • java compareTo和compare方法比较详解

    java compareTo和compare方法比较详解

    这篇文章主要介绍了java compareTo和compare方法比较详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-09-09
  • servlet监听器的学习使用(三)

    servlet监听器的学习使用(三)

    这篇文章主要为大家详细介绍了servlet监听器学习使用的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-09-09
  • java中使用xls格式化xml的实例

    java中使用xls格式化xml的实例

    这篇文章主要介绍了java中调用xls格式化xml的实例的相关资料,需要的朋友可以参考下
    2017-07-07
  • mybatis的使用-Mapper文件各种语法介绍

    mybatis的使用-Mapper文件各种语法介绍

    这篇文章主要介绍了mybatis的使用-Mapper文件各种语法介绍,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-11-11
  • Java发送https请求并跳过ssl证书验证方法

    Java发送https请求并跳过ssl证书验证方法

    最近在负责一个对接第三方服务的事情,在对接期间因为第三方服务为https的请求,这篇文章主要给大家介绍了关于Java发送https请求并跳过ssl证书验证的相关资料,需要的朋友可以参考下
    2023-11-11
  • Java8 lambda表达式2种常用方法代码解析

    Java8 lambda表达式2种常用方法代码解析

    这篇文章主要介绍了Java8 lambda表达式2种常用方法代码解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-08-08

最新评论