基于SpringBoot + Redis Pub/Sub实现跨实例SSE消息推送

 更新时间:2026年04月23日 09:22:55   作者:五阿哥永琪  
在构建Web应用时,消息推送是一个常见需求——比如站内信、订单状态更新、告警通知等,SSE相比WebSocket更轻量,适合单向推送场景,本文介绍一种基于Redis Pub/Sub的解决方案,让SSE连接能够跨实例互通,需要的朋友可以参考下

引言

在构建Web应用时,消息推送是一个常见需求——比如站内信、订单状态更新、告警通知等。SSE(Server-Sent Events)相比WebSocket更轻量,适合单向推送场景。但当服务部署多个实例时,问题就出现了:用户A连接的是实例1,用户B连接的是实例2,A发送的消息如何推送给B?本文介绍一种基于Redis Pub/Sub的解决方案,让SSE连接能够跨实例互通。

上图展示了完整的消息流转过程:

  1. 建立连接:用户A和B分别连接到不同的后端实例,每个实例维护着自己的SSE连接池。
  2. 发送消息:用户A发起私信请求,请求落在实例1上。
  3. Redis广播:实例1将消息发布到Redis的station:message频道,所有订阅了该频道的实例都会收到。
  4. 推送消息:实例2发现目标用户B在自己身上,通过SSE连接将消息推送给B。实例1收到广播后也会检查,发现目标用户不在自己身上,则直接忽略。

一、引入依赖

Spring Boot Web提供了SSE的支持(SseEmitter),而spring-boot-starter-data-redis则为我们带来了Redis连接和Pub/Sub能力。commons-pool2是连接池,生产环境必备,避免频繁创建连接带来的性能损耗。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

二、SSE连接管理器

SseEmitterManager是整个方案的核心。这里有几个设计点值得注意:

  • 支持多标签页:一个用户可能打开多个浏览器标签页,所以用Map<String, Map<String, SseEmitter>>来组织,外层key是userId,内层key是token(每个标签页唯一)。
  • 生命周期管理:通过onCompletion、onTimeout、onError回调来清理连接,防止内存泄漏。
  • 不超时设置:new SseEmitter(0L)表示永不超时,你也可以根据业务需要设置一个合理的超时时间(如30分钟)。
  • 全站广播:broadcast()方法会遍历所有在线用户并推送,适合系统公告类消息。
@Component
@Slf4j
public class SseEmitterManager {
    /**
     * 用户ID -> (连接token -> SseEmitter)
     * 一个用户可能有多个浏览器标签页,用token区分
     */
    private final Map<String, Map<String, SseEmitter>> userEmitters = new ConcurrentHashMap<>();
    /**
     * 建立SSE连接
     * @param userId 用户ID
     * @param token 连接标识(可用UUID)
     */
    public SseEmitter connect(String userId, String token) {
        // 超时时间设为0表示不超时,也可设置具体毫秒数
        SseEmitter emitter = new SseEmitter(0L);
        // 注册回调:连接关闭时清理
        emitter.onCompletion(() -> removeEmitter(userId, token));
        emitter.onTimeout(() -> removeEmitter(userId, token));
        emitter.onError(e -> removeEmitter(userId, token));
        // 存储连接
        userEmitters.computeIfAbsent(userId, k -> new ConcurrentHashMap<>())
                    .put(token, emitter);
        log.info("SSE connected: userId={}, token={}, total users={}", 
                 userId, token, userEmitters.size());
        return emitter;
    }
    /**
     * 向指定用户推送消息
     */
    public void sendToUser(String userId, String message) {
        Map<String, SseEmitter> emitters = userEmitters.get(userId);
        if (emitters == null || emitters.isEmpty()) {
            log.debug("User {} not online, message stored for later", userId);
            return;
        }
        // 向该用户所有连接推送
        emitters.forEach((token, emitter) -> {
            try {
                emitter.send(SseEmitter.event()
                    .name("message")
                    .data(message));
            } catch (IOException e) {
                log.error("Send to user {} failed, removing emitter", userId, e);
                removeEmitter(userId, token);
            }
        });
    }
    /**
     * 全站广播
     */
    public void broadcast(String message) {
        userEmitters.forEach((userId, emitters) -> {
            sendToUser(userId, message);
        });
        log.info("Broadcast message to {} users", userEmitters.size());
    }
    /**
     * 获取当前在线人数
     */
    public int getOnlineCount() {
        return userEmitters.size();
    }
    private void removeEmitter(String userId, String token) {
        Map<String, SseEmitter> emitters = userEmitters.get(userId);
        if (emitters != null) {
            emitters.remove(token);
            if (emitters.isEmpty()) {
                userEmitters.remove(userId);
            }
        }
    }
}

