基于SpringBoot + Redis Pub/Sub实现跨实例SSE消息推送

 更新时间:2026年04月23日 09:22:55   作者:五阿哥永琪  
在构建Web应用时,消息推送是一个常见需求——比如站内信、订单状态更新、告警通知等,SSE相比WebSocket更轻量,适合单向推送场景,本文介绍一种基于Redis Pub/Sub的解决方案,让SSE连接能够跨实例互通,需要的朋友可以参考下

引言

在构建Web应用时,消息推送是一个常见需求——比如站内信、订单状态更新、告警通知等。SSE(Server-Sent Events)相比WebSocket更轻量,适合单向推送场景。但当服务部署多个实例时,问题就出现了:用户A连接的是实例1,用户B连接的是实例2,A发送的消息如何推送给B?本文介绍一种基于Redis Pub/Sub的解决方案,让SSE连接能够跨实例互通。

上图展示了完整的消息流转过程:

  1. 建立连接:用户A和B分别连接到不同的后端实例,每个实例维护着自己的SSE连接池。
  2. 发送消息:用户A发起私信请求,请求落在实例1上。
  3. Redis广播:实例1将消息发布到Redis的station:message频道,所有订阅了该频道的实例都会收到。
  4. 推送消息:实例2发现目标用户B在自己身上,通过SSE连接将消息推送给B。实例1收到广播后也会检查,发现目标用户不在自己身上,则直接忽略。

一、引入依赖

Spring Boot Web提供了SSE的支持(SseEmitter),而spring-boot-starter-data-redis则为我们带来了Redis连接和Pub/Sub能力。commons-pool2是连接池,生产环境必备,避免频繁创建连接带来的性能损耗。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

二、SSE连接管理器

SseEmitterManager是整个方案的核心。这里有几个设计点值得注意:

  • 支持多标签页:一个用户可能打开多个浏览器标签页,所以用Map<String, Map<String, SseEmitter>>来组织,外层key是userId,内层key是token(每个标签页唯一)。
  • 生命周期管理:通过onCompletion、onTimeout、onError回调来清理连接,防止内存泄漏。
  • 不超时设置:new SseEmitter(0L)表示永不超时,你也可以根据业务需要设置一个合理的超时时间(如30分钟)。
  • 全站广播:broadcast()方法会遍历所有在线用户并推送,适合系统公告类消息。
@Component
@Slf4j
public class SseEmitterManager {
    /**
     * 用户ID -> (连接token -> SseEmitter)
     * 一个用户可能有多个浏览器标签页,用token区分
     */
    private final Map<String, Map<String, SseEmitter>> userEmitters = new ConcurrentHashMap<>();
    /**
     * 建立SSE连接
     * @param userId 用户ID
     * @param token 连接标识(可用UUID)
     */
    public SseEmitter connect(String userId, String token) {
        // 超时时间设为0表示不超时,也可设置具体毫秒数
        SseEmitter emitter = new SseEmitter(0L);
        // 注册回调:连接关闭时清理
        emitter.onCompletion(() -> removeEmitter(userId, token));
        emitter.onTimeout(() -> removeEmitter(userId, token));
        emitter.onError(e -> removeEmitter(userId, token));
        // 存储连接
        userEmitters.computeIfAbsent(userId, k -> new ConcurrentHashMap<>())
                    .put(token, emitter);
        log.info("SSE connected: userId={}, token={}, total users={}", 
                 userId, token, userEmitters.size());
        return emitter;
    }
    /**
     * 向指定用户推送消息
     */
    public void sendToUser(String userId, String message) {
        Map<String, SseEmitter> emitters = userEmitters.get(userId);
        if (emitters == null || emitters.isEmpty()) {
            log.debug("User {} not online, message stored for later", userId);
            return;
        }
        // 向该用户所有连接推送
        emitters.forEach((token, emitter) -> {
            try {
                emitter.send(SseEmitter.event()
                    .name("message")
                    .data(message));
            } catch (IOException e) {
                log.error("Send to user {} failed, removing emitter", userId, e);
                removeEmitter(userId, token);
            }
        });
    }
    /**
     * 全站广播
     */
    public void broadcast(String message) {
        userEmitters.forEach((userId, emitters) -> {
            sendToUser(userId, message);
        });
        log.info("Broadcast message to {} users", userEmitters.size());
    }
    /**
     * 获取当前在线人数
     */
    public int getOnlineCount() {
        return userEmitters.size();
    }
    private void removeEmitter(String userId, String token) {
        Map<String, SseEmitter> emitters = userEmitters.get(userId);
        if (emitters != null) {
            emitters.remove(token);
            if (emitters.isEmpty()) {
                userEmitters.remove(userId);
            }
        }
    }
}

三、Redis消息订阅者

