SpringBoot实现SSE(Server-Sent Events)的完整指南

 更新时间:2025年09月24日 09:40:28   作者:堕落年代  
本文展示了如何在Spring Boot应用中实现SSE,通过简单的步骤和代码示例,你可以轻松地在你的Web应用中添加实时数据推送功能,需要的朋友可以参考下

引言

在 Spring Boot 中实现 SSE (Server-Sent Events) 非常简单,SSE 是一种服务器向客户端推送事件的技术。以下是完整的实现步骤:

基础实现

1. 添加依赖

确保 pom.xml 中包含 Spring Web 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2. 创建 SSE 控制器

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class SseController {

    // 用于异步发送事件的线程池
    private final ExecutorService executor = Executors.newCachedThreadPool();

    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter handleSse() {
        SseEmitter emitter = new SseEmitter(60_000L); // 设置超时时间(毫秒)
        
        // 在单独的线程中发送事件
        executor.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // 构建事件
                    SseEmitter.SseEventBuilder event = SseEmitter.event()
                            .id(String.valueOf(i)) // 事件ID
                            .name("message")      // 事件名称
                            .data("Event #" + i);  // 事件数据
                    
                    // 发送事件
                    emitter.send(event);
                    
                    // 模拟延迟
                    Thread.sleep(1000);
                }
                
                // 完成发送
                emitter.complete();
            } catch (IOException | InterruptedException e) {
                // 发生错误时关闭连接
                emitter.completeWithError(e);
            }
        });
        
        // 处理完成和超时事件
        emitter.onCompletion(() -> System.out.println("SSE completed"));
        emitter.onTimeout(() -> {
            System.out.println("SSE timeout");
            emitter.complete();
        });
        
        return emitter;
    }
}

3. 前端监听 SSE

<!DOCTYPE html>
<html>
<head>
    <title>SSE Demo</title>
</head>
<body>
    <div id="events"></div>

    <script>
        const eventSource = new EventSource('/sse');
        
        // 监听消息事件
        eventSource.onmessage = function(event) {
            const data = event.data;
            const element = document.createElement('p');
            element.textContent = 'Received: ' + data;
            document.getElementById('events').appendChild(element);
        };
        
        // 监听自定义事件
        eventSource.addEventListener('message', function(event) {
            console.log('Custom event:', event.data);
        });
        
        // 错误处理
        eventSource.onerror = function(error) {
            console.error('EventSource error:', error);
            eventSource.close();
        };
    </script>
</body>
</html>

添加鉴权支持

1. 基于 Token 的鉴权