三、Redis消息订阅者

RedisMessageSubscriber实现了MessageListener接口,它会监听station:message频道。当收到Redis消息时:

  • 将JSON反序列化为StationMessage对象。
  • 根据type字段判断是私信还是广播。
  • 调用SseEmitterManager的方法推送给目标用户。

这里有个细节:每个后端实例都会收到自己发布的消息,所以推送前需要判断目标用户是否在当前实例上——这个判断逻辑其实隐含在sendToUser中:如果目标用户不在本实例的连接池里,就直接返回,不会报错。

@Component
@Slf4j
public class RedisMessageSubscriber implements MessageListener {
    
    @Autowired
    private SseEmitterManager sseEmitterManager;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String channel = new String(message.getChannel());
            String body = new String(message.getBody());
            
            // 解析消息
            StationMessage msg = objectMapper.readValue(body, StationMessage.class);
            
            log.info("Received Redis message: channel={}, type={}, target={}", 
                     channel, msg.getType(), msg.getTargetUserId());
            
            // 根据消息类型分发
            if ("user".equals(msg.getType())) {
                // 私信:发给指定用户
                sseEmitterManager.sendToUser(msg.getTargetUserId(), msg.getContent());
            } else if ("broadcast".equals(msg.getType())) {
                // 广播:发给所有在线用户
                sseEmitterManager.broadcast(msg.getContent());
            }
            
        } catch (Exception e) {
            log.error("Failed to process Redis message", e);
        }
    }
}

四、Redis配置

RedisMessageListenerContainer是Spring Data Redis提供的消息容器,它会自动管理订阅和监听线程。注意这里订阅的是station:message频道,你可以根据业务需要定义多个频道(比如station:notice、station:system)。

RedisTemplate的序列化配置也很重要:key使用StringRedisSerializer保证可读性,value使用Jackson2JsonRedisSerializer来支持对象存储。

@Configuration
public class RedisConfig {
    
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            RedisMessageSubscriber subscriber) {
        
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅站内信频道
        container.addMessageListener(subscriber, new ChannelTopic("station:message"));
        return container;
    }
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
        return template;
    }
}

五、消息实体

StationMessage是消息的载体,在Redis中传输的JSON格式就对应这个结构。字段设计上:

  • id:消息唯一标识,可用于去重和消息历史记录。
  • type:区分私信和广播,方便订阅者做路由。
  • targetUserId:私信时的接收者,广播时可为空。
  • timestamp:时间戳,客户端可以用来做消息排序或展示发送时间。
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class StationMessage {
    private String id;           // 消息ID
    private String type;         // user:私信, broadcast:广播
    private String targetUserId; // 私信时的目标用户ID
    private String content;      // 消息内容
    private String senderId;     // 发送者ID
    private Long timestamp;      // 时间戳
}

六、消息发送服务

MessageService封装了发送逻辑。核心动作很简单:构造StationMessage -> 序列化为JSON -> redisTemplate.convertAndSend()。发布之后,所有实例的订阅者都会收到消息,相当于Redis帮我们做了一个“广播式”的跨实例通信。

这种设计的优点是:发送方不需要知道消息最终由哪个实例处理,也不需要维护实例之间的网络连接,所有协调工作都交给了Redis。

@Service
@Slf4j
public class MessageService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private SseEmitterManager sseEmitterManager;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    /**
     * 发送私信
     */
    public void sendPrivateMessage(String fromUserId, String toUserId, String content) {
        StationMessage msg = StationMessage.builder()
            .id(UUID.randomUUID().toString())
            .type("user")
            .targetUserId(toUserId)
            .senderId(fromUserId)
            .content(content)
            .timestamp(System.currentTimeMillis())
            .build();
        
        try {
            String json = objectMapper.writeValueAsString(msg);
            // 发布到Redis,所有实例都会收到
            redisTemplate.convertAndSend("station:message", json);
            log.info("Private message sent: {} -> {}", fromUserId, toUserId);
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize message", e);
        }
    }
    
    /**
     * 全站广播
     */
    public void broadcast(String fromUserId, String content) {
        StationMessage msg = StationMessage.builder()
            .id(UUID.randomUUID().toString())
            .type("broadcast")
            .senderId(fromUserId)
            .content(content)
            .timestamp(System.currentTimeMillis())
            .build();
        
        try {
            String json = objectMapper.writeValueAsString(msg);
            redisTemplate.convertAndSend("station:message", json);
            log.info("Broadcast message sent by: {}", fromUserId);
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize broadcast", e);
        }
    }
}

七、Controller层

