java中消息推送功能的关键实现代码

 更新时间:2025年09月30日 09:18:21   作者:hqxstudying  
在日常开发中,消息推送是非常典型的业务需求,下面这篇文章主要介绍了java中消息推送功能的关键实现代码,文中通过代码介绍的非常详细,需要的朋友可以参考下

前言

在 Java 项目中实现消息推送服务,后端核心需要解决实时性可靠性扩展性(集群支持)和连接管理四大问题。以下从技术选型、核心架构、关键实现和注意事项四个方面展开说明:

一、技术选型:根据场景选协议

消息推送的核心是服务器主动向客户端发送数据,需根据实时性、客户端类型(Web/APP/ 物联网设备)选择合适的通信协议:

协议 / 方案适用场景优势劣势Java 技术栈支持
WebSocketWeb 端实时通信(如聊天、通知)全双工、低延迟、长连接部分老旧浏览器不支持Spring WebSocket、Netty、Tomcat 原生
MQTT物联网设备(低带宽、不稳定网络)轻量、支持 QoS(消息质量等级)需额外部署 MQTT broker(如 EMQX)Eclipse Paho、Spring Integration
长轮询(Long Polling)兼容性要求高的场景(如老浏览器)实现简单、兼容性好延迟较高、服务器资源消耗大Servlet + 异步处理
Server-Sent Events (SSE)服务器单向推送(如实时日志)轻量、仅服务器向客户端推送不支持客户端向服务器发送数据Spring WebFlux、原生 Servlet

二、核心架构:后端模块设计

无论选择哪种协议,消息推送服务的后端架构通常包含以下核心模块:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   业务系统API   │───>│   消息路由模块   │───>│   连接管理模块   │───> 客户端
└─────────────────┘    └─────────────────┘    └─────────────────┘
        │                       │                       │
        ▼                       ▼                       ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  离线消息存储    │<───│  集群通信模块    │<───│  会话管理模块    │
└─────────────────┘    └─────────────────┘    └─────────────────┘
  • 业务系统 API:提供接口给业务系统(如订单系统、通知系统)调用,触发消息推送(例如:pushMessage(String userId, String content))。

  • 会话管理模块:维护客户端与服务器的连接会话(Session),核心是用户 ID 与连接的映射关系

    • 单机:用ConcurrentHashMap<String, Session>存储(key 为用户 ID,value 为连接会话)。
    • 集群:用 Redis 存储分布式会话(需序列化 Session 信息,或仅存储 “用户 ID - 节点 IP” 映射)。
  • 连接管理模块:处理客户端的连接建立、断开、心跳检测。

    • 例如 WebSocket 的onOpen()(建立连接时绑定用户 ID 与 Session)、onClose()(移除映射)、onError()(异常处理)。
  • 消息路由模块:根据目标用户 ID,找到对应的连接会话并发送消息。

    • 单机:直接从本地ConcurrentHashMap获取 Session 发送。
    • 集群:通过 Redis Pub/Sub 广播消息,所有节点收到后检查本地是否有目标用户的连接,有则发送。
  • 离线消息模块:当用户不在线时,将消息暂存(如 MySQL、MongoDB 或 Kafka),待用户上线后拉取。

  • 集群通信模块:解决多节点部署时的消息同步问题(如 Redis Pub/Sub、RabbitMQ 的 Fanout 交换机)。

三、关键实现:以 WebSocket + Spring Boot 为例

以最常用的 Web 端实时推送为例,基于 Spring Boot + WebSocket + Redis(集群支持)实现核心流程:

1. 依赖配置(Maven)

