SpringBoot如何集成Netty

 更新时间:2024年06月17日 17:01:19   作者:Sea-Man  
这篇文章主要介绍了SpringBoot如何集成Netty问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

一、pom依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.77.Final</version>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.5.8</version>
</dependency>

二、配置yml文件

server:
  port: 8001
  servlet:
    context-path: /netty
netty:
  url: 0.0.0.0  #0.0.0.0表示绑定任意ip
  port: 20004

三、服务端

package com.tlxy.lhn.controller.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //创建两个线程组bossGroup和workerGroup,含有的子线程NioEventLoop的个数默认是CPU的两倍
        //bossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);

        try {
            //创建服务器端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来配置参数
            bootstrap.group(bossGroup, workerGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道实现
                    //初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接
                    //多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            //对workerGroup的SocketChannel设置处理器
                            channel.pipeline().addLast(new NettyServerHandler());
                        }
                    });

            System.out.println("netty server start..");

            //绑定一个端口并且同步生成一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
            //启动服务器(并绑定的端口),bind是异步操作,sync方法是等待异步操作执行完毕
            ChannelFuture cf = bootstrap.bind(9000).sync();

            //给cf注册监听器,监听我们关心的事件
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口9000成功");
                    } else {
                        System.out.println("监听端口9000失败");
                    }
                }
            });
            //等待服务端监听端口关闭,closeFuture是异步操作
            //通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
            cf.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }


    }
}

NettyServer类中的

channel.pipeline().addLast(new NettyServerHandler());

对应以下的处理器。

package com.tlxy.lhn.controller.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
        // 读取byteBuf
        // 业务处理
        // 回消息给客户端
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    //只要Netty抛出错误就会执行,Netty断会开连接会抛出连接超时的错误
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("关闭通道");
        cause.printStackTrace();
        ctx.close();
    }
}

四、客户端

package com.tlxy.lhn.controller.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //客户端需要一个事件循环组
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            //创建客户端启动对象
            //注意客户端使用的不是SocketBootstrap而是Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            // 设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class)// 使用NioSocketChannel作为客户端的通道实现
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("netty client start..");
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
            cf.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully();
        }


    }
}

NettyClient类中

ch.pipeline().addLast(new NettyClientHandler());

为处理器。

package com.tlxy.lhn.controller.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 客户端连接标识
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    //当通道建立后有事件时会触发,即服务端发送数据给客户端
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服务端的消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端地址是:" + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("关闭通道");
        cause.printStackTrace();
        ctx.close();
    }

}

五、粘包和拆包问题

客户端和服务端都是固定的框架,我们只需写处理器。

粘包和拆包问题,可以自己手写通过固定长度发送数据,或者使用Google的Protostuff。

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-api</artifactId>
    <version>1.0.8</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.0.8</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.0.8</version>
</dependency>
package com.tlxy.lhn.controller.netty;

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProtostuffUtil {
    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

    private static <T> Schema<T> getSchema(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                cachedSchema.put(clazz, schema);
            }
        }
        return schema;
    }

    /**
     * 序列化
     *
     * @param obj
     * @return
     */
    public static <T> byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class<T> clazz = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(clazz);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * 反序列化
     *
     * @param data
     * @param clazz
     * @return
     */
    public static <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            T obj = clazz.newInstance();
            Schema<T> schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • 关于jpa querydsl嵌套查询demo

    关于jpa querydsl嵌套查询demo

    这篇文章主要介绍了关于jpa querydsl 嵌套查询demo,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-05-05
  • SpringBoot快速设置拦截器并实现权限验证的方法

    SpringBoot快速设置拦截器并实现权限验证的方法

    本篇文章主要介绍了SpringBoot快速设置拦截器并实现权限验证的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-01-01
  • Javaweb实现完整个人博客系统流程

    Javaweb实现完整个人博客系统流程

    这篇文章主要介绍了怎样用Java来实现一个完整的个人博客系统,我们通过实操上手的方式可以高效的巩固所学的基础知识,感兴趣的朋友一起来看看吧
    2022-03-03
  • springboot2.0 @Slf4j log 彩色日志配置输出到文件

    springboot2.0 @Slf4j log 彩色日志配置输出到文件

    这篇文章主要介绍了springboot2.0 @Slf4j log日志配置输出到文件(彩色日志),解决方式是使用了springboot原生自带的一个log框架,结合实例代码给大家讲解的非常详细,需要的朋友可以参考下
    2023-08-08
  • SpringBoot学习系列之MyBatis Plus整合封装的实例详解

    SpringBoot学习系列之MyBatis Plus整合封装的实例详解

    MyBatis-Plus是一款MyBatis的增强工具(简称MP),为简化开发、提高效率,这篇文章给大家介绍MyBatis Plus整合封装的实例详解,感兴趣的朋友跟随小编一起看看吧
    2020-08-08
  • Spring中的编程式事务和声明式事务

    Spring中的编程式事务和声明式事务

    Spring框架中,事务管理可以通过编程式事务和声明式事务两种方式实现,编程式事务通过手动编码控制事务的开始、提交和回滚,允许开发者精确控制事务,但增加了代码复杂度,声明式事务则通过@EnableTransactionManagement注解启用事务管理
    2024-11-11
  • 记一次线程爆满导致服务器崩溃的问题排查及解决

    记一次线程爆满导致服务器崩溃的问题排查及解决

    这篇文章主要介绍了记一次线程爆满导致服务器崩溃的问题排查及解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-10-10
  • JavaSE系列基础包装类及日历类详解

    JavaSE系列基础包装类及日历类详解

    这篇文章主要介绍的是JavaSE中常用的基础包装类以及日历类的使用详解,文中的示例代码简洁易懂,对我们学习JavaSE有一定的帮助,感兴趣的小伙伴快来跟随小编一起学习吧
    2021-12-12
  • 深入了解Java中的过滤器Filter和监听器Listener

    深入了解Java中的过滤器Filter和监听器Listener

    这篇文章主要为大家详细介绍了Java中的过滤器Filter和监听器Listener的使用以及二者的区别,文中的示例代码讲解详细,需要的可以参考一下
    2022-06-06
  • 解决Shiro 处理ajax请求拦截登录超时的问题

    解决Shiro 处理ajax请求拦截登录超时的问题

    这篇文章主要介绍了解决Shiro 处理ajax请求拦截登录超时的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09

最新评论