Spring Boot 中实现 WebSocket 集群思路详解

 更新时间:2025年12月10日 16:44:00   作者:黄三技术java  
本文介绍了如何在SpringBoot中实现WebSocket集群处理,解决了会话共享和消息同步问题,通过分布式存储(Redis)和消息队列(RedisPub/Sub)实现跨节点通信,感兴趣的朋友跟随小编一起看看吧

在 Spring Boot 中实现 WebSocket 集群处理,核心挑战是解决多节点间的 WebSocket 会话共享和消息同步问题(因为单节点 WebSocket 的会话是内存存储的,多节点间无法直接通信)。以下是完整的实现方案:

一、核心思路

  1. 会话共享:使用分布式存储(如 Redis)存储所有节点的 WebSocket 会话信息(会话 ID、用户标识、节点标识等)。
  2. 消息转发:当某节点需要向用户发送消息时,先通过 Redis 判断用户连接在哪个节点,再通过消息队列(如 Redis Pub/Sub)将消息转发到目标节点,由目标节点推送消息给用户。
  3. 集群感知:每个节点启动时注册自身信息,下线时注销,确保消息能正确路由。

二、技术选型

  • WebSocket 框架:Spring WebSocket(基于 JSR-356 标准)
  • 分布式存储:Redis(存储会话映射、节点信息)
  • 消息队列:Redis Pub/Sub(节点间消息转发)
  • 依赖管理:Spring Boot Starter WebSocket + Spring Data Redis

三、实现步骤

1. 引入依赖

在pom.xml中添加以下依赖:

<!-- Spring WebSocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Redis (用于会话存储和消息转发) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 连接池 (可选,优化Redis性能) -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

2. 配置 Redis

在application.yml中配置 Redis 连接:

spring:
  redis:
    host: localhost  # Redis服务器地址
    port: 6379       # Redis端口
    password:        # 密码(如有)
    lettuce:
      pool:
        max-active: 8  # 最大连接数
        max-idle: 8    # 最大空闲连接
        min-idle: 2    # 最小空闲连接

3. WebSocket 核心配置

实现 WebSocket 配置类,注册端点和消息处理器:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.HandshakeInterceptor;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    private final WebSocketHandler webSocketHandler;
    private final HandshakeInterceptor handshakeInterceptor;
    // 注入自定义处理器和拦截器
    public WebSocketConfig(WebSocketHandler webSocketHandler, HandshakeInterceptor handshakeInterceptor) {
        this.webSocketHandler = webSocketHandler;
        this.handshakeInterceptor = handshakeInterceptor;
    }
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // 注册WebSocket端点,允许跨域
        registry.addHandler(webSocketHandler, "/ws")
                .addInterceptors(handshakeInterceptor)
                .setAllowedOrigins("*"); // 生产环境需指定具体域名
    }
}

4. 会话管理与集群同步

需要解决 3 个核心问题:会话注册消息路由节点间消息转发

4.1 定义常量(Redis Key)

public class RedisKeyConstants {
    // 存储用户-节点映射:key=userId,value=nodeId
    public static final String USER_NODE_MAP = "websocket:user:node";
    // 存储节点信息:key=nodeId,value=节点信息(如IP:端口)
    public static final String NODE_INFO = "websocket:node:info";
    // Redis Pub/Sub频道(用于节点间消息转发)
    public static final String MSG_CHANNEL = "websocket:msg:channel";
}

4.2 握手拦截器(记录会话与用户映射)

在连接建立时,将用户 ID 与当前节点 ID 关联并存储到 Redis:

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.data.redis.core.StringRedisTemplate;
import javax.servlet.http.HttpSession;
import java.util.Map;
import java.util.UUID;
@Component
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
    private final StringRedisTemplate redisTemplate;
    private final String nodeId; // 当前节点唯一标识(如UUID)
    public WebSocketHandshakeInterceptor(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.nodeId = UUID.randomUUID().toString(); // 节点启动时生成唯一ID
    }
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        // 从请求参数或Session中获取用户ID(根据业务场景调整)
        ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
        HttpSession session = servletRequest.getServletRequest().getSession();
        String userId = (String) session.getAttribute("userId"); // 假设用户已登录,Session中存userId
        if (userId != null) {
            // 将用户与当前节点关联(存储到Redis)
            redisTemplate.opsForValue().set(
                    RedisKeyConstants.USER_NODE_MAP + ":" + userId,
                    nodeId
            );
            // 注册当前节点信息(可选,用于监控)
            redisTemplate.opsForValue().set(
                    RedisKeyConstants.NODE_INFO + ":" + nodeId,
                    request.getLocalAddress().toString() // 节点地址
            );
            attributes.put("userId", userId); // 传递userId到处理器
            return true;
        }
        return false; // 未登录用户拒绝连接
    }
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception exception) {}
}