<!-- WebSocket核心依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Redis(集群通信与会话存储) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2. WebSocket 配置(连接建立与处理器)

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Autowired
    private MessageHandler messageHandler; // 自定义消息处理器

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // 配置WebSocket端点:客户端通过ws://ip:port/ws?token=xxx连接
        registry.addHandler(messageHandler, "/ws")
                .setAllowedOrigins("*") // 允许跨域(生产环境需限制域名)
                .addInterceptors(new HandshakeInterceptor() {
                    // 握手前验证用户(如Token解析用户ID)
                    @Override
                    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, 
                                                  WebSocketHandler wsHandler, Map<String, Object> attributes) {
                        String token = ((ServletServerHttpRequest) request).getServletRequest().getParameter("token");
                        String userId = parseToken(token); // 解析Token获取用户ID
                        if (userId == null) {
                            response.setStatusCode(HttpStatus.UNAUTHORIZED);
                            return false; // 验证失败,拒绝连接
                        }
                        attributes.put("userId", userId); // 存储用户ID到属性中
                        return true;
                    }

                    @Override
                    public void afterHandshake(...) {}
                });
    }
}

3. 消息处理器(连接管理与会话绑定)

@Component
public class MessageHandler extends TextWebSocketHandler {
    // 本地会话映射:用户ID -> WebSocket会话(单机用)
    private final Map<String, WebSocketSession> localSessions = new ConcurrentHashMap<>();
    @Autowired
    private StringRedisTemplate redisTemplate; // Redis操作模板
    @Autowired
    private OfflineMessageService offlineMessageService; // 离线消息服务

    // 连接建立时:绑定用户ID与Session
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        String userId = (String) session.getAttributes().get("userId");
        localSessions.put(userId, session);
        
        // 1. 拉取离线消息并推送
        List<Message> offlineMessages = offlineMessageService.getByUserId(userId);
        for (Message msg : offlineMessages) {
            sendMessage(session, msg.getContent());
        }
        offlineMessageService.deleteByUserId(userId); // 清除已推送的离线消息
        
        // 2. 集群:将用户ID与当前节点IP存入Redis(便于其他节点定位)
        redisTemplate.opsForValue().set("user:session:" + userId, getLocalIp());
    }

    // 连接关闭时:移除会话映射
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        String userId = (String) session.getAttributes().get("userId");
        localSessions.remove(userId);
        redisTemplate.delete("user:session:" + userId); // 集群:删除Redis映射
    }

    // 发送消息到指定用户(核心方法)
    public void pushToUser(String userId, String content) {
        // 1. 先检查本地是否有该用户的连接
        WebSocketSession session = localSessions.get(userId);
        if (session != null && session.isOpen()) {
            sendMessage(session, content);
            return;
        }

        // 2. 集群:检查用户是否连接在其他节点
        String targetNodeIp = redisTemplate.opsForValue().get("user:session:" + userId);
        if (targetNodeIp != null) {
            // 发布消息到Redis频道,目标节点订阅后发送
            redisTemplate.convertAndSend("push:channel", 
                JSON.toJSONString(new PushMessage(userId, content)));
            return;
        }

        // 3. 用户不在线:存入离线消息
        offlineMessageService.save(new Message(userId, content, LocalDateTime.now()));
    }

    // 实际发送消息(封装异常处理)
    private void sendMessage(WebSocketSession session, String content) {
        try {
            session.sendMessage(new TextMessage(content));
        } catch (IOException e) {
            log.error("消息发送失败", e);
        }
    }
}

4. 集群支持:Redis Pub/Sub 订阅

@Component
public class RedisMessageListener {
    @Autowired
    private MessageHandler messageHandler;

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅推送频道,接收其他节点的消息
        container.addMessageListener((message, pattern) -> {
            String json = new String(message.getBody());
            PushMessage pushMsg = JSON.parseObject(json, PushMessage.class);
            // 调用本地消息处理器发送(此时用户应在当前节点)
            messageHandler.pushToUser(pushMsg.getUserId(), pushMsg.getContent());
        }, new ChannelTopic("push:channel"));
        return container;
    }
}

5. 业务调用接口

@RestController
@RequestMapping("/api/message")
public class MessageController {
    @Autowired
    private MessageHandler messageHandler;

