SpringBoot实现SSE(Server-Sent Events)的完整指南

 更新时间:2025年09月24日 09:40:28   作者:堕落年代  
本文展示了如何在Spring Boot应用中实现SSE,通过简单的步骤和代码示例,你可以轻松地在你的Web应用中添加实时数据推送功能,需要的朋友可以参考下

引言

在 Spring Boot 中实现 SSE (Server-Sent Events) 非常简单,SSE 是一种服务器向客户端推送事件的技术。以下是完整的实现步骤:

基础实现

1. 添加依赖

确保 pom.xml 中包含 Spring Web 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2. 创建 SSE 控制器

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class SseController {

    // 用于异步发送事件的线程池
    private final ExecutorService executor = Executors.newCachedThreadPool();

    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter handleSse() {
        SseEmitter emitter = new SseEmitter(60_000L); // 设置超时时间(毫秒)
        
        // 在单独的线程中发送事件
        executor.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // 构建事件
                    SseEmitter.SseEventBuilder event = SseEmitter.event()
                            .id(String.valueOf(i)) // 事件ID
                            .name("message")      // 事件名称
                            .data("Event #" + i);  // 事件数据
                    
                    // 发送事件
                    emitter.send(event);
                    
                    // 模拟延迟
                    Thread.sleep(1000);
                }
                
                // 完成发送
                emitter.complete();
            } catch (IOException | InterruptedException e) {
                // 发生错误时关闭连接
                emitter.completeWithError(e);
            }
        });
        
        // 处理完成和超时事件
        emitter.onCompletion(() -> System.out.println("SSE completed"));
        emitter.onTimeout(() -> {
            System.out.println("SSE timeout");
            emitter.complete();
        });
        
        return emitter;
    }
}

3. 前端监听 SSE

<!DOCTYPE html>
<html>
<head>
    <title>SSE Demo</title>
</head>
<body>
    <div id="events"></div>

    <script>
        const eventSource = new EventSource('/sse');
        
        // 监听消息事件
        eventSource.onmessage = function(event) {
            const data = event.data;
            const element = document.createElement('p');
            element.textContent = 'Received: ' + data;
            document.getElementById('events').appendChild(element);
        };
        
        // 监听自定义事件
        eventSource.addEventListener('message', function(event) {
            console.log('Custom event:', event.data);
        });
        
        // 错误处理
        eventSource.onerror = function(error) {
            console.error('EventSource error:', error);
            eventSource.close();
        };
    </script>
</body>
</html>

添加鉴权支持

1. 基于 Token 的鉴权