@GetMapping(path = "/secure-sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSecureSse(@RequestHeader("Authorization") String token) {
    // 验证Token
    if (!isValidToken(token)) {
        throw new ResponseStatusException(HttpStatus.UNAUTHORIZED, "Invalid token");
    }
    
    SseEmitter emitter = new SseEmitter();
    
    // 获取用户信息
    User user = getUserFromToken(token);
    
    executor.execute(() -> {
        try {
            // 发送个性化事件
            emitter.send(SseEmitter.event()
                .data("Welcome, " + user.getName())
                .name("greeting"));
                
            // 继续发送其他事件...
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
    });
    
    return emitter;
}

private boolean isValidToken(String token) {
    // 实现Token验证逻辑
    return token != null && token.startsWith("Bearer ");
}

private User getUserFromToken(String token) {
    // 从Token中提取用户信息
    return new User("John Doe"); // 示例
}

2. 前端发送鉴权信息

const token = "Bearer your_jwt_token_here";
const eventSource = new EventSource('/secure-sse', {
    headers: {
        Authorization: token
    }
});

高级功能实现

1. 广播事件给多个客户端

import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@Service
public class SseService {
    private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
    
    public SseEmitter subscribe() {
        SseEmitter emitter = new SseEmitter(60_000L);
        emitters.add(emitter);
        
        emitter.onCompletion(() -> emitters.remove(emitter));
        emitter.onTimeout(() -> emitters.remove(emitter));
        
        return emitter;
    }
    
    public void broadcast(String eventName, Object data) {
        for (SseEmitter emitter : emitters) {
            try {
                emitter.send(SseEmitter.event()
                    .name(eventName)
                    .data(data));
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        }
    }
}

2. 在控制器中使用广播服务

@RestController
public class SseController {
    
    private final SseService sseService;
    
    public SseController(SseService sseService) {
        this.sseService = sseService;
    }
    
    @GetMapping("/subscribe")
    public SseEmitter subscribe() {
        return sseService.subscribe();
    }
    
    @PostMapping("/broadcast")
    public ResponseEntity<String> broadcastMessage(@RequestBody String message) {
        sseService.broadcast("message", message);
        return ResponseEntity.ok("Message broadcasted");
    }
}

3. 发送 JSON 数据

emitter.send(SseEmitter.event()
    .name("userUpdate")
    .data(new User("Alice", "alice@example.com"), MediaType.APPLICATION_JSON));

4. 重连机制

let eventSource;

function connectSSE() {
    eventSource = new EventSource('/sse');
    
    eventSource.onmessage = event => {
        console.log('Received:', event.data);
    };
    
    eventSource.onerror = () => {
        console.log('Connection lost. Reconnecting...');
        eventSource.close();
        setTimeout(connectSSE, 3000); // 3秒后重连
    };
}

connectSSE(); // 初始连接

生产环境最佳实践

1. 配置超时和心跳

@Bean
public SseEmitter createSseEmitter() {
    SseEmitter emitter = new SseEmitter(120_000L); // 2分钟超时
    
    // 心跳机制
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
        try {
            emitter.send(SseEmitter.event()
                .name("heartbeat")
                .data("ping"));
        } catch (IOException e) {
            scheduler.shutdown();
        }
    }, 0, 30, TimeUnit.SECONDS); // 每30秒发送心跳
    
    return emitter;
}

2. 异常处理

@RestControllerAdvice
public class SseExceptionHandler {

    @ExceptionHandler(SseException.class)
    public ResponseEntity<String> handleSseException(SseException ex) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("SSE Error: " + ex.getMessage());
    }
}

3. CORS 配置