    // 业务系统调用此接口触发推送
    @PostMapping("/push")
    public Result push(@RequestBody PushRequest request) {
        messageHandler.pushToUser(request.getUserId(), request.getContent());
        return Result.success();
    }
}

四、注意事项

  • 安全性

    • 连接建立时必须验证用户身份(如 Token、Session),防止未授权连接。
    • 生产环境需限制 WebSocket 的跨域来源(setAllowedOrigins不要用*)。
  • 可靠性

    • 实现消息确认机制(客户端收到消息后回复 ACK,服务器未收到则重试)。
    • 离线消息需持久化(建议用 Kafka 或数据库,支持消息过期清理)。
  • 性能与扩展

    • 长连接数量大时,用 Netty 替代 Tomcat 原生 WebSocket(Netty 的 NIO 模型更高效)。
    • 集群部署时,通过 Redis 或 MQ 实现消息广播,避免 “消息孤岛”。
  • 监控与运维

    • 监控连接数、消息发送成功率、节点负载(如用 Prometheus + Grafana)。
    • 实现连接心跳检测(定期发送 ping 帧,超时未响应则主动断开)。

总结

后端消息推送服务的核心是 **“连接管理”+“消息路由”**,小规模场景可用 Spring WebSocket + 本地会话;中大规模集群需结合 Redis 实现分布式会话与跨节点通信;物联网场景优先选 MQTT 协议。根据实时性和规模需求,可逐步迭代优化(从单机到集群,从基础功能到可靠性保障)。

到此这篇关于java中消息推送功能的文章就介绍到这了,更多相关java消息推送功能内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • JavaBean和Map转换封装类的方法

    JavaBean和Map转换封装类的方法

    下面小编就为大家带来一篇JavaBean和Map转换封装类的方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-10-10
  • Java中的length和length()深入分析

    Java中的length和length()深入分析

    java中的length属性是针对数组说的,比如说你声明了一个数组,想知道这个数组的长度则用到了length这个属性。java中的length()方法是针对字符串String说的,如果想看这个字符串的长度则用到length()这个方法。这篇文章将介绍几个关于Java数组的关键概念。
    2016-11-11
  • mybatis 连接mysql数据库 tinyint 为boolean类型详解

    mybatis 连接mysql数据库 tinyint 为boolean类型详解

    这篇文章主要介绍了mybatis 连接mysql数据库 tinyint 为boolean类型详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-11-11
  • Maven基础:错误对应:was cached in the local repository的解决

    Maven基础:错误对应:was cached in the local&nbs

    这篇文章主要介绍了Maven基础:错误对应:was cached in the local repository的解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2025-03-03
  • SpringBoot错误处理流程深入详解

    SpringBoot错误处理流程深入详解

    在项目开发中出现异常时很平常不过的事情,我们处理异常也有很多种方式。本文将详细为大家讲解SpringBoot实现异常处理几种方法,感兴趣的可以学习一下
    2022-10-10
  • JDK version和class file version(Class编译版本号)对应关系解读

    JDK version和class file version(Class编译版本号)对应关系解读

    这篇文章主要介绍了JDK version和class file version(Class编译版本号)对应关系,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • Java中对于双属性枚举的使用案例

    Java中对于双属性枚举的使用案例

    今天小编就为大家分享一篇关于Java中对于双属性枚举的使用案例,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2018-12-12
  • 一文梳理Java 8后的新功能

    一文梳理Java 8后的新功能

    Java 8是Java自Java 5(发布于2004年)之后的最重要的版本,下面这篇文章主要给大家介绍了关于Java8后新功能的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-02-02
  • spring装配bean的3种方式总结

    spring装配bean的3种方式总结

    这篇文章主要给大家介绍了关于spring装配bean的3种方式,文中通过示例代码介绍的非常详细,对大家的学习或者使用Spring具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-03-03
  • Java构建菜单树的实现示例

    Java构建菜单树的实现示例

    本文主要介绍了Java构建菜单树的实现示例,像一级菜单,二级菜单,三级菜单甚至更多层级的菜单,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-05-05

最新评论