Spring Boot 中实现 WebSocket 集群思路详解
在 Spring Boot 中实现 WebSocket 集群处理,核心挑战是解决多节点间的 WebSocket 会话共享和消息同步问题(因为单节点 WebSocket 的会话是内存存储的,多节点间无法直接通信)。以下是完整的实现方案:
一、核心思路
- 会话共享:使用分布式存储(如 Redis)存储所有节点的 WebSocket 会话信息(会话 ID、用户标识、节点标识等)。
- 消息转发:当某节点需要向用户发送消息时,先通过 Redis 判断用户连接在哪个节点,再通过消息队列(如 Redis Pub/Sub)将消息转发到目标节点,由目标节点推送消息给用户。
- 集群感知:每个节点启动时注册自身信息,下线时注销,确保消息能正确路由。
二、技术选型
- WebSocket 框架:Spring WebSocket(基于 JSR-356 标准)
- 分布式存储:Redis(存储会话映射、节点信息)
- 消息队列:Redis Pub/Sub(节点间消息转发)
- 依赖管理:Spring Boot Starter WebSocket + Spring Data Redis
三、实现步骤
1. 引入依赖
在pom.xml中添加以下依赖:
<!-- Spring WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Redis (用于会话存储和消息转发) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 连接池 (可选,优化Redis性能) -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>2. 配置 Redis
在application.yml中配置 Redis 连接:
spring:
redis:
host: localhost # Redis服务器地址
port: 6379 # Redis端口
password: # 密码(如有)
lettuce:
pool:
max-active: 8 # 最大连接数
max-idle: 8 # 最大空闲连接
min-idle: 2 # 最小空闲连接3. WebSocket 核心配置
实现 WebSocket 配置类,注册端点和消息处理器:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.HandshakeInterceptor;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final WebSocketHandler webSocketHandler;
private final HandshakeInterceptor handshakeInterceptor;
// 注入自定义处理器和拦截器
public WebSocketConfig(WebSocketHandler webSocketHandler, HandshakeInterceptor handshakeInterceptor) {
this.webSocketHandler = webSocketHandler;
this.handshakeInterceptor = handshakeInterceptor;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 注册WebSocket端点,允许跨域
registry.addHandler(webSocketHandler, "/ws")
.addInterceptors(handshakeInterceptor)
.setAllowedOrigins("*"); // 生产环境需指定具体域名
}
}4. 会话管理与集群同步
需要解决 3 个核心问题:会话注册、消息路由、节点间消息转发。
4.1 定义常量(Redis Key)
public class RedisKeyConstants {
// 存储用户-节点映射:key=userId,value=nodeId
public static final String USER_NODE_MAP = "websocket:user:node";
// 存储节点信息:key=nodeId,value=节点信息(如IP:端口)
public static final String NODE_INFO = "websocket:node:info";
// Redis Pub/Sub频道(用于节点间消息转发)
public static final String MSG_CHANNEL = "websocket:msg:channel";
}4.2 握手拦截器(记录会话与用户映射)
在连接建立时,将用户 ID 与当前节点 ID 关联并存储到 Redis:
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.data.redis.core.StringRedisTemplate;
import javax.servlet.http.HttpSession;
import java.util.Map;
import java.util.UUID;
@Component
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
private final StringRedisTemplate redisTemplate;
private final String nodeId; // 当前节点唯一标识(如UUID)
public WebSocketHandshakeInterceptor(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
this.nodeId = UUID.randomUUID().toString(); // 节点启动时生成唯一ID
}
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
// 从请求参数或Session中获取用户ID(根据业务场景调整)
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpSession session = servletRequest.getServletRequest().getSession();
String userId = (String) session.getAttribute("userId"); // 假设用户已登录,Session中存userId
if (userId != null) {
// 将用户与当前节点关联(存储到Redis)
redisTemplate.opsForValue().set(
RedisKeyConstants.USER_NODE_MAP + ":" + userId,
nodeId
);
// 注册当前节点信息(可选,用于监控)
redisTemplate.opsForValue().set(
RedisKeyConstants.NODE_INFO + ":" + nodeId,
request.getLocalAddress().toString() // 节点地址
);
attributes.put("userId", userId); // 传递userId到处理器
return true;
}
return false; // 未登录用户拒绝连接
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {}
}4.3 WebSocket 消息处理器
处理消息接收,并通过 Redis Pub/Sub 转发跨节点消息:
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class WebSocketHandler extends TextWebSocketHandler {
// 本地会话缓存(当前节点的WebSocket会话)
private final Map<String, WebSocketSession> localSessions = new ConcurrentHashMap<>();
private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
private final String nodeId; // 当前节点ID(与拦截器一致)
public WebSocketHandler(StringRedisTemplate redisTemplate, ObjectMapper objectMapper,
@Value("${websocket.node.id}") String nodeId) {
this.redisTemplate = redisTemplate;
this.objectMapper = objectMapper;
this.nodeId = nodeId;
// 订阅Redis频道,接收其他节点的消息
subscribeToRedisChannel();
}
// 连接建立时,缓存本地会话
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String userId = (String) session.getAttributes().get("userId");
localSessions.put(userId, session);
}
// 接收客户端消息
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String userId = (String) session.getAttributes().get("userId");
String payload = message.getPayload(); // 客户端发送的消息
// 解析消息(假设消息格式:{"targetUserId": "xxx", "content": "xxx"})
Map<String, String> msg = objectMapper.readValue(payload, Map.class);
String targetUserId = msg.get("targetUserId");
String content = msg.get("content");
// 发送消息给目标用户(跨节点则转发)
sendMessageToUser(targetUserId, content);
}
// 发送消息给指定用户
public void sendMessageToUser(String targetUserId, String content) throws IOException {
// 1. 从Redis获取目标用户所在节点
String targetNodeId = redisTemplate.opsForValue().get(
RedisKeyConstants.USER_NODE_MAP + ":" + targetUserId
);
if (targetNodeId == null) {
throw new RuntimeException("用户未连接");
}
// 2. 若目标节点是当前节点,直接发送;否则通过Redis转发
if (targetNodeId.equals(nodeId)) {
WebSocketSession session = localSessions.get(targetUserId);
if (session != null && session.isOpen()) {
session.sendMessage(new TextMessage(content));
}
} else {
// 构造跨节点消息(包含目标用户、内容、目标节点)
Map<String, String> crossMsg = new HashMap<>();
crossMsg.put("targetUserId", targetUserId);
crossMsg.put("content", content);
crossMsg.put("targetNodeId", targetNodeId);
// 发布到Redis频道
redisTemplate.convertAndSend(
RedisKeyConstants.MSG_CHANNEL,
objectMapper.writeValueAsString(crossMsg)
);
}
}
// 订阅Redis频道,接收其他节点的消息并转发给本地用户
private void subscribeToRedisChannel() {
redisTemplate.getConnectionFactory().getConnection().subscribe(
message -> {
try {
String payload = new String(message.getBody());
Map<String, String> crossMsg = objectMapper.readValue(payload, Map.class);
// 仅处理目标节点为当前节点的消息
if (crossMsg.get("targetNodeId").equals(nodeId)) {
String targetUserId = crossMsg.get("targetUserId");
String content = crossMsg.get("content");
WebSocketSession session = localSessions.get(targetUserId);
if (session != null && session.isOpen()) {
session.sendMessage(new TextMessage(content));
}
}
} catch (Exception e) {
e.printStackTrace();
}
},
RedisKeyConstants.MSG_CHANNEL.getBytes()
);
}
// 连接关闭时,清理会话和Redis映射
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String userId = (String) session.getAttributes().get("userId");
localSessions.remove(userId);
// 删除Redis中的用户-节点映射
redisTemplate.delete(RedisKeyConstants.USER_NODE_MAP + ":" + userId);
}
}4.4 节点 ID 配置
在application.yml中配置节点 ID(也可启动时自动生成,确保唯一):
websocket:
node:
id: ${HOSTNAME:node-${random.uuid}} # 优先使用主机名,否则随机生成四、集群测试
- 启动多个 Spring Boot 实例(模拟集群节点),确保都连接到同一个 Redis。
- 客户端 1 连接节点 A,客户端 2 连接节点 B。
- 客户端 1 发送消息给客户端 2,消息流程:节点 A 接收消息,查询 Redis 发现客户端 2 在节点 B。节点 A 通过 Redis Pub/Sub 发布消息到MSG_CHANNEL。节点 B 订阅了该频道,接收消息并推送给客户端 2。
五、优化建议
- 会话过期清理:给 Redis 中的USER_NODE_MAP设置过期时间(如 30 分钟),并通过 WebSocket 心跳机制续约。
- 消息可靠性:Redis Pub/Sub 是 fire-and-forget 模式,若需可靠消息,可替换为 RabbitMQ 或 Kafka。
- 负载均衡:前端通过 Nginx 等负载均衡器连接 WebSocket 集群(需配置proxy_set_header Upgrade $http_upgrade;等参数支持 WebSocket)。
- 监控:通过 Redis 的NODE_INFO键监控节点状态,实现故障转移
到此这篇关于Spring Boot 中实现 WebSocket 集群处理的文章就介绍到这了,更多相关Spring Boot WebSocket 集群内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Java 注解@PostConstruct的原理及最佳使用场景分析
@PostConstruct 是 Java 中非常实用的注解,尤其是在 Spring 等框架中,它使得开发者可以方便地在 Bean 初始化后执行额外的操作,本文给大家介绍@PostConstruct 的原理、使用场景及最佳实践,感兴趣的朋友一起看看吧2025-04-04
关于RedisTemplate之opsForValue的使用说明
这篇文章主要介绍了关于RedisTemplate之opsForValue的使用说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-06-06
基于spring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(详解)
下面小编就为大家带来一篇基于spring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧2017-06-06


最新评论