springboot -sse -flux 服务器推送消息的方法

 更新时间:2023年11月24日 14:52:07   作者:浮生若梦l  
这篇文章主要介绍了springboot -sse -flux 服务器推送消息的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧

先说BUG处理,遇到提示异步问题 Async support must be enabled on a servlet and for all filters involved in async request processing. This is done in Java code using the Servlet API or by adding "<async-supported>true</async-supported>" to servlet and filter declarations in web.xml.

springboot在@WebFilter注解处,加入urlPatterns = { "/*" },asyncSupported = true

springmvc在web.xml处理

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         version="3.0">
<filter-mapping>
  <filter-name>shiroFilter</filter-name>
  <url-pattern>/*</url-pattern>
  <dispatcher>REQUEST</dispatcher>
  <dispatcher>ASYNC</dispatcher>
</filter-mapping>

demo1,服务器间隔一定时间推送内容    

接口方法

@GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
	public Flux<ServerSentEvent<String>> sse(@PathVariable String userId) {
        // 每两秒推送一次
        return Flux.interval(Duration.ofSeconds(2)).map(seq->
            Tuples.of(seq, LocalDateTime.now())).log()//序号和时间
                .map(data-> ServerSentEvent.<String>builder().id(userId).data(data.getT1().toString()).build());//推送内容
    }

2.前端代码

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8"/>
    <title>服务器推送事件</title>
</head>
<body>
<div>    
    <div id="data"></div>    
    <div id="result"></div><br/>
</div>
<script th:inline="javascript" >
//服务器推送事件
if (typeof (EventSource) !== "undefined") { 
    var source1 = new EventSource("http://localhost:9000/api/admin/test/sse/1");
    //当抓取到消息时
    source1.onmessage = function (evt) {
        document.getElementById("data").innerHTML = document.getElementById("data").innerHTML+"股票行情:" + evt.data;
    };
} else {
    //注意:ie浏览器不支持
    document.getElementById("result").innerHTML = "抱歉,你的浏览器不支持 server-sent 事件...";  
    var xhr;
    var xhr2;
    if (window.XMLHttpRequest){
        //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
        xhr=new XMLHttpRequest();
        xhr2=new XMLHttpRequest();
    }else{
        //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
        xhr=new ActiveXObject("Microsoft.XMLHTTP");
        xhr2=new ActiveXObject("Microsoft.XMLHTTP");
    }
    console.log(xhr);
    console.log(xhr2);
    xhr.open('GET', '/sse/countDown');
    xhr.send(null);//发送请求
    xhr.onreadystatechange = function() {
        console.log("s响应状态:" + xhr.readyState);
        //2是空响应,3是响应一部分,4是响应完成
        if (xhr.readyState > 2) {
            //这儿可以使用response(对应json)与responseText(对应text)
            var newData = xhr.response.substr(xhr.seenBytes);
            newData = newData.replace(/\n/g, "#");
            newData = newData.substring(0, newData.length - 1);
            var data = newData.split("#");
            console.log("获取到的数据:" + data);
            document.getElementById("result").innerHTML = data;
            //长度重新赋值,下次截取时需要使用
            xhr.seenBytes = xhr.response.length;
        }
    }
    xhr2.open('GET', '/sse/retrieve');
    xhr2.send(null);//发送请求
    xhr2.onreadystatechange = function() {
        console.log("s响应状态:" + xhr2.readyState);
        //0: 请求未初始化,2 请求已接收,3 请求处理中,4  请求已完成,且响应已就绪
        if (xhr2.readyState > 2) {
            //这儿可以使用response(对应json)与responseText(对应text)
            var newData1 = xhr2.response.substr(xhr2.seenBytes);
            newData1 = newData1.replace(/\n/g, "#");
            newData1 = newData1.substring(0, newData1.length - 1);
            var data1 = newData1.split("#");
            console.log("获取到的数据:" + data1);
            document.getElementById("data").innerHTML = data1;
            //长度重新赋值,下次截取时需要使用
            xhr2.seenBytes = xhr2.response.length;
        }
    }
}
</script>
</body>
</html>

demo2 订阅服务器消息,服务器send推送消息完成后,关闭sse.close

1.接口方法以及工具类

@GetMapping(path = "/sse/sub",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
public SseEmitter subscribe(@RequestParam String questionId,HttpServletResponse response) {
		// 简单异步发消息 ====
        //questionId 订阅id,id对应了sse对象
        new Thread(() -> {
            try {
                Thread.sleep(1000);
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(500);
                    SSEUtils.pubMsg(questionId, questionId + " - kingtao come " + i);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 消息发送完关闭订阅
                SSEUtils.closeSub(questionId);
            }
        }).start();
        // =================
        return SSEUtils.addSub(questionId);
    }

工具类

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SSEUtils {
    // timeout
    private static Long DEFAULT_TIME_OUT = 2*60*1000L;
    // 订阅表
    private static Map<String, SseEmitter> subscribeMap = new ConcurrentHashMap<>();
    /** 添加订阅 */
    public static SseEmitter addSub(String questionId) {
        if (null == questionId || "".equals(questionId)) {
            return null;
        }
        SseEmitter emitter = subscribeMap.get(questionId);
        if (null == emitter) {
            emitter = new SseEmitter(DEFAULT_TIME_OUT);
            subscribeMap.put(questionId, emitter);
        }
        return emitter;
    }
    /** 发消息 */
    public static void pubMsg(String questionId, String msg) {
        SseEmitter emitter = subscribeMap.get(questionId);
        if (null != emitter) {
            try {
                // 更规范的消息结构看源码
                emitter.send(SseEmitter.event().data(msg));
            } catch (Exception e) {
                // e.printStackTrace();
            }
        }
    }
    /**
     * 关闭订阅 
     * @param questionId
     */
    public static void closeSub(String questionId) {
        SseEmitter emitter = subscribeMap.get(questionId);
        if (null != emitter) {
            try {
                emitter.complete();
                subscribeMap.remove(questionId);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

2.前端代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>sse</title>
</head>
<body>
<div>
    <label>问题id</label>
    <input type="text" id="questionId">
    <button onclick="subscribe()">订阅</button>
    <hr>
    <label>F12-console控制台查看消息</label>
</div>
<script>
    function subscribe() {
        let questionId = document.getElementById('questionId').value;
        let url = 'http://localhost:9000/api/admin/test/sse/sub?questionId=' + questionId;
        let eventSource = new EventSource(url);
        eventSource.onmessage = function (e) {
            console.log(e.data);
        };
        eventSource.onopen = function (e) {
			 console.log(e,1);
            // todo
        };
        eventSource.onerror = function (e) {
            // todo
						 console.log(e,2);
            eventSource.close()
        };
    }
</script>
</body>
</html>

到此这篇关于springboot -sse -flux 服务器推送消息的文章就介绍到这了,更多相关springboot sse flux 服务器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • MyBatis关闭一级缓存的两种方式(分注解和xml两种方式)

    MyBatis关闭一级缓存的两种方式(分注解和xml两种方式)

    这篇文章主要介绍了MyBatis关闭一级缓存的两种方式(分注解和xml两种方式),mybatis默认开启一级缓存,执行2次相同sql,但是第一次查询sql结果会加工处理这个时候需要关闭一级缓存,本文给大家详细讲解需要的朋友可以参考下
    2022-11-11
  • jpa介绍以及在spring boot中使用详解

    jpa介绍以及在spring boot中使用详解

    最近在项目中使用了一下jpa,发现还是挺好用的。这里就来讲一下jpa以及在spring boot中的使用。在这里我们先来了解一下jpa,希望能给你带来帮助
    2021-08-08
  • MyBatis XML方式的基本用法之多表查询功能的示例代码

    MyBatis XML方式的基本用法之多表查询功能的示例代码

    这篇文章主要介绍了MyBatis XML方式的基本用法之多表查询功能的示例代码,本文通过示例文字相结合的形式给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-07-07
  • springboot中RestTemplate配置HttpClient连接池详解

    springboot中RestTemplate配置HttpClient连接池详解

    这篇文章主要介绍了springboot中RestTemplate配置HttpClient连接池详解,这些Http连接工具,使用起来都比较复杂,如果项目中使用的是Spring框架,可以使用Spring自带的RestTemplate来进行Http连接请求,需要的朋友可以参考下
    2023-11-11
  • Java反射之类的实例对象的三种表示方式总结

    Java反射之类的实例对象的三种表示方式总结

    下面小编就为大家带来一篇Java反射之类的实例对象的三种表示方式总结。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-10-10
  • mybatis-plus插入一条数据,获取插入数据自动生成的主键问题

    mybatis-plus插入一条数据,获取插入数据自动生成的主键问题

    这篇文章主要介绍了mybatis-plus插入一条数据,获取插入数据自动生成的主键问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • java中常见的死锁以及解决方法代码

    java中常见的死锁以及解决方法代码

    这篇文章主要介绍了java中常见的死锁以及解决方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • MybatisPlus多表连接查询的具体实现

    MybatisPlus多表连接查询的具体实现

    MyBatis Plus是一款针对MyBatis框架的增强工具, 它提供了很多方便的方法来实现多表联查,本文主要介绍了MybatisPlus多表连接查询的具体实现,具有一定的参考价值,感兴趣的可以了解一下
    2023-10-10
  • SpringCloud OpenFeign Post请求400错误解决方案

    SpringCloud OpenFeign Post请求400错误解决方案

    这篇文章主要介绍了SpringCloud OpenFeign Post请求400错误解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-09-09
  • java哈夫曼树实例代码

    java哈夫曼树实例代码

    这篇文章主要为大家介绍了java哈夫曼树实例代码,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-08-08

最新评论