SpringBoot实现SSE(Server-Sent Events)的完整指南

 更新时间:2025年09月24日 09:40:28   作者:堕落年代  
本文展示了如何在Spring Boot应用中实现SSE,通过简单的步骤和代码示例,你可以轻松地在你的Web应用中添加实时数据推送功能,需要的朋友可以参考下

引言

在 Spring Boot 中实现 SSE (Server-Sent Events) 非常简单,SSE 是一种服务器向客户端推送事件的技术。以下是完整的实现步骤:

基础实现

1. 添加依赖

确保 pom.xml 中包含 Spring Web 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2. 创建 SSE 控制器

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class SseController {

    // 用于异步发送事件的线程池
    private final ExecutorService executor = Executors.newCachedThreadPool();

    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter handleSse() {
        SseEmitter emitter = new SseEmitter(60_000L); // 设置超时时间(毫秒)
        
        // 在单独的线程中发送事件
        executor.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // 构建事件
                    SseEmitter.SseEventBuilder event = SseEmitter.event()
                            .id(String.valueOf(i)) // 事件ID
                            .name("message")      // 事件名称
                            .data("Event #" + i);  // 事件数据
                    
                    // 发送事件
                    emitter.send(event);
                    
                    // 模拟延迟
                    Thread.sleep(1000);
                }
                
                // 完成发送
                emitter.complete();
            } catch (IOException | InterruptedException e) {
                // 发生错误时关闭连接
                emitter.completeWithError(e);
            }
        });
        
        // 处理完成和超时事件
        emitter.onCompletion(() -> System.out.println("SSE completed"));
        emitter.onTimeout(() -> {
            System.out.println("SSE timeout");
            emitter.complete();
        });
        
        return emitter;
    }
}

3. 前端监听 SSE

<!DOCTYPE html>
<html>
<head>
    <title>SSE Demo</title>
</head>
<body>
    <div id="events"></div>

    <script>
        const eventSource = new EventSource('/sse');
        
        // 监听消息事件
        eventSource.onmessage = function(event) {
            const data = event.data;
            const element = document.createElement('p');
            element.textContent = 'Received: ' + data;
            document.getElementById('events').appendChild(element);
        };
        
        // 监听自定义事件
        eventSource.addEventListener('message', function(event) {
            console.log('Custom event:', event.data);
        });
        
        // 错误处理
        eventSource.onerror = function(error) {
            console.error('EventSource error:', error);
            eventSource.close();
        };
    </script>
</body>
</html>

添加鉴权支持

1. 基于 Token 的鉴权