@GetMapping(path = "/secure-sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSecureSse(@RequestHeader("Authorization") String token) {
    // 验证Token
    if (!isValidToken(token)) {
        throw new ResponseStatusException(HttpStatus.UNAUTHORIZED, "Invalid token");
    }
    
    SseEmitter emitter = new SseEmitter();
    
    // 获取用户信息
    User user = getUserFromToken(token);
    
    executor.execute(() -> {
        try {
            // 发送个性化事件
            emitter.send(SseEmitter.event()
                .data("Welcome, " + user.getName())
                .name("greeting"));
                
            // 继续发送其他事件...
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
    });
    
    return emitter;
}

private boolean isValidToken(String token) {
    // 实现Token验证逻辑
    return token != null && token.startsWith("Bearer ");
}

private User getUserFromToken(String token) {
    // 从Token中提取用户信息
    return new User("John Doe"); // 示例
}

2. 前端发送鉴权信息

const token = "Bearer your_jwt_token_here";
const eventSource = new EventSource('/secure-sse', {
    headers: {
        Authorization: token
    }
});

高级功能实现

1. 广播事件给多个客户端

import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@Service
public class SseService {
    private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
    
    public SseEmitter subscribe() {
        SseEmitter emitter = new SseEmitter(60_000L);
        emitters.add(emitter);
        
        emitter.onCompletion(() -> emitters.remove(emitter));
        emitter.onTimeout(() -> emitters.remove(emitter));
        
        return emitter;
    }
    
    public void broadcast(String eventName, Object data) {
        for (SseEmitter emitter : emitters) {
            try {
                emitter.send(SseEmitter.event()
                    .name(eventName)
                    .data(data));
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        }
    }
}

2. 在控制器中使用广播服务

@RestController
public class SseController {
    
    private final SseService sseService;
    
    public SseController(SseService sseService) {
        this.sseService = sseService;
    }
    
    @GetMapping("/subscribe")
    public SseEmitter subscribe() {
        return sseService.subscribe();
    }
    
    @PostMapping("/broadcast")
    public ResponseEntity<String> broadcastMessage(@RequestBody String message) {
        sseService.broadcast("message", message);
        return ResponseEntity.ok("Message broadcasted");
    }
}

3. 发送 JSON 数据

emitter.send(SseEmitter.event()
    .name("userUpdate")
    .data(new User("Alice", "alice@example.com"), MediaType.APPLICATION_JSON));

4. 重连机制

let eventSource;

function connectSSE() {
    eventSource = new EventSource('/sse');
    
    eventSource.onmessage = event => {
        console.log('Received:', event.data);
    };
    
    eventSource.onerror = () => {
        console.log('Connection lost. Reconnecting...');
        eventSource.close();
        setTimeout(connectSSE, 3000); // 3秒后重连
    };
}

connectSSE(); // 初始连接

生产环境最佳实践

1. 配置超时和心跳

@Bean
public SseEmitter createSseEmitter() {
    SseEmitter emitter = new SseEmitter(120_000L); // 2分钟超时
    
    // 心跳机制
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
        try {
            emitter.send(SseEmitter.event()
                .name("heartbeat")
                .data("ping"));
        } catch (IOException e) {
            scheduler.shutdown();
        }
    }, 0, 30, TimeUnit.SECONDS); // 每30秒发送心跳
    
    return emitter;
}

2. 异常处理

@RestControllerAdvice
public class SseExceptionHandler {

    @ExceptionHandler(SseException.class)
    public ResponseEntity<String> handleSseException(SseException ex) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("SSE Error: " + ex.getMessage());
    }
}

3. CORS 配置

