Springboot+Stomp协议实现聊天功能

 更新时间:2024年02月11日 09:34:26   作者:suyukangchen  
本示例实现一个功能,前端通过websocket发送消息给后端服务,后端服务接收到该消息时,原样将消息返回给前端,前端技术栈html+stomp.js,后端SpringBoot,需要的朋友可以参考下

前端代码

这里我对Stomp.js进行了一个简单的封装,写在stomp-client.js里面

/**
 * 对 stomp 客户端进行封装
 */

var client;
var subscribes = [];
var errorTimes = 0;

var endpoint = "/ws";

/**
 * 建立websocket连接
 * @param {Function} onConnecting 开始连接时的回调
 * @param {Function} onConnected 连接成功回调
 * @param {Function} onError 连接异常或断开回调
 */
function connect(onConnecting, onConnected, onError) {
    onConnecting instanceof Function && onConnecting();
    var sock = new SockJS(endpoint);
    client = Stomp.over(sock);
    console.log("ws: start connect to " + endpoint);
    client.connect({}, function (frame) {
        errorTimes = 0;
        console.log('connected: ' + frame);
        // 连接成功后重新订阅
        subscribes.forEach(function (item) {
            client.subscribe(item.destination, function (resp) {
                console.debug("ws收到消息: ", resp);
                item.cb(JSON.parse(resp.body));
            });
        });
        onConnected instanceof Function && onConnected();
    }, function (err) {
        errorTimes = errorTimes > 8 ? 0 : errorTimes;
        var nextTime = ++errorTimes * 3000;
        console.warn("与服务器断开连接," + nextTime + " 秒后重新连接", err);
        setTimeout(function () {
            console.log("尝试重连……");
            connect(onConnecting, onConnected, onError);
        }, nextTime);
        onError instanceof Function && onError();
    });
}

/**
 * 订阅消息,若当前未连接,则会在连接成功后自动订阅
 *
 * 注意,为防止重连导致重复订阅,请勿使用匿名函数做回调
 *
 * @param {String} destination 目标
 * @param {Function} cb 回调
 */
function subscribe(destination, cb) {
    var exist = subscribes.filter(function (sub) {
        return sub.destination === destination && sub.cb === cb
    });
    // 防止重复订阅
    if (exist && exist.length) {
        return;
    }
    // 记录所有订阅,在连接成功时统一处理
    subscribes.push({
        destination: destination,
        cb: cb
    });
    if (client && client.connected) {
        client.subscribe(destination, function (resp) {
            console.debug("ws收到消息: ", resp);
            cb instanceof Function && cb(JSON.parse(resp.body));
        });
    } else {
        console.warn("ws未连接,暂时无法订阅:" + destination)
    }
}

/**
 * 发送消息
 * @param {String} destination 目标
 * @param {Object} msg 消息体对象
 */
function send(destination, msg) {
    if (!client) {
        console.error("客户端未连接,无法发送消息!")
    }
    client.send(destination, {}, JSON.stringify(msg));
}

window.onbeforeunload = function () {
    // 当窗口关闭时断开连接
    if (client && client.connected) {
        client.disconnect(function () {
            console.log("websocket disconnected ");
        });
    }
};

前端的html页面index.html如下:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>STOMP</title>
</head>
<body>
<h1 id="tip">Welcome!</h1>
<p>状态: <span id="status"></span></p>
<input type="text" id="content" placeholder="请输入要发送的消息"> <br>
<button onclick="sendTextMsg()">发送</button>
<ul id="ul">
</ul>
<script th:src="@{lib/sockjs.min.js}"></script>
<script th:src="@{lib/stomp.min.js}"></script>
<script th:src="@{stomp-client.js}"></script>
<script>
    connect(function () {
        statusChange("连接中...");
    }, function () {
        statusChange("在线");
        // 注意,为防止重连导致重复订阅,请勿使用匿名函数做回调
        subscribe("/user/topic/subNewMsg", onNewMsg);
    }, function () {
        statusChange("离线");
    });

    function onNewMsg(msg) {
        var li = document.createElement("li");
        li.innerText = msg.content;
        document.getElementById("ul").appendChild(li);
    }

    function sendTextMsg() {
        var content = document.getElementById("content").value;
        var msg = {
            msgType: 1,
            content: content
        };
        send("/app/echo", msg);
    }

    function statusChange(status) {
        document.getElementById("status").innerText = status;
    }
</script>
</body>
</html>

后端代码

依赖引入,主要引入下面的包,其它的包略过

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-websocket</artifactId>
 </dependency>

配置类