@GetMapping(path = "/secure-sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSecureSse(@RequestHeader("Authorization") String token) {
    // 验证Token
    if (!isValidToken(token)) {
        throw new ResponseStatusException(HttpStatus.UNAUTHORIZED, "Invalid token");
    }
    
    SseEmitter emitter = new SseEmitter();
    
    // 获取用户信息
    User user = getUserFromToken(token);
    
    executor.execute(() -> {
        try {
            // 发送个性化事件
            emitter.send(SseEmitter.event()
                .data("Welcome, " + user.getName())
                .name("greeting"));
                
            // 继续发送其他事件...
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
    });
    
    return emitter;
}

private boolean isValidToken(String token) {
    // 实现Token验证逻辑
    return token != null && token.startsWith("Bearer ");
}

private User getUserFromToken(String token) {
    // 从Token中提取用户信息
    return new User("John Doe"); // 示例
}

2. 前端发送鉴权信息

const token = "Bearer your_jwt_token_here";
const eventSource = new EventSource('/secure-sse', {
    headers: {
        Authorization: token
    }
});

高级功能实现

1. 广播事件给多个客户端

import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@Service
public class SseService {
    private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
    
    public SseEmitter subscribe() {
        SseEmitter emitter = new SseEmitter(60_000L);
        emitters.add(emitter);
        
        emitter.onCompletion(() -> emitters.remove(emitter));
        emitter.onTimeout(() -> emitters.remove(emitter));
        
        return emitter;
    }
    
    public void broadcast(String eventName, Object data) {
        for (SseEmitter emitter : emitters) {
            try {
                emitter.send(SseEmitter.event()
                    .name(eventName)
                    .data(data));
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        }
    }
}

2. 在控制器中使用广播服务

@RestController
public class SseController {
    
    private final SseService sseService;
    
    public SseController(SseService sseService) {
        this.sseService = sseService;
    }
    
    @GetMapping("/subscribe")
    public SseEmitter subscribe() {
        return sseService.subscribe();
    }
    
    @PostMapping("/broadcast")
    public ResponseEntity<String> broadcastMessage(@RequestBody String message) {
        sseService.broadcast("message", message);
        return ResponseEntity.ok("Message broadcasted");
    }
}

3. 发送 JSON 数据

emitter.send(SseEmitter.event()
    .name("userUpdate")
    .data(new User("Alice", "alice@example.com"), MediaType.APPLICATION_JSON));

4. 重连机制

let eventSource;

function connectSSE() {
    eventSource = new EventSource('/sse');
    
    eventSource.onmessage = event => {
        console.log('Received:', event.data);
    };
    
    eventSource.onerror = () => {
        console.log('Connection lost. Reconnecting...');
        eventSource.close();
        setTimeout(connectSSE, 3000); // 3秒后重连
    };
}

connectSSE(); // 初始连接

生产环境最佳实践

1. 配置超时和心跳

@Bean
public SseEmitter createSseEmitter() {
    SseEmitter emitter = new SseEmitter(120_000L); // 2分钟超时
    
    // 心跳机制
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
        try {
            emitter.send(SseEmitter.event()
                .name("heartbeat")
                .data("ping"));
        } catch (IOException e) {
            scheduler.shutdown();
        }
    }, 0, 30, TimeUnit.SECONDS); // 每30秒发送心跳
    
    return emitter;
}

2. 异常处理

@RestControllerAdvice
public class SseExceptionHandler {

    @ExceptionHandler(SseException.class)
    public ResponseEntity<String> handleSseException(SseException ex) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("SSE Error: " + ex.getMessage());
    }
}

3. CORS 配置

