SpringBoot集成netty实现websocket通信功能

 更新时间:2024年03月14日 09:11:44   作者:A尘埃  
Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端,WebSocket 是一种网络通信协议,相比传统的HTTP协议,本文给大家介绍了SpringBoot集成netty实现websocket通信功能,需要的朋友可以参考下

实现推送消息给指定的用户

一、依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>netty</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.87.Final</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.1</version>
        </dependency>
    </dependencies>
</project>

二、属性文件和启动类

server:
  port: 8088

三、Controller接口

@RestController
@RequestMapping("/push")
public class TestController{
	
	@Autowired
	PushMsgService pushMsgService
}

四、PushMsgService

public interface PushMsgService{
	
	//推送给指定用户
	void pushMsgToOne(String userId,String msg);

	//推送给所有用户
	void pushMsgToAll(String msg);
}
@Service
public class PushMsgServiceImpl implements PushMsgService{
	
	@Override
	public void pushMsgToOne(String userId, String msg){
		Channel channel = NettyConfig.getChannel(userId);
        if (Objects.isNull(channel)) {
            throw new RuntimeException("未连接socket服务器");
        }

        channel.writeAndFlush(new TextWebSocketFrame(msg));
	}

	@Override
	public void pushMsgToAll(String msg){
		NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
	}
}

五、NettyConfig

public class NettyConfig{
	
	//定义全局channel,管理所有的channel
	private static volatile ChannelGroup channelGroup = null;

	//存放请求ID与channel的对应关系
	private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

	//定义两把锁
	private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

	public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }

    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String userId) {
        if (null == channelMap) {
            return getChannelMap().get(userId);
        }
        return channelMap.get(userId);
    }
}

六、netty server

@Component
public class NettyServer {
    static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * 端口号
     */
    @Value("${webSocket.netty.port:8889}")
    int port;

    EventLoopGroup bossGroup;
    EventLoopGroup workGroup;

    @Autowired
    ProjectInitializer nettyInitializer;

    @PostConstruct
    public void start() throws InterruptedException {
        new Thread(() -> {
            bossGroup = new NioEventLoopGroup();
            workGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
            bootstrap.group(bossGroup, workGroup);
            // 设置NIO类型的channel
            bootstrap.channel(NioServerSocketChannel.class);
            // 设置监听端口
            bootstrap.localAddress(new InetSocketAddress(port));
            // 设置管道
            bootstrap.childHandler(nettyInitializer);

            // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = null;
            try {
                channelFuture = bootstrap.bind().sync();
                log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
                // 对关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

    /**
     * 释放资源
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }
}

七、ProjectInitializer初始化,设置websocket handler

@Component
public class ProjectInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * webSocket协议名
     */
    static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path:/webSocket}")
    String webSocketPath;
    @Autowired
    WebSocketHandler webSocketHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 设置管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 流水线管理通道中的处理程序(Handler),用来处理业务
        // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ObjectEncoder());
        // 以块的方式来写的处理器
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
        // 自定义的handler,处理业务逻辑
        pipeline.addLast(webSocketHandler);
    }
}

八、WebSocketHandler

@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * 一旦连接,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());

        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        NettyConfig.getChannelMap().put(uid, ctx.channel());

        // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        ctx.channel().attr(key).setIfAbsent(uid);

        // 回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("用户下线了:{}", ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}", cause.getMessage());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 删除用户与channel的对应关系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getChannelMap().remove(userId);
    }
}

测试:

postman创建websocket连接 ws://127.0.0.1:8889/webSocket,并发送消息{‘uid’:‘sss’}给服务端

在这里插入图片描述

打开浏览器,给用户sss推送消息 http://127.0.0.1:8088/push/sss

在这里插入图片描述

以上就是SpringBoot集成netty实现websocket通信功能的详细内容,更多关于SpringBoot netty websocket通信的资料请关注脚本之家其它相关文章!

相关文章

  • Java实现自定义自旋锁代码实例

    Java实现自定义自旋锁代码实例

    这篇文章主要介绍了Java实现自定义自旋锁代码实例,Java自旋锁是一种线程同步机制,它允许线程在获取锁时不立即阻塞,而是通过循环不断尝试获取锁,直到成功获取为止,自旋锁适用于锁竞争激烈但持有锁的时间很短的情况,需要的朋友可以参考下
    2023-10-10
  • 如何使用Idea中的 Deployment 实现打包自动部署

    如何使用Idea中的 Deployment 实现打包自动部署

    这篇文章主要介绍了使用Idea中的 Deployment 实现打包自动部署,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-08-08
  • 简单了解Spring中常用工具类

    简单了解Spring中常用工具类

    这篇文章主要介绍了简单了解Spring中常用工具类,非常全面,具有一定参考价值,需要的朋友可以了解下。
    2017-10-10
  • JavaWeb仓库管理系统详解

    JavaWeb仓库管理系统详解

    这篇文章主要为大家详细介绍了JavaWeb仓库管理系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-09-09
  • Java垃圾回收器的方法和原理总结

    Java垃圾回收器的方法和原理总结

    本篇文章主要介绍了Java垃圾回收器的方法和原理总结,Java垃圾回收器是Java虚拟机的重要模块,具有一定的参考价值,有兴趣的可以了解一下。
    2016-12-12
  • 浅谈spring DI 依赖注入方式和区别

    浅谈spring DI 依赖注入方式和区别

    Spring框架对Java开发的重要性不言而喻,本文主要介绍了spring DI 依赖注入方式和区别,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-07-07
  • 基于java构造方法Vector修改元素源码分析

    基于java构造方法Vector修改元素源码分析

    本篇文章是关于ava构造方法Vector源码分析系列文章,本文主要介绍了Vector修改元素的源码分析,有需要的朋友可以借鉴参考下,希望可以有所帮助
    2021-09-09
  • java获取各种路径的基本方法

    java获取各种路径的基本方法

    这篇文章主要为大家详细介绍了java获取各种路径的基本方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-10-10
  • MyBatis配置文件元素示例详解

    MyBatis配置文件元素示例详解

    在MyBatis框架的核心配置文件中,<configuration>元素是配置文件的根元素,其他元素都要在<contiguration>元素内配置,这篇文章主要介绍了MyBatis配置文件元素,需要的朋友可以参考下
    2023-06-06
  • 谈谈Hashmap的容量为什么是2的幂次问题

    谈谈Hashmap的容量为什么是2的幂次问题

    这篇文章主要介绍了谈谈Hashmap的容量为什么是2的幂次问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09

最新评论