Spring Boot 中实现 WebSocket 集群思路详解

 更新时间:2025年12月10日 16:44:00   作者:黄三技术java  
本文介绍了如何在SpringBoot中实现WebSocket集群处理,解决了会话共享和消息同步问题,通过分布式存储(Redis)和消息队列(RedisPub/Sub)实现跨节点通信,感兴趣的朋友跟随小编一起看看吧

在 Spring Boot 中实现 WebSocket 集群处理,核心挑战是解决多节点间的 WebSocket 会话共享和消息同步问题(因为单节点 WebSocket 的会话是内存存储的,多节点间无法直接通信)。以下是完整的实现方案:

一、核心思路

  1. 会话共享:使用分布式存储(如 Redis)存储所有节点的 WebSocket 会话信息(会话 ID、用户标识、节点标识等)。
  2. 消息转发:当某节点需要向用户发送消息时,先通过 Redis 判断用户连接在哪个节点,再通过消息队列(如 Redis Pub/Sub)将消息转发到目标节点,由目标节点推送消息给用户。
  3. 集群感知:每个节点启动时注册自身信息,下线时注销,确保消息能正确路由。

二、技术选型

  • WebSocket 框架:Spring WebSocket(基于 JSR-356 标准)
  • 分布式存储:Redis(存储会话映射、节点信息)
  • 消息队列:Redis Pub/Sub(节点间消息转发)
  • 依赖管理:Spring Boot Starter WebSocket + Spring Data Redis

三、实现步骤

1. 引入依赖

在pom.xml中添加以下依赖:

<!-- Spring WebSocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Redis (用于会话存储和消息转发) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 连接池 (可选,优化Redis性能) -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

2. 配置 Redis

在application.yml中配置 Redis 连接:

spring:
  redis:
    host: localhost  # Redis服务器地址
    port: 6379       # Redis端口
    password:        # 密码(如有)
    lettuce:
      pool:
        max-active: 8  # 最大连接数
        max-idle: 8    # 最大空闲连接
        min-idle: 2    # 最小空闲连接

3. WebSocket 核心配置

实现 WebSocket 配置类,注册端点和消息处理器:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.HandshakeInterceptor;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    private final WebSocketHandler webSocketHandler;
    private final HandshakeInterceptor handshakeInterceptor;
    // 注入自定义处理器和拦截器
    public WebSocketConfig(WebSocketHandler webSocketHandler, HandshakeInterceptor handshakeInterceptor) {
        this.webSocketHandler = webSocketHandler;
        this.handshakeInterceptor = handshakeInterceptor;
    }
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // 注册WebSocket端点,允许跨域
        registry.addHandler(webSocketHandler, "/ws")
                .addInterceptors(handshakeInterceptor)
                .setAllowedOrigins("*"); // 生产环境需指定具体域名
    }
}

4. 会话管理与集群同步

需要解决 3 个核心问题:会话注册消息路由节点间消息转发

4.1 定义常量(Redis Key)

public class RedisKeyConstants {
    // 存储用户-节点映射:key=userId,value=nodeId
    public static final String USER_NODE_MAP = "websocket:user:node";
    // 存储节点信息:key=nodeId,value=节点信息(如IP:端口)
    public static final String NODE_INFO = "websocket:node:info";
    // Redis Pub/Sub频道(用于节点间消息转发)
    public static final String MSG_CHANNEL = "websocket:msg:channel";
}

4.2 握手拦截器(记录会话与用户映射)

在连接建立时,将用户 ID 与当前节点 ID 关联并存储到 Redis:

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.data.redis.core.StringRedisTemplate;
import javax.servlet.http.HttpSession;
import java.util.Map;
import java.util.UUID;
@Component
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
    private final StringRedisTemplate redisTemplate;
    private final String nodeId; // 当前节点唯一标识(如UUID)
    public WebSocketHandshakeInterceptor(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.nodeId = UUID.randomUUID().toString(); // 节点启动时生成唯一ID
    }
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        // 从请求参数或Session中获取用户ID(根据业务场景调整)
        ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
        HttpSession session = servletRequest.getServletRequest().getSession();
        String userId = (String) session.getAttribute("userId"); // 假设用户已登录,Session中存userId
        if (userId != null) {
            // 将用户与当前节点关联(存储到Redis)
            redisTemplate.opsForValue().set(
                    RedisKeyConstants.USER_NODE_MAP + ":" + userId,
                    nodeId
            );
            // 注册当前节点信息(可选,用于监控)
            redisTemplate.opsForValue().set(
                    RedisKeyConstants.NODE_INFO + ":" + nodeId,
                    request.getLocalAddress().toString() // 节点地址
            );
            attributes.put("userId", userId); // 传递userId到处理器
            return true;
        }
        return false; // 未登录用户拒绝连接
    }
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception exception) {}
}

4.3 WebSocket 消息处理器

处理消息接收,并通过 Redis Pub/Sub 转发跨节点消息:

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class WebSocketHandler extends TextWebSocketHandler {
    // 本地会话缓存(当前节点的WebSocket会话)
    private final Map<String, WebSocketSession> localSessions = new ConcurrentHashMap<>();
    private final StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper;
    private final String nodeId; // 当前节点ID(与拦截器一致)
    public WebSocketHandler(StringRedisTemplate redisTemplate, ObjectMapper objectMapper,
                           @Value("${websocket.node.id}") String nodeId) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
        this.nodeId = nodeId;
        // 订阅Redis频道,接收其他节点的消息
        subscribeToRedisChannel();
    }
    // 连接建立时,缓存本地会话
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String userId = (String) session.getAttributes().get("userId");
        localSessions.put(userId, session);
    }
    // 接收客户端消息
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String userId = (String) session.getAttributes().get("userId");
        String payload = message.getPayload(); // 客户端发送的消息
        // 解析消息(假设消息格式:{"targetUserId": "xxx", "content": "xxx"})
        Map<String, String> msg = objectMapper.readValue(payload, Map.class);
        String targetUserId = msg.get("targetUserId");
        String content = msg.get("content");
        // 发送消息给目标用户(跨节点则转发)
        sendMessageToUser(targetUserId, content);
    }
    // 发送消息给指定用户
    public void sendMessageToUser(String targetUserId, String content) throws IOException {
        // 1. 从Redis获取目标用户所在节点
        String targetNodeId = redisTemplate.opsForValue().get(
                RedisKeyConstants.USER_NODE_MAP + ":" + targetUserId
        );
        if (targetNodeId == null) {
            throw new RuntimeException("用户未连接");
        }
        // 2. 若目标节点是当前节点,直接发送;否则通过Redis转发
        if (targetNodeId.equals(nodeId)) {
            WebSocketSession session = localSessions.get(targetUserId);
            if (session != null && session.isOpen()) {
                session.sendMessage(new TextMessage(content));
            }
        } else {
            // 构造跨节点消息(包含目标用户、内容、目标节点)
            Map<String, String> crossMsg = new HashMap<>();
            crossMsg.put("targetUserId", targetUserId);
            crossMsg.put("content", content);
            crossMsg.put("targetNodeId", targetNodeId);
            // 发布到Redis频道
            redisTemplate.convertAndSend(
                    RedisKeyConstants.MSG_CHANNEL,
                    objectMapper.writeValueAsString(crossMsg)
            );
        }
    }
    // 订阅Redis频道,接收其他节点的消息并转发给本地用户
    private void subscribeToRedisChannel() {
        redisTemplate.getConnectionFactory().getConnection().subscribe(
                message -> {
                    try {
                        String payload = new String(message.getBody());
                        Map<String, String> crossMsg = objectMapper.readValue(payload, Map.class);
                        // 仅处理目标节点为当前节点的消息
                        if (crossMsg.get("targetNodeId").equals(nodeId)) {
                            String targetUserId = crossMsg.get("targetUserId");
                            String content = crossMsg.get("content");
                            WebSocketSession session = localSessions.get(targetUserId);
                            if (session != null && session.isOpen()) {
                                session.sendMessage(new TextMessage(content));
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                },
                RedisKeyConstants.MSG_CHANNEL.getBytes()
        );
    }
    // 连接关闭时,清理会话和Redis映射
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        String userId = (String) session.getAttributes().get("userId");
        localSessions.remove(userId);
        // 删除Redis中的用户-节点映射
        redisTemplate.delete(RedisKeyConstants.USER_NODE_MAP + ":" + userId);
    }
}

4.4 节点 ID 配置

在application.yml中配置节点 ID(也可启动时自动生成,确保唯一):

websocket:
  node:
    id: ${HOSTNAME:node-${random.uuid}} # 优先使用主机名,否则随机生成

四、集群测试

  1. 启动多个 Spring Boot 实例(模拟集群节点),确保都连接到同一个 Redis。
  2. 客户端 1 连接节点 A,客户端 2 连接节点 B。
  3. 客户端 1 发送消息给客户端 2,消息流程:节点 A 接收消息,查询 Redis 发现客户端 2 在节点 B。节点 A 通过 Redis Pub/Sub 发布消息到MSG_CHANNEL。节点 B 订阅了该频道,接收消息并推送给客户端 2。

五、优化建议

  1. 会话过期清理:给 Redis 中的USER_NODE_MAP设置过期时间(如 30 分钟),并通过 WebSocket 心跳机制续约。
  2. 消息可靠性:Redis Pub/Sub 是 fire-and-forget 模式,若需可靠消息,可替换为 RabbitMQ 或 Kafka。
  3. 负载均衡:前端通过 Nginx 等负载均衡器连接 WebSocket 集群(需配置proxy_set_header Upgrade $http_upgrade;等参数支持 WebSocket)。
  4. 监控:通过 Redis 的NODE_INFO键监控节点状态,实现故障转移

到此这篇关于Spring Boot 中实现 WebSocket 集群处理的文章就介绍到这了,更多相关Spring Boot WebSocket 集群内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 一文让你了解透彻Java中的IO模型

    一文让你了解透彻Java中的IO模型

    本文只是说明了IO模型,让你了解IO模型是什么,怎么区分IO模型,以及分析了Java中的三种IO模型,本文是纯理论知识,看完之后会让你对IO有更加深刻的理解,感兴趣的同学可以参考一下
    2023-05-05
  • 基于Java编写一个html转pdf的工具类

    基于Java编写一个html转pdf的工具类

    这篇文章主要为大家详细介绍了如何基于Java编写一个html转pdf的工具类,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起了解下
    2025-09-09
  • Java中DecimalFormat用法及符号含义

    Java中DecimalFormat用法及符号含义

    DecimalFormat是NumberFormat的一个具体子类,用于格式化十进制数字。这篇文章介绍了DecimalFormat的用法及符号含义,需要的朋友可以收藏下,方便下次浏览观看
    2021-12-12
  • Java中Map与对象之间互相转换的几种常用方式

    Java中Map与对象之间互相转换的几种常用方式

    在Java中将对象和Map相互转换是常见的操作,可以通过不同的方式实现这种转换,下面这篇文章主要给大家介绍了关于Java中Map与对象之间互相转换的几种常用方式,需要的朋友可以参考下
    2024-01-01
  • Spring Boot基于 JWT 优化 Spring Security 无状态登录实战指南

    Spring Boot基于 JWT 优化 Spring Security 无状态登录实战指南

    本文介绍如何使用JWT优化SpringSecurity实现无状态登录,提高接口安全性,并通过实际操作步骤展示了如何配置JWT参数、实现JWT登录接口、认证过滤器等,感兴趣的朋友跟随小编一起看看吧
    2025-11-11
  • java并发中DelayQueue延迟队列原理剖析

    java并发中DelayQueue延迟队列原理剖析

    DelayQueue队列是一个延迟队列,本文将结合实例代码,详细的介绍DelayQueue延迟队列的源码分析,感兴趣的小伙伴们可以参考一下
    2021-06-06
  • Java中MapStruct 映射过程中忽略某个字段的实现

    Java中MapStruct 映射过程中忽略某个字段的实现

    在MapStruct中,如果你想要在映射过程中忽略某个字段,可以使用 @Mapping注解的ignore属性,本文就来介绍一下Java中MapStruct映射过程中忽略某个字段的实现,感兴趣的可以了解一下
    2025-11-11
  • Java中的信号量Semaphore详细解读

    Java中的信号量Semaphore详细解读

    这篇文章主要介绍了Java中的信号量Semaphore详细解读,Java信号量机制可以用来保证线程互斥,创建Semaphore对象传入一个整形参数,类似于公共资源,需要的朋友可以参考下
    2023-11-11
  • 简单总结单例模式的4种写法

    简单总结单例模式的4种写法

    今天带大家学习java的相关知识,文章围绕着单例模式的4种写法展开,文中有非常详细的介绍及代码示例,需要的朋友可以参考下
    2021-06-06
  • Java SerialVersionUID作用详解

    Java SerialVersionUID作用详解

    这篇文章主要介绍了Java SerialVersionUID作用详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08

最新评论