RedisMessageSubscriber实现了MessageListener接口,它会监听station:message频道。当收到Redis消息时:

  • 将JSON反序列化为StationMessage对象。
  • 根据type字段判断是私信还是广播。
  • 调用SseEmitterManager的方法推送给目标用户。

这里有个细节:每个后端实例都会收到自己发布的消息,所以推送前需要判断目标用户是否在当前实例上——这个判断逻辑其实隐含在sendToUser中:如果目标用户不在本实例的连接池里,就直接返回,不会报错。

@Component
@Slf4j
public class RedisMessageSubscriber implements MessageListener {
    
    @Autowired
    private SseEmitterManager sseEmitterManager;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String channel = new String(message.getChannel());
            String body = new String(message.getBody());
            
            // 解析消息
            StationMessage msg = objectMapper.readValue(body, StationMessage.class);
            
            log.info("Received Redis message: channel={}, type={}, target={}", 
                     channel, msg.getType(), msg.getTargetUserId());
            
            // 根据消息类型分发
            if ("user".equals(msg.getType())) {
                // 私信:发给指定用户
                sseEmitterManager.sendToUser(msg.getTargetUserId(), msg.getContent());
            } else if ("broadcast".equals(msg.getType())) {
                // 广播:发给所有在线用户
                sseEmitterManager.broadcast(msg.getContent());
            }
            
        } catch (Exception e) {
            log.error("Failed to process Redis message", e);
        }
    }
}

四、Redis配置

RedisMessageListenerContainer是Spring Data Redis提供的消息容器,它会自动管理订阅和监听线程。注意这里订阅的是station:message频道,你可以根据业务需要定义多个频道(比如station:notice、station:system)。

RedisTemplate的序列化配置也很重要:key使用StringRedisSerializer保证可读性,value使用Jackson2JsonRedisSerializer来支持对象存储。

@Configuration
public class RedisConfig {
    
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            RedisMessageSubscriber subscriber) {
        
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅站内信频道
        container.addMessageListener(subscriber, new ChannelTopic("station:message"));
        return container;
    }
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
        return template;
    }
}

五、消息实体

StationMessage是消息的载体,在Redis中传输的JSON格式就对应这个结构。字段设计上:

  • id:消息唯一标识,可用于去重和消息历史记录。
  • type:区分私信和广播,方便订阅者做路由。
  • targetUserId:私信时的接收者,广播时可为空。
  • timestamp:时间戳,客户端可以用来做消息排序或展示发送时间。
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class StationMessage {
    private String id;           // 消息ID
    private String type;         // user:私信, broadcast:广播
    private String targetUserId; // 私信时的目标用户ID
    private String content;      // 消息内容
    private String senderId;     // 发送者ID
    private Long timestamp;      // 时间戳
}

六、消息发送服务

MessageService封装了发送逻辑。核心动作很简单:构造StationMessage -> 序列化为JSON -> redisTemplate.convertAndSend()。发布之后,所有实例的订阅者都会收到消息,相当于Redis帮我们做了一个“广播式”的跨实例通信。

这种设计的优点是:发送方不需要知道消息最终由哪个实例处理,也不需要维护实例之间的网络连接,所有协调工作都交给了Redis。

@Service
@Slf4j
public class MessageService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private SseEmitterManager sseEmitterManager;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    /**
     * 发送私信
     */
    public void sendPrivateMessage(String fromUserId, String toUserId, String content) {
        StationMessage msg = StationMessage.builder()
            .id(UUID.randomUUID().toString())
            .type("user")
            .targetUserId(toUserId)
            .senderId(fromUserId)
            .content(content)
            .timestamp(System.currentTimeMillis())
            .build();
        
        try {
            String json = objectMapper.writeValueAsString(msg);
            // 发布到Redis,所有实例都会收到
            redisTemplate.convertAndSend("station:message", json);
            log.info("Private message sent: {} -> {}", fromUserId, toUserId);
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize message", e);
        }
    }
    
    /**
     * 全站广播
     */
    public void broadcast(String fromUserId, String content) {
        StationMessage msg = StationMessage.builder()
            .id(UUID.randomUUID().toString())
            .type("broadcast")
            .senderId(fromUserId)
            .content(content)
            .timestamp(System.currentTimeMillis())
            .build();
        
        try {
            String json = objectMapper.writeValueAsString(msg);
            redisTemplate.convertAndSend("station:message", json);
            log.info("Broadcast message sent by: {}", fromUserId);
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize broadcast", e);
        }
    }
}

七、Controller层

Controller对外暴露了三个核心接口:

  • GET /api/sse/connect:客户端通过EventSource或fetch API调用这个接口建立SSE连接。每个连接会生成一个唯一token,用于后续的清理。
  • POST /api/sse/private:发送私信,需要提供发送者、接收者和消息内容。
  • POST /api/sse/broadcast:发送广播消息。
  • GET /api/sse/online-count:查询当前在线人数,可用于展示“在线状态”或做监控。

