Spring Boot 2.7 + JDK 8 实现 WebSocket 集群分布式部署方案(基于 Redis Pub/Sub 方案)

 更新时间:2026年03月30日 09:13:18   作者:weixin_42502300  
本文详细介绍了在SpringBoot2.7+JDK8环境下,WebSocketSession无法直接序列化存储到Redis的问题,并提供了行业标准的集群方案,本文介绍的非常详细,感兴趣的朋友跟随小编一起看看吧

Spring Boot 2.7 + JDK 8 环境下,WebSocketSession 无法直接序列化存储到 Redis(它是与服务器节点绑定的TCP连接对象,跨JVM/跨节点无法复用)。

核心解决方案

行业标准集群方案:本地内存管理会话 + Redis 发布/订阅(Pub/Sub)广播消息

  1. 每个节点保留本地会话存储(你原有的ConcurrentHashMap完全保留,负责管理当前节点的连接);
  2. 发送消息时,通过 Redis Pub/Sub 将消息广播到所有集群节点;
  3. 所有节点监听Redis消息,收到后给本地的目标用户推送消息。

该方案无需序列化WebSocketSession,完美支持集群分布式部署。

完整改造步骤

一、引入依赖

pom.xml 添加 Redis 依赖(Spring Data Redis 适配 Spring Boot 2.7):

<!-- Redis 核心依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redis 连接池 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

二、配置Redis连接

application.yml 配置Redis:

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:  # 有密码填写
    database: 0
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0
        max-wait: -1ms

三、定义Redis常量

创建常量类,统一管理WebSocket消息频道:

package com.example.demo.config.websocket;
/**
 * WebSocket Redis 常量
 */
public interface WebSocketRedisConstants {
    /**
     * WebSocket 消息发布订阅频道
     */
    String WEBSOCKET_MESSAGE_CHANNEL = "websocket:message:channel";
}

四、改造会话管理类(核心)

将原静态工具类改为 Spring Bean,注入RedisTemplate,保留本地会话管理,新增Redis消息广播逻辑:

package com.example.demo.config.websocket;
import com.example.demo.config.LoginUser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
 * 移动端WebSocket 在线用户管理(支持Redis集群)
 */
@Slf4j
@Component // 改为Spring Bean,支持注入Redis
public class MobileWebSocketUserHolder {
    /**
     * 【本地存储】在线用户会话(保留原逻辑,仅管理当前节点连接)
     */
    private static final Map<String, Set<WebSocketSession>> ONLINE_USER_MAP = new ConcurrentHashMap<>();
    @Autowired
    private StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();
    // ====================== 原绑定/解绑逻辑 完全保留 ======================
    /**
     * 绑定用户与WebSocket会话(连接成功时调用)
     */
    public void bindSession(LoginUser user, WebSocketSession session) {
        if (user == null || user.getUserId() == null) {
            return;
        }
        String userId = user.getUserId();
        ONLINE_USER_MAP.computeIfAbsent(userId, k -> new CopyOnWriteArraySet<>()).add(session);
        log.info("用户{}绑定WebSocket会话,当前节点在线用户数:{}", userId, ONLINE_USER_MAP.size());
    }
    /**
     * 解绑用户与WebSocket会话(连接关闭时调用)
     */
    public void unbindSession(LoginUser user, WebSocketSession session) {
        if (user == null || user.getUserId() == null) {
            return;
        }
        String userId = user.getUserId();
        Set<WebSocketSession> sessions = ONLINE_USER_MAP.get(userId);
        if (sessions != null) {
            sessions.remove(session);
            if (sessions.isEmpty()) {
                ONLINE_USER_MAP.remove(userId);
            }
        }
        log.info("用户{}解绑WebSocket会话,当前节点在线用户数:{}", userId, ONLINE_USER_MAP.size());
    }
    // ====================== 消息发送:本地发送 + Redis广播 ======================
    /**
     * 给指定用户发送消息(集群模式)
     */
    public void sendMessageToUser(String userId, String message) {
        // 1. 当前节点直接发送消息
        sendLocalMessage(userId, message);
        // 2. 发布消息到Redis,广播给所有集群节点
        try {
            Map<String, String> msgMap = new HashMap<>(2);
            msgMap.put("userId", userId);
            msgMap.put("message", message);
            String redisMsg = objectMapper.writeValueAsString(msgMap);
            redisTemplate.convertAndSend(WebSocketRedisConstants.WEBSOCKET_MESSAGE_CHANNEL, redisMsg);
        } catch (JsonProcessingException e) {
            log.error("Redis消息序列化失败", e);
        }
    }
    /**
     * 给所有用户广播消息(集群模式)
     */
    public void sendMessageToAll(String message) {
        // 1. 当前节点广播
        ONLINE_USER_MAP.keySet().forEach(userId -> sendLocalMessage(userId, message));
        // 2. Redis广播所有节点
        try {
            Map<String, String> msgMap = new HashMap<>(2);
            msgMap.put("userId", "ALL");
            msgMap.put("message", message);
            String redisMsg = objectMapper.writeValueAsString(msgMap);
            redisTemplate.convertAndSend(WebSocketRedisConstants.WEBSOCKET_MESSAGE_CHANNEL, redisMsg);
        } catch (JsonProcessingException e) {
            log.error("Redis广播消息序列化失败", e);
        }
    }
    // ====================== 本地消息发送(私有方法) ======================
    /**
     * 仅给【当前节点】的用户发送消息
     */
    private void sendLocalMessage(String userId, String message) {
        if (userId == null || !ONLINE_USER_MAP.containsKey(userId)) {
            return;
        }
        Set<WebSocketSession> sessions = ONLINE_USER_MAP.get(userId);
        for (WebSocketSession session : sessions) {
            try {
                if (session.isOpen()) {
                    session.sendMessage(new TextMessage(message));
                }
            } catch (Exception e) {
                log.error("本地给用户{}发送消息失败", userId, e);
            }
        }
    }
    // ====================== 原查询逻辑 完全保留 ======================
    public Set<String> getOnlineUsers() {
        return new HashSet<>(ONLINE_USER_MAP.keySet());
    }
    public boolean isOnline(String userId) {
        return userId != null && ONLINE_USER_MAP.containsKey(userId);
    }
}