@Slf4j
@Setter
@Configuration
@EnableWebSocketMessageBroker
@ConfigurationProperties(prefix = "websocket")
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer, ApplicationListener<BrokerAvailabilityEvent> {
    private final BrokerConfig brokerConfig;
    private String[] allowOrigins;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 继承DefaultHandshakeHandler并重写determineUser方法,可以自定义如何确定用户
        // 添加方法:registry.addEndpoint("/ws").setHandshakeHandler(handshakeHandler)
        registry.addEndpoint("/ws")
                .setAllowedOrigins(allowOrigins)
                .withSockJS();
    }

    /**
     * 配置消息代理
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");

        if (brokerConfig.isUseSimpleBroker()) {
            // 使用 SimpleBroker
            // 配置前缀, 有这些前缀的消息会路由到broker
            registry.enableSimpleBroker("/topic", "/queue")
                    //配置stomp协议里, server返回的心跳
                    .setHeartbeatValue(new long[]{10000L, 10000L})
                    //配置发送心跳的scheduler
                    .setTaskScheduler(new DefaultManagedTaskScheduler());
        } else {
            // 使用外部 Broker
            // 指定前缀,有这些前缀的消息会路由到broker
            registry.enableStompBrokerRelay("/topic", "/queue")
                    // 广播用户目标,如果要推送的用户不在本地,则通过 broker 广播给集群的其他成员
                    .setUserDestinationBroadcast("/topic/log-unresolved-user")
                    // 用户注册广播,一旦有用户登录,则广播给集群中的其他成员
                    .setUserRegistryBroadcast("/topic/log-user-registry")
                    // 虚拟地址
                    .setVirtualHost(brokerConfig.getVirtualHost())
                    // 用户密码
                    .setSystemLogin(brokerConfig.getUsername())
                    .setSystemPasscode(brokerConfig.getPassword())
                    .setClientLogin(brokerConfig.getUsername())
                    .setClientPasscode(brokerConfig.getPassword())
                    // 心跳间隔
                    .setSystemHeartbeatSendInterval(10000)
                    .setSystemHeartbeatReceiveInterval(10000)
                    // 使用 setTcpClient 以配置多个 broker 地址,setRelayHost/Port 只能配置一个
                    .setTcpClient(createTcpClient());
        }
    }

    /**
     * 创建 TcpClient 工厂,用于配置多个 broker 地址
     */
    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        return new ReactorNettyTcpClient<>(
                // BrokerAddressSupplier 用于获取中继地址,一次只使用一个,如果该中继出错,则会获取下一个
                client -> client.addressSupplier(brokerConfig.getBrokerAddressSupplier()),
                new StompReactorNettyCodec());
    }

    @Override
    public void onApplicationEvent(BrokerAvailabilityEvent event) {
        if (!event.isBrokerAvailable()) {
            log.warn("stomp broker is not available!!!!!!!!");
        } else {
            log.info("stomp broker is available");
        }
    }
}

消息处理

@Slf4j
@Controller
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class StompController {
    private final SimpMessageSendingOperations msgOperations;
    private final SimpUserRegistry simpUserRegistry;

    /**
     * 回音消息,将用户发来的消息内容加上 Echo 前缀后推送回客户端
     */
    @MessageMapping("/echo")
    public void echo(Principal principal, Msg msg) {
        String username = principal.getName();
        msg.setContent("Echo: " + msg.getContent());
        msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg);
        int userCount = simpUserRegistry.getUserCount();
        int sessionCount = simpUserRegistry.getUser(username).getSessions().size();
        log.info("当前本系统总在线人数: {}, 当前用户: {}, 该用户的客户端连接数: {}", userCount, username, sessionCount);
    }
}

实现效果

报文分析

开启调试模式,我们根据报文来分析一下前后端互通的报文

握手

客户端请求报文如下

GET ws://localhost:8025/ws/035/5hy4avgm/websocket HTTP/1.1
Host: localhost:8025
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.289 Safari/537.36
Upgrade: websocket
Origin: http://localhost:8025
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9
Cookie: 略
Sec-WebSocket-Key: PlMHmdl2JRzDAVk3feOaeA==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits

服务端响应握手请求

HTTP/1.1 101
Upgrade: websocket
Connection: upgrade
Sec-WebSocket-Accept: 9CKY8n1j/cHoKsWmpmX4pNlQuZg=
Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15
X-Content-Type-Options: nosniff
X-XSS-Protection: 1; mode=block
Cache-Control: no-cache, no-store, max-age=0, must-revalidate
Pragma: no-cache
Expires: 0
X-Frame-Options: DENY
Date: Thu, 08 Feb 2024 06:58:28 GMT

stomp报文分析

在浏览器消息一栏,我们可以看到长连接过程中通信的报文

下面来简单分析一下stomp的报文

客户端请求连接

其中\n表示换行