Controller对外暴露了三个核心接口:

  • GET /api/sse/connect:客户端通过EventSource或fetch API调用这个接口建立SSE连接。每个连接会生成一个唯一token,用于后续的清理。
  • POST /api/sse/private:发送私信,需要提供发送者、接收者和消息内容。
  • POST /api/sse/broadcast:发送广播消息。
  • GET /api/sse/online-count:查询当前在线人数,可用于展示“在线状态”或做监控。

生产环境建议给这些接口加上认证鉴权(比如从token中解析userId),避免伪造身份。

@RestController
@RequestMapping("/api/sse")
@Slf4j
public class SseController {
    
    @Autowired
    private SseEmitterManager sseEmitterManager;
    
    @Autowired
    private MessageService messageService;
    
    /**
     * SSE连接端点
     */
    @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect(@RequestParam String userId) {
        String token = UUID.randomUUID().toString();
        return sseEmitterManager.connect(userId, token);
    }
    
    /**
     * 发送私信
     */
    @PostMapping("/private")
    public ResponseEntity<?> sendPrivate(@RequestBody PrivateMessageRequest request) {
        messageService.sendPrivateMessage(
            request.getFromUserId(),
            request.getToUserId(),
            request.getContent()
        );
        return ResponseEntity.ok().build();
    }
    
    /**
     * 全站广播
     */
    @PostMapping("/broadcast")
    public ResponseEntity<?> broadcast(@RequestBody BroadcastRequest request) {
        messageService.broadcast(request.getFromUserId(), request.getContent());
        return ResponseEntity.ok().build();
    }
    
    /**
     * 获取在线人数
     */
    @GetMapping("/online-count")
    public ResponseEntity<Integer> getOnlineCount() {
        return ResponseEntity.ok(sseEmitterManager.getOnlineCount());
    }
}

以上就是基于SpringBoot + Redis Pub/Sub实现跨实例SSE消息推送的详细内容,更多关于SpringBoot跨实例SSE消息推送的资料请关注脚本之家其它相关文章!

相关文章

  • Java使用fastjson对String、JSONObject、JSONArray相互转换

    Java使用fastjson对String、JSONObject、JSONArray相互转换

    这篇文章主要介绍了Java使用fastjson对String、JSONObject、JSONArray相互转换,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • 浅谈JVM中的JOL

    浅谈JVM中的JOL

    我们天天都在使用java来new对象,但估计很少有人知道new出来的对象到底长的什么样子?对于普通的java程序员来说,可能从来没有考虑过java中对象的问题,不懂这些也可以写好代码。今天,给大家介绍一款工具JOL,可以满足大家对java对象的所有想象。
    2021-06-06
  • Java String类和StringBuffer类的区别介绍

    Java String类和StringBuffer类的区别介绍

    这篇文章主要介绍了Java String类和StringBuffer类的区别, 关于java的字符串处理我们一般使用String类和StringBuffer类有什么不同呢,下面我们一起来看看详细介绍吧
    2022-03-03
  • Spring Cloud 专题之Sleuth 服务跟踪实现方法

    Spring Cloud 专题之Sleuth 服务跟踪实现方法

    这篇文章主要介绍了Spring Cloud 专题之Sleuth 服务跟踪,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-08-08
  • Java中的Kotlin 内部类原理

    Java中的Kotlin 内部类原理

    这篇文章主要介绍了Java中的Kotlin 内部类原理,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下
    2022-06-06
  • JAVA--HashMap热门面试题

    JAVA--HashMap热门面试题

    这篇文章主要介绍了JAVA关于HashMap容易被提问的面试题,文中题目提问频率高,相信对你的面试有一定帮助,想要入职JAVA的朋友可以了解下
    2020-06-06
  • JDK动态代理接口和接口实现类深入详解

    JDK动态代理接口和接口实现类深入详解

    这篇文章主要介绍了JDK动态代理接口和接口实现类,JDK动态代理是代理模式的一种实现方式,因为它是基于接口来做代理的,所以也常被称为接口代理,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-06-06
  • 浅谈@PostConstruct不被调用的原因

    浅谈@PostConstruct不被调用的原因

    这篇文章主要介绍了浅谈@PostConstruct不被调用的原因及分析,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • 关于Java 项目封装sqlite连接池操作持久化数据的方法

    关于Java 项目封装sqlite连接池操作持久化数据的方法

    这篇文章主要介绍了Java 项目封装sqlite连接池操作持久化数据的方法,文中给大家介绍了sqlite的体系结构及封装java的sqlite连接池的详细过程,需要的朋友可以参考下
    2021-11-11
  • Java BufferWriter写文件写不进去或缺失数据的解决

    Java BufferWriter写文件写不进去或缺失数据的解决

    这篇文章主要介绍了Java BufferWriter写文件写不进去或缺失数据的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07

最新评论