五、创建Redis消息监听器

监听Redis频道,接收广播消息并本地推送

package com.example.demo.config.websocket;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Redis WebSocket 消息监听器
 */
@Slf4j
@Component
public class WebSocketRedisListener implements MessageListener {
    @Autowired
    private MobileWebSocketUserHolder webSocketUserHolder;
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // 解析Redis消息
            String msgBody = new String(message.getBody());
            Map<String, String> msgMap = objectMapper.readValue(msgBody, new TypeReference<Map<String, String>>() {});
            String userId = msgMap.get("userId");
            String content = msgMap.get("message");
            // 本地发送消息
            if ("ALL".equals(userId)) {
                webSocketUserHolder.getOnlineUsers().forEach(uid -> webSocketUserHolder.sendLocalMessage(uid, content));
            } else {
                webSocketUserHolder.sendLocalMessage(userId, content);
            }
        } catch (Exception e) {
            log.error("处理Redis WebSocket消息失败", e);
        }
    }
}

六、配置Redis发布/订阅

注册Redis监听容器,绑定频道和监听器:

package com.example.demo.config;
import com.example.demo.config.websocket.WebSocketRedisConstants;
import com.example.demo.config.websocket.WebSocketRedisListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
public class RedisConfig {
    /**
     * 注册Redis消息监听器
     */
    @Bean
    public MessageListenerAdapter webSocketListenerAdapter(WebSocketRedisListener listener) {
        return new MessageListenerAdapter(listener);
    }
    /**
     * 配置Redis监听容器
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            MessageListenerAdapter webSocketListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 绑定监听频道
        container.addMessageListener(webSocketListenerAdapter,
                new PatternTopic(WebSocketRedisConstants.WEBSOCKET_MESSAGE_CHANNEL));
        return container;
    }
}

七、修改WebSocket处理器(调用处适配)

由于MobileWebSocketUserHolder改为了Spring Bean,你需要在WebSocket处理器中注入使用,而非直接静态调用:

// 原代码(静态调用)
MobileWebSocketUserHolder.bindSession(user, session);
// 改造后(注入调用)
@Autowired
private MobileWebSocketUserHolder webSocketUserHolder;
webSocketUserHolder.bindSession(user, session);

方案原理说明

  1. 本地会话管理:每个节点只管理自己的WebSocketSession(内存存储,性能最高),不跨节点共享;
  2. Redis广播:任意节点调用发送消息接口时,会先给本地用户发消息,再通过Redis Pub/Sub把消息发给所有集群节点;
  3. 集群推送:所有节点监听Redis消息,收到后给本地的目标用户推送消息,实现全集群消息触达。

集群部署注意事项

  1. 用户认证一致性:集群所有节点的登录认证逻辑必须一致(用户ID生成规则相同);
  2. Redis 高可用:生产环境使用Redis集群/哨兵模式,避免单点故障;
  3. Session 共享非必须:本方案不需要共享WebSocketSession,这是最轻量化、最高效的集群方案;
  4. 心跳/重连:保留原有的WebSocket心跳机制,客户端断开后自动重连到任意集群节点即可。

总结

  1. 核心方案:本地内存管理会话 + Redis Pub/Sub 广播消息(Spring Boot WebSocket集群标准方案);
  2. 无需序列化:规避了WebSocketSession无法存储Redis的问题;
  3. 兼容原有逻辑:90%代码复用,仅改造消息发送逻辑;
  4. 生产可用:支持多节点集群部署,无状态、高可用。

到此这篇关于Spring Boot 2.7 + JDK 8 实现 WebSocket 集群分布式部署方案(基于 Redis Pub/Sub 方案)的文章就介绍到这了,更多相关Spring Boot JDK WebSocket 集群内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • spring boot3自动配置与手动配置的全过程

    spring boot3自动配置与手动配置的全过程

    本文给大家介绍了spring boot3自动配置与手动配置的全过程,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2026-01-01
  • SpringMVC中解决@ResponseBody注解返回中文乱码问题

    SpringMVC中解决@ResponseBody注解返回中文乱码问题

    这篇文章主要介绍了SpringMVC中解决@ResponseBody注解返回中文乱码问题, 小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-04-04
  • SpringBoot 整合mybatis+mybatis-plus的详细步骤

    SpringBoot 整合mybatis+mybatis-plus的详细步骤

    这篇文章主要介绍了SpringBoot 整合mybatis+mybatis-plus的步骤,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-06-06
  • 阿里巴巴 Sentinel + InfluxDB + Chronograf 实现监控大屏

    阿里巴巴 Sentinel + InfluxDB + Chronograf 实现监控大屏

    这篇文章主要介绍了阿里巴巴 Sentinel + InfluxDB + Chronograf 实现监控大屏,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-09-09
  • Java service层获取HttpServletRequest工具类的方法

    Java service层获取HttpServletRequest工具类的方法

    今天小编就为大家分享一篇关于Java service层获取HttpServletRequest工具类的方法,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2018-12-12
  • java8中forkjoin和optional框架使用

    java8中forkjoin和optional框架使用

    这篇文章主要介绍了java8中forkjoin和optional框架使用心得以及用法讲解,需要的朋友参考下吧。
    2017-12-12
  • Java设计模式中的观察者模式

    Java设计模式中的观察者模式

    观察者模式定义对象之间的一种一对多的依赖关系,使得每当一个对象的状态发生变化时,其相关的依赖对象都可以得到通知并被自动更新。主要用于多个不同的对象对一个对象的某个方法会做出不同的反应
    2023-02-02
  • Java创建线程的方式解析

    Java创建线程的方式解析

    这篇文章主要介绍了Java创建线程的方式解析,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下,希望对你的学习有所帮助
    2022-07-07
  • 如何利用Java Agent 做Spring MVC Controller 层的出参入参打印日志

    如何利用Java Agent 做Spring MVC Controller 层的出参入参打印日志

    本文介绍了如何使用JavaAgent进行Spring MVC Controller层的出参入参打印日志,首先,建立了一个包含javassist和fastJSON依赖的Agent jar工程,并创建了一个Agent类,然后,编译并部署了这个Agent jar,最后,在Demo Web工程中启用Agent以实现日志打印
    2024-11-11
  • Spring Security跳转页面失败问题解决

    Spring Security跳转页面失败问题解决

    这篇文章主要介绍了Spring Security跳转页面失败问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-01-01

最新评论