[
  "CONNECT\naccept-version:1.1,1.0\nheart-beat:10000,10000\n\n\u0000"
]

可以看到请求连接的命令是CONNECT,连接报文里面还包含了心跳的信息

服务端返回连接成功

[
  "CONNECTED\nversion:1.1\nheart-beat:10000,10000\nuser-name:admin\n\n\u0000"
]

CONNECTED是服务端连接成功的命令,报文中也包含了心跳的信息

客户端订阅

订阅的目的地是:/user/topic/subNewMsg

["SUBSCRIBE\nid:sub-0\ndestination:/user/topic/subNewMsg\n\n\u0000"]

客户端发送消息

发送的目的地是:/app/echo

[
  "SEND\ndestination:/app/echo\ncontent-length:35\n\n{\"msgType\":1,\"content\":\"你好啊\"}\u0000"
]

服务端响应消息

响应的目的地是:/user/topic/subNewMsg,当订阅了这个目的地的,方法,将会被回调

[
  "MESSAGE\ndestination:/user/topic/subNewMsg\ncontent-type:application/json;charset=UTF-8\nsubscription:sub-0\nmessage-id:5hy4avgm-1\ncontent-length:41\n\n{\"content\":\"Echo: 你好啊\",\"msgType\":1}\u0000"
]

心跳报文

可以看到,约每隔10S,客户端和服务端都有一次心跳报文,发送的报文内容为一个回车。

[\n]

项目链接:https://gitee.com/syk1234/stomp-demo.git

以上就是Springboot+Stomp协议实现聊天功能的详细内容,更多关于Springboot+Stomp聊天的资料请关注脚本之家其它相关文章!

相关文章

  • Springboot集成Protobuf的流程步骤

    Springboot集成Protobuf的流程步骤

    在以往的项目中进行网络通信和数据交换的应用场景中,最经常使用的技术便是json或xml,但是今天在介绍一个Google的力作protobuf作为数据交换格式,文中给大家介绍了Springboot集成Protobuf的流程步骤,需要的朋友可以参考下
    2024-03-03
  • 详解Java如何有效避免空指针

    详解Java如何有效避免空指针

    空指针,也就是NullPointerException 简称NPE的,怕一下子写出NPE,部分的伙伴看不懂,索性就改成了空指针,在实际的开发中,我们最讨厌的就是遇到空指针了,业务跑着跑着发现了空指针,所以本文详细介绍了Java如何有效的避免空指针,需要的朋友可以参考下
    2023-12-12
  • SpringBoot实现自定义事件的方法详解

    SpringBoot实现自定义事件的方法详解

    这篇文章将用实例来和大家介绍一下如何在SpringBoot中自定义事件来使用观察者模式。文中的示例代码讲解详细,对我们学习SpringBoot有一定的帮助,需要的可以参考一下
    2022-06-06
  • Spring DI依赖注入过程解析

    Spring DI依赖注入过程解析

    依赖注入是由“依赖”和“注入”两个词汇组合而成,那么我们再一次顺藤摸瓜,分别分析这两个词语,这篇文章主要介绍了Spring DI依赖注入详解,需要的朋友可以参考下
    2022-11-11
  • Java中的synchronized重量级锁解析

    Java中的synchronized重量级锁解析

    这篇文章主要介绍了Java中的synchronized重量级锁解析,内核需要去申请这个互斥量,必须要进入内核态,也就是这里需要用户态,内核态的切换,状态的切换,开销是比较大的,这就是重型锁的一个弊端,需要的朋友可以参考下
    2024-01-01
  • Java获取中文拼音、中文首字母缩写和中文首字母的示例

    Java获取中文拼音、中文首字母缩写和中文首字母的示例

    本文主要介绍了Java获取中文拼音、中文首字母缩写和中文首字母,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。
    2016-10-10
  • Java实现多线程下载和断点续传

    Java实现多线程下载和断点续传

    这篇文章主要为大家详细介绍了Java实现多线程下载和断点续传,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-06-06
  • Spring Boot 通过CORS实现跨域问题

    Spring Boot 通过CORS实现跨域问题

    这篇文章主要介绍了Spring Boot 通过CORS实现跨域,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-09-09
  • Spring boot 运用策略模式实现避免多次使用if的操作代码

    Spring boot 运用策略模式实现避免多次使用if的操作代码

    这篇文章主要介绍了Spring boot 运用策略模式实现,避免多次使用if,使用策略模式后,新加一种支付策略时,只需要在策略枚举中添加新加的策略信息,外加一个策略类即可,而不再需要添加新的if判断,需要的朋友可以参考下
    2022-08-08
  • java 并发线程个数的如何确定

    java 并发线程个数的如何确定

    这篇文章主要介绍了java 并发线程个数的如何确定,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12

最新评论