生产环境建议给这些接口加上认证鉴权(比如从token中解析userId),避免伪造身份。

@RestController
@RequestMapping("/api/sse")
@Slf4j
public class SseController {
    
    @Autowired
    private SseEmitterManager sseEmitterManager;
    
    @Autowired
    private MessageService messageService;
    
    /**
     * SSE连接端点
     */
    @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect(@RequestParam String userId) {
        String token = UUID.randomUUID().toString();
        return sseEmitterManager.connect(userId, token);
    }
    
    /**
     * 发送私信
     */
    @PostMapping("/private")
    public ResponseEntity<?> sendPrivate(@RequestBody PrivateMessageRequest request) {
        messageService.sendPrivateMessage(
            request.getFromUserId(),
            request.getToUserId(),
            request.getContent()
        );
        return ResponseEntity.ok().build();
    }
    
    /**
     * 全站广播
     */
    @PostMapping("/broadcast")
    public ResponseEntity<?> broadcast(@RequestBody BroadcastRequest request) {
        messageService.broadcast(request.getFromUserId(), request.getContent());
        return ResponseEntity.ok().build();
    }
    
    /**
     * 获取在线人数
     */
    @GetMapping("/online-count")
    public ResponseEntity<Integer> getOnlineCount() {
        return ResponseEntity.ok(sseEmitterManager.getOnlineCount());
    }
}

以上就是基于SpringBoot + Redis Pub/Sub实现跨实例SSE消息推送的详细内容,更多关于SpringBoot跨实例SSE消息推送的资料请关注脚本之家其它相关文章!

相关文章

  • java 保留两位小数的几种方法

    java 保留两位小数的几种方法

    这篇文章主要介绍了JAVA中小数点后保留两位的几种方法,并有小实例,希望能帮助有所需要的同学
    2016-07-07
  • SpringBoot如何配置Controller实现Web请求处理

    SpringBoot如何配置Controller实现Web请求处理

    这篇文章主要介绍了SpringBoot如何配置Controller实现Web请求处理,文中通过图解示例介绍的很详细,具有有一定的参考价值,需要的小伙伴可以参考一下
    2023-05-05
  • java中的FileReader和FileWriter读写流

    java中的FileReader和FileWriter读写流

    这篇文章主要介绍了java中的FileReader和FileWriter读写流,在java中对数据输入输出的操作陈作为流我们对不同的文件进行操作,或者对操作文件进行输入和输出时所用的流都是不同的,因此在java.io的包下存在很多流的类或者接口提供给我们对应的操作,需要的朋友可以参考下
    2023-10-10
  • jcl与jul log4j1 log4j2 logback日志系统机制及集成原理

    jcl与jul log4j1 log4j2 logback日志系统机制及集成原理

    这篇文章主要介绍了jcl与jul log4j1 log4j2 logback的集成原理,Apache Commons-logging 通用日志框架与日志系统的机制,有需要的朋友可以借鉴参考下
    2022-03-03
  • 使用Java编写一个图片word互转工具

    使用Java编写一个图片word互转工具

    这篇文章主要介绍了使用Java编写一个PDF Word文件转换工具的相关资料,需要的朋友可以参考下
    2023-01-01
  • Java的堵塞队列BlockingQueue详解

    Java的堵塞队列BlockingQueue详解

    这篇文章主要介绍了Java的堵塞队列BlockingQueue详解,阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程,需要的朋友可以参考下
    2023-12-12
  • springboot集成druid,多数据源可视化,p6spy问题

    springboot集成druid,多数据源可视化,p6spy问题

    这篇文章主要介绍了springboot集成druid,多数据源可视化,p6spy问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • springboot中@RestController注解实现

    springboot中@RestController注解实现

    在JavaWeb开发中,Spring框架及其组件SpringMVC因高效和强大功能而广受欢迎,@RestController注解是SpringMVC中的重要组成部分,下面就来介绍一下,感兴趣的可以了解一下
    2024-09-09
  • Java实现赫夫曼树(哈夫曼树)的创建

    Java实现赫夫曼树(哈夫曼树)的创建

    给定N个权值作为N个叶子结点,构造一棵二叉树,若该树的带权路径长度(WPL)达到最小,称这样的二叉树为最优二叉树,也称为哈夫曼树(Huffman Tree)。这篇文章主要就是为大家介绍如何通过Java实现赫夫曼树,需要的朋友可以参考一下
    2021-12-12
  • Spring中BeanFactory与FactoryBean接口的区别详解

    Spring中BeanFactory与FactoryBean接口的区别详解

    这篇文章主要给大家介绍了关于Spring中BeanFactory与FactoryBean接口的区别的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者使用Spring具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-03-03

最新评论