@Configuration
public class WebConfig implements WebMvcConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/sse/**")
            .allowedOrigins("https://your-frontend.com")
            .allowedMethods("GET")
            .allowCredentials(true);
    }
}

4. 性能优化

@Configuration
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("SSE-Executor-");
        executor.initialize();
        return executor;
    }
}

完整示例:实时股票报价

后端控制器

@RestController
public class StockController {

    private final SseService sseService;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public StockController(SseService sseService) {
        this.sseService = sseService;
        startStockUpdates();
    }

    @GetMapping("/stocks")
    public SseEmitter getStockUpdates() {
        return sseService.subscribe();
    }

    private void startStockUpdates() {
        scheduler.scheduleAtFixedRate(() -> {
            Map<String, Double> stocks = Map.of(
                "AAPL", 150 + Math.random() * 10,
                "MSFT", 250 + Math.random() * 15,
                "GOOGL", 2800 + Math.random() * 50
            );
            sseService.broadcast("stockUpdate", stocks);
        }, 0, 2, TimeUnit.SECONDS);
    }
}

前端实现

<div id="stock-prices"></div>

<script>
const eventSource = new EventSource('/stocks');

eventSource.addEventListener('stockUpdate', event => {
    const stocks = JSON.parse(event.data);
    let html = '<h2>Stock Prices</h2><ul>';
    
    for (const [symbol, price] of Object.entries(stocks)) {
        html += `<li>${symbol}: $${price.toFixed(2)}</li>`;
    }
    
    html += '</ul>';
    document.getElementById('stock-prices').innerHTML = html;
});
</script>

部署注意事项

1. 负载均衡配置

# Nginx 配置
location /sse {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_buffering off;
}

2. Spring Boot 配置

# application.properties
server.servlet.context-path=/api
spring.mvc.async.request-timeout=120000 # 2分钟超时

3. 监控端点

@Endpoint(id = "sse")
public class SseEndpoint {

    private final SseService sseService;

    public SseEndpoint(SseService sseService) {
        this.sseService = sseService;
    }

    @ReadOperation
    public Map<String, Object> sseMetrics() {
        return Map.of(
            "activeConnections", sseService.getActiveConnections(),
            "lastBroadcast", sseService.getLastBroadcastTime()
        );
    }
}

最佳实践总结

  1. 使用专用服务类:封装 SSE 逻辑,提高代码复用性
  2. 实现心跳机制:防止连接超时断开
  3. 添加鉴权支持:保护敏感数据
  4. 优雅处理错误:实现异常处理和重连机制
  5. 监控连接状态:使用 Actuator 端点监控 SSE 连接
  6. 优化线程池:合理配置异步处理线程
  7. 前端重连逻辑:自动恢复断开连接

通过以上实现,您可以在 Spring Boot 应用中轻松创建 SSE 端点,实现服务器向客户端的实时事件推送,同时满足鉴权需求。

以上就是SpringBoot实现SSE(Server-Sent Events)完整指南的详细内容,更多关于SpringBoot实现SSE(Server-Sent Events)的资料请关注脚本之家其它相关文章!

相关文章

  • Spring MVC数据绑定概述及原理详解

    Spring MVC数据绑定概述及原理详解

    这篇文章主要介绍了Spring MVC数据绑定概述及原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-06-06
  • Java自动化工具Ant的基础使用教程

    Java自动化工具Ant的基础使用教程

    这篇文章主要介绍了Java自动化工具Ant的基础使用教程,例子在Windows系统下操作演示,讲解了Ant基本的文件操作和属性,需要的朋友可以参考下
    2016-02-02
  • java判断域名无法访问自行访问下一条

    java判断域名无法访问自行访问下一条

    这篇文章主要为大家介绍了java实现判断域名无法访问的时候自行访问下一条域名示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-12-12
  • 解决在Gradle/IDEA中无法正常使用readLine的问题原因

    解决在Gradle/IDEA中无法正常使用readLine的问题原因

    这篇文章主要介绍了在Gradle/IDEA中无法正常使用readLine的解决方法,原因是由于Gradle的标准输入默认并不与系统标准输入绑定,需手动设置,需要的朋友可以参考下
    2021-12-12
  • java操作hdfs的方法示例代码

    java操作hdfs的方法示例代码

    这篇文章主要介绍了java操作hdfs的相关资料,在本地配置Hadoop和Maven的环境变量,首先需从官网下载与服务器相同版本的Hadoop安装包,配置环境变量后,引入Maven的配置文件,以便管理项目依赖,最后,编写代码实现对HDFS的连接和操作,完成数据的读写,需要的朋友可以参考下
    2022-02-02
  • Java 批量生成条码的示例代码

    Java 批量生成条码的示例代码

    这篇文章主要介绍了Java 批量生成条码的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-08-08
  • SpringBoot集成SOL链的详细过程

    SpringBoot集成SOL链的详细过程

    Solanaj 是一个用于与 Solana 区块链交互的 Java 库,它为 Java 开发者提供了一套功能丰富的 API,使得在 Java 环境中可以轻松构建与 Solana 区块链交互的应用程序,这篇文章主要介绍了SpringBoot集成SOL链的详细过程,需要的朋友可以参考下
    2025-01-01
  • springboot实现执行sql语句打印到控制台

    springboot实现执行sql语句打印到控制台

    这篇文章主要介绍了springboot实现执行sql语句打印到控制台的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • SpringBoot监听事件和处理事件程序示例详解

    SpringBoot监听事件和处理事件程序示例详解

    这篇文章主要介绍了SpringBoot监听事件和处理事件程序示例,监听和处理事件是一种常用的模式,用于在应用程序的不同部分之间传递信息,Spring 的事件发布/订阅模型允许我们创建自定义事件,并在这些事件发生时由注册的监听器进行处理,需要的朋友可以参考下
    2022-06-06
  • JAVA实现网络/本地图片转BASE64存储代码示例

    JAVA实现网络/本地图片转BASE64存储代码示例

    这篇文章主要给大家介绍了关于JAVA实现网络/本地图片转BASE64存储的相关资料,Base64是网络上最常见的用于传输8Bit字节码的编码方式之一,Base64就是一种基于64个可打印字符来表示二进制数据的方法,需要的朋友可以参考下
    2023-07-07

最新评论