@Configuration
public class WebConfig implements WebMvcConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/sse/**")
            .allowedOrigins("https://your-frontend.com")
            .allowedMethods("GET")
            .allowCredentials(true);
    }
}

4. 性能优化

@Configuration
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("SSE-Executor-");
        executor.initialize();
        return executor;
    }
}

完整示例:实时股票报价

后端控制器

@RestController
public class StockController {

    private final SseService sseService;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public StockController(SseService sseService) {
        this.sseService = sseService;
        startStockUpdates();
    }

    @GetMapping("/stocks")
    public SseEmitter getStockUpdates() {
        return sseService.subscribe();
    }

    private void startStockUpdates() {
        scheduler.scheduleAtFixedRate(() -> {
            Map<String, Double> stocks = Map.of(
                "AAPL", 150 + Math.random() * 10,
                "MSFT", 250 + Math.random() * 15,
                "GOOGL", 2800 + Math.random() * 50
            );
            sseService.broadcast("stockUpdate", stocks);
        }, 0, 2, TimeUnit.SECONDS);
    }
}

前端实现

<div id="stock-prices"></div>

<script>
const eventSource = new EventSource('/stocks');

eventSource.addEventListener('stockUpdate', event => {
    const stocks = JSON.parse(event.data);
    let html = '<h2>Stock Prices</h2><ul>';
    
    for (const [symbol, price] of Object.entries(stocks)) {
        html += `<li>${symbol}: $${price.toFixed(2)}</li>`;
    }
    
    html += '</ul>';
    document.getElementById('stock-prices').innerHTML = html;
});
</script>

部署注意事项

1. 负载均衡配置

# Nginx 配置
location /sse {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_buffering off;
}

2. Spring Boot 配置

# application.properties
server.servlet.context-path=/api
spring.mvc.async.request-timeout=120000 # 2分钟超时

3. 监控端点

@Endpoint(id = "sse")
public class SseEndpoint {

    private final SseService sseService;

    public SseEndpoint(SseService sseService) {
        this.sseService = sseService;
    }

    @ReadOperation
    public Map<String, Object> sseMetrics() {
        return Map.of(
            "activeConnections", sseService.getActiveConnections(),
            "lastBroadcast", sseService.getLastBroadcastTime()
        );
    }
}

最佳实践总结

  1. 使用专用服务类:封装 SSE 逻辑,提高代码复用性
  2. 实现心跳机制:防止连接超时断开
  3. 添加鉴权支持:保护敏感数据
  4. 优雅处理错误:实现异常处理和重连机制
  5. 监控连接状态:使用 Actuator 端点监控 SSE 连接
  6. 优化线程池:合理配置异步处理线程
  7. 前端重连逻辑:自动恢复断开连接

通过以上实现,您可以在 Spring Boot 应用中轻松创建 SSE 端点,实现服务器向客户端的实时事件推送,同时满足鉴权需求。

以上就是SpringBoot实现SSE(Server-Sent Events)完整指南的详细内容,更多关于SpringBoot实现SSE(Server-Sent Events)的资料请关注脚本之家其它相关文章!

相关文章

  • Spring中的Sentinel规则持久化解析

    Spring中的Sentinel规则持久化解析

    这篇文章主要介绍了Spring中的Sentinel规则持久化解析,具体内容包括,Sentinel规则推送三种模式介绍,原始模式,拉模式,推模式,并对基于Nacos配置中心控制台实现推送进行详尽介绍,需要的朋友可以参考下
    2023-09-09
  • Spring中缓存注解@Cache的使用详解

    Spring中缓存注解@Cache的使用详解

    这篇文章主要介绍了Spring中缓存注解@Cache的使用详解,使用注解对数据进行缓存功能的框架,只需要简单地加一个注解,就能实现缓存功能,大大简化我们在业务中操作缓存的代码,需要的朋友可以参考下
    2023-07-07
  • Liquibase结合SpringBoot使用实现数据库管理功能

    Liquibase结合SpringBoot使用实现数据库管理功能

    Liquibase 是一个强大的数据库管理工具,它帮助你通过自动化管理数据库的变更、版本控制、和回滚,简化了开发中的数据库迁移工作,这篇文章主要介绍了Liquibase结合SpringBoot使用实现数据库管理,需要的朋友可以参考下
    2024-12-12
  • Java 并发编程学习笔记之核心理论基础

    Java 并发编程学习笔记之核心理论基础

    编写优质的并发代码是一件难度极高的事情。Java语言从第一版本开始内置了对多线程的支持,这一点在当年是非常了不起的,但是当我们对并发编程有了更深刻的认识和更多的实践后,实现并发编程就有了更多的方案和更好的选择。本文是对并发编程的核心理论做了下小结
    2016-05-05
  • 详谈Java静态动态的问题

    详谈Java静态动态的问题

    下面小编就为大家带来一篇详谈Java静态动态的问题。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-09-09
  • 深入探究HashMap二次Hash原因

    深入探究HashMap二次Hash原因

    在java开发中,HashMap是最常用、最常见的集合容器类之一,文中通过示例代码介绍HashMap为啥要二次Hash,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-01-01
  • 详解SpringBoot初始教程之Tomcat、Https配置以及Jetty优化

    详解SpringBoot初始教程之Tomcat、Https配置以及Jetty优化

    本篇文章主要介绍了详解SpringBoot初始教程之Tomcat、Https配置以及Jetty优化,具有一定的参考价值,有兴趣的可以了解一下
    2017-09-09
  • Spring Native 基础环境搭建过程

    Spring Native 基础环境搭建过程

    Spring Native可以通过GraalVM将Spring应用程序编译成原生镜像,提供了一种新的方式来部署Spring应用,本文介绍Spring Native基础环境搭建,感兴趣的朋友跟随小编一起看看吧
    2024-02-02
  • SpringMVC中重定向model值的获取方式

    SpringMVC中重定向model值的获取方式

    这篇文章主要介绍了SpringMVC中重定向model值的获取方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • Sentinel Dashboard限流规则保存方式

    Sentinel Dashboard限流规则保存方式

    这篇文章主要介绍了Sentinel Dashboard限流规则保存方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06

最新评论