@Configuration
public class WebConfig implements WebMvcConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/sse/**")
            .allowedOrigins("https://your-frontend.com")
            .allowedMethods("GET")
            .allowCredentials(true);
    }
}

4. 性能优化

@Configuration
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("SSE-Executor-");
        executor.initialize();
        return executor;
    }
}

完整示例:实时股票报价

后端控制器

@RestController
public class StockController {

    private final SseService sseService;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public StockController(SseService sseService) {
        this.sseService = sseService;
        startStockUpdates();
    }

    @GetMapping("/stocks")
    public SseEmitter getStockUpdates() {
        return sseService.subscribe();
    }

    private void startStockUpdates() {
        scheduler.scheduleAtFixedRate(() -> {
            Map<String, Double> stocks = Map.of(
                "AAPL", 150 + Math.random() * 10,
                "MSFT", 250 + Math.random() * 15,
                "GOOGL", 2800 + Math.random() * 50
            );
            sseService.broadcast("stockUpdate", stocks);
        }, 0, 2, TimeUnit.SECONDS);
    }
}

前端实现

<div id="stock-prices"></div>

<script>
const eventSource = new EventSource('/stocks');

eventSource.addEventListener('stockUpdate', event => {
    const stocks = JSON.parse(event.data);
    let html = '<h2>Stock Prices</h2><ul>';
    
    for (const [symbol, price] of Object.entries(stocks)) {
        html += `<li>${symbol}: $${price.toFixed(2)}</li>`;
    }
    
    html += '</ul>';
    document.getElementById('stock-prices').innerHTML = html;
});
</script>

部署注意事项

1. 负载均衡配置

# Nginx 配置
location /sse {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_buffering off;
}

2. Spring Boot 配置

# application.properties
server.servlet.context-path=/api
spring.mvc.async.request-timeout=120000 # 2分钟超时

3. 监控端点

@Endpoint(id = "sse")
public class SseEndpoint {

    private final SseService sseService;

    public SseEndpoint(SseService sseService) {
        this.sseService = sseService;
    }

    @ReadOperation
    public Map<String, Object> sseMetrics() {
        return Map.of(
            "activeConnections", sseService.getActiveConnections(),
            "lastBroadcast", sseService.getLastBroadcastTime()
        );
    }
}

最佳实践总结

  1. 使用专用服务类:封装 SSE 逻辑,提高代码复用性
  2. 实现心跳机制:防止连接超时断开
  3. 添加鉴权支持:保护敏感数据
  4. 优雅处理错误:实现异常处理和重连机制
  5. 监控连接状态:使用 Actuator 端点监控 SSE 连接
  6. 优化线程池:合理配置异步处理线程
  7. 前端重连逻辑:自动恢复断开连接

通过以上实现,您可以在 Spring Boot 应用中轻松创建 SSE 端点,实现服务器向客户端的实时事件推送,同时满足鉴权需求。

以上就是SpringBoot实现SSE(Server-Sent Events)完整指南的详细内容,更多关于SpringBoot实现SSE(Server-Sent Events)的资料请关注脚本之家其它相关文章!

相关文章

  • Springboot启动原理详细讲解

    Springboot启动原理详细讲解

    这篇文章主要介绍了SpringBoot启动原理的分析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • java的各种类型转换全部汇总(推荐)

    java的各种类型转换全部汇总(推荐)

    下面小编就为大家带来一篇java的各种类型转换全部汇总(推荐)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-05-05
  • 借助Maven搭建Hadoop开发环境的最详细教程分享

    借助Maven搭建Hadoop开发环境的最详细教程分享

    在Maven插件的帮助下,VSCode写Java其实非常方便,所以本文就来和大家详细讲讲如何借助maven用VScode搭建Hadoop开发环境,需要的可以参考下
    2023-05-05
  • 在Java的Struts框架下进行web编程的入门教程

    在Java的Struts框架下进行web编程的入门教程

    这篇文章主要介绍了在Java的Struts框架下进行web编程的入门教程,需要的朋友可以参考下
    2015-11-11
  • 快速解决 MyBatis-Plus 中 ID 自增问题(推荐)

    快速解决 MyBatis-Plus 中 ID 自增问题(推荐)

    本文介绍了MyBatis-Plus中自动生成ID过长导致的问题及解决方法,结合示例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧
    2025-02-02
  • springboot整合kaptcha验证码的示例代码

    springboot整合kaptcha验证码的示例代码

    kaptcha是一个很有用的验证码生成工具,本篇文章主要介绍了springboot整合kaptcha验证码的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-06-06
  • Java实现调用ElasticSearch API的示例详解

    Java实现调用ElasticSearch API的示例详解

    这篇文章主要为大家详细介绍了Java调用ElasticSearch API的效果资料,文中的示例代码讲解详细,具有一定的参考价值,感兴趣的可以了解一下
    2023-03-03
  • SpringBoot使用minio进行文件管理的流程步骤

    SpringBoot使用minio进行文件管理的流程步骤

    MinIO 是一个高性能的对象存储系统,兼容 Amazon S3 API,该软件设计用于处理非结构化数据,如图片、视频、日志文件以及备份数据等,本文给大家介绍了SpringBoot使用minio进行文件管理的流程步骤,需要的朋友可以参考下
    2025-01-01
  • IntelliJ IDEA安装目录和设置目录的说明(IntelliJ IDEA快速入门)

    IntelliJ IDEA安装目录和设置目录的说明(IntelliJ IDEA快速入门)

    这篇文章主要介绍了IntelliJ IDEA安装目录和设置目录的说明(IntelliJ IDEA快速入门),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-04-04
  • SpringBoot实现健康检查的完整指南

    SpringBoot实现健康检查的完整指南

    本文介绍了Spring Boot实现健康检查的方法,包括使用Actuator进行应用健康检查,自定义健康检查项目,搭建可视化监控大屏和配置告警系统,此外,还提供了实战案例和避坑指南,帮助读者更好地理解和应用Spring Boot健康检查功能,需要的朋友可以参考下
    2025-12-12

最新评论