4.3 WebSocket 消息处理器

处理消息接收,并通过 Redis Pub/Sub 转发跨节点消息:

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class WebSocketHandler extends TextWebSocketHandler {
    // 本地会话缓存(当前节点的WebSocket会话)
    private final Map<String, WebSocketSession> localSessions = new ConcurrentHashMap<>();
    private final StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper;
    private final String nodeId; // 当前节点ID(与拦截器一致)
    public WebSocketHandler(StringRedisTemplate redisTemplate, ObjectMapper objectMapper,
                           @Value("${websocket.node.id}") String nodeId) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
        this.nodeId = nodeId;
        // 订阅Redis频道,接收其他节点的消息
        subscribeToRedisChannel();
    }
    // 连接建立时,缓存本地会话
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String userId = (String) session.getAttributes().get("userId");
        localSessions.put(userId, session);
    }
    // 接收客户端消息
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String userId = (String) session.getAttributes().get("userId");
        String payload = message.getPayload(); // 客户端发送的消息
        // 解析消息(假设消息格式:{"targetUserId": "xxx", "content": "xxx"})
        Map<String, String> msg = objectMapper.readValue(payload, Map.class);
        String targetUserId = msg.get("targetUserId");
        String content = msg.get("content");
        // 发送消息给目标用户(跨节点则转发)
        sendMessageToUser(targetUserId, content);
    }
    // 发送消息给指定用户
    public void sendMessageToUser(String targetUserId, String content) throws IOException {
        // 1. 从Redis获取目标用户所在节点
        String targetNodeId = redisTemplate.opsForValue().get(
                RedisKeyConstants.USER_NODE_MAP + ":" + targetUserId
        );
        if (targetNodeId == null) {
            throw new RuntimeException("用户未连接");
        }
        // 2. 若目标节点是当前节点,直接发送;否则通过Redis转发
        if (targetNodeId.equals(nodeId)) {
            WebSocketSession session = localSessions.get(targetUserId);
            if (session != null && session.isOpen()) {
                session.sendMessage(new TextMessage(content));
            }
        } else {
            // 构造跨节点消息(包含目标用户、内容、目标节点)
            Map<String, String> crossMsg = new HashMap<>();
            crossMsg.put("targetUserId", targetUserId);
            crossMsg.put("content", content);
            crossMsg.put("targetNodeId", targetNodeId);
            // 发布到Redis频道
            redisTemplate.convertAndSend(
                    RedisKeyConstants.MSG_CHANNEL,
                    objectMapper.writeValueAsString(crossMsg)
            );
        }
    }
    // 订阅Redis频道,接收其他节点的消息并转发给本地用户
    private void subscribeToRedisChannel() {
        redisTemplate.getConnectionFactory().getConnection().subscribe(
                message -> {
                    try {
                        String payload = new String(message.getBody());
                        Map<String, String> crossMsg = objectMapper.readValue(payload, Map.class);
                        // 仅处理目标节点为当前节点的消息
                        if (crossMsg.get("targetNodeId").equals(nodeId)) {
                            String targetUserId = crossMsg.get("targetUserId");
                            String content = crossMsg.get("content");
                            WebSocketSession session = localSessions.get(targetUserId);
                            if (session != null && session.isOpen()) {
                                session.sendMessage(new TextMessage(content));
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                },
                RedisKeyConstants.MSG_CHANNEL.getBytes()
        );
    }
    // 连接关闭时,清理会话和Redis映射
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        String userId = (String) session.getAttributes().get("userId");
        localSessions.remove(userId);
        // 删除Redis中的用户-节点映射
        redisTemplate.delete(RedisKeyConstants.USER_NODE_MAP + ":" + userId);
    }
}

4.4 节点 ID 配置

在application.yml中配置节点 ID(也可启动时自动生成,确保唯一):

websocket:
  node:
    id: ${HOSTNAME:node-${random.uuid}} # 优先使用主机名,否则随机生成

四、集群测试

  1. 启动多个 Spring Boot 实例(模拟集群节点),确保都连接到同一个 Redis。
  2. 客户端 1 连接节点 A,客户端 2 连接节点 B。
  3. 客户端 1 发送消息给客户端 2,消息流程:节点 A 接收消息,查询 Redis 发现客户端 2 在节点 B。节点 A 通过 Redis Pub/Sub 发布消息到MSG_CHANNEL。节点 B 订阅了该频道,接收消息并推送给客户端 2。

五、优化建议

  1. 会话过期清理:给 Redis 中的USER_NODE_MAP设置过期时间(如 30 分钟),并通过 WebSocket 心跳机制续约。
  2. 消息可靠性:Redis Pub/Sub 是 fire-and-forget 模式,若需可靠消息,可替换为 RabbitMQ 或 Kafka。
  3. 负载均衡:前端通过 Nginx 等负载均衡器连接 WebSocket 集群(需配置proxy_set_header Upgrade $http_upgrade;等参数支持 WebSocket)。
  4. 监控:通过 Redis 的NODE_INFO键监控节点状态,实现故障转移

到此这篇关于Spring Boot 中实现 WebSocket 集群处理的文章就介绍到这了,更多相关Spring Boot WebSocket 集群内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java实现简单单链表

    java实现简单单链表

    这篇文章主要为大家详细介绍了java实现简单单链表,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-02-02
  • resultMap如何处理复杂映射问题

    resultMap如何处理复杂映射问题

    这篇文章主要介绍了resultMap如何处理复杂映射问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2025-04-04
  • Mybatis_plus基础教程(总结篇)

    Mybatis_plus基础教程(总结篇)

    这篇文章主要介绍了Mybatis_plus基础教程(总结篇),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-09-09
  • SpringBoot一个请求的处理全过程分享

    SpringBoot一个请求的处理全过程分享

    本文详细介绍了SpringBoot请求处理的全过程,包括过滤器链、拦截器链、路径映射、参数绑定、Controller方法执行、返回值处理、异常解析和视图解析渲染等步骤,同时,文中还列举了请求处理过程中常见的问题及解决方案
    2024-12-12
  • Java基础之如何学好Java

    Java基础之如何学好Java

    这篇文章已经是有数年“网龄”的老文,不过在今天看来仍然经典。如何学习java?本篇文章可以说也是面对编程初学者的一篇指导文章,其中对于如何学习Java的步骤的介绍,很多也适用于开发领域其他技能的学习。
    2014-10-10
  • Java 注解@PostConstruct的原理及最佳使用场景分析

    Java 注解@PostConstruct的原理及最佳使用场景分析

    @PostConstruct 是 Java 中非常实用的注解,尤其是在 Spring 等框架中,它使得开发者可以方便地在 Bean 初始化后执行额外的操作,本文给大家介绍@PostConstruct 的原理、使用场景及最佳实践,感兴趣的朋友一起看看吧
    2025-04-04
  • 列举java语言中反射的常用方法及实例代码

    列举java语言中反射的常用方法及实例代码

    反射机制指的是程序在运行时能够获取自身的信息。这篇文章主要介绍了列举java语言中反射的常用方法,需要的朋友可以参考下
    2019-07-07
  • ava实现一致性Hash算法

    ava实现一致性Hash算法

    本文主要详细介绍了Java如何实现一致性Hash算法,其实现原理将key映射到 2^32 - 1 的空间中,将这个数字的首尾相连,形成一个环。想了解更多的同学,可以参考本文
    2023-03-03
  • 关于RedisTemplate之opsForValue的使用说明

    关于RedisTemplate之opsForValue的使用说明

    这篇文章主要介绍了关于RedisTemplate之opsForValue的使用说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • 基于spring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(详解)

    基于spring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(详解)

    下面小编就为大家带来一篇基于spring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-06-06

最新评论