Netty开发及粘包实战解决分析

 更新时间:2024年02月16日 10:01:08   作者:KerryWu  
这篇文章主要为大家介绍了Netty开发及粘包实战解决分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

1. Netty介绍

Netty是一款开源的Java网络编程框架,广泛应用于很多高流量的服务器端应用程序:

  • 异步和事件驱动:Netty基于NIO(非阻塞I/O)构建,操作都是异步回调来触发事件,如连接建立、数据到达等。
  • 高性能:Netty的一大优点就是高性能。它的设计能够让你最大限度地利用现代的多核硬件。
  • 灵活的协议支持:Netty支持各种协议,包括TCP、UDP、HTTP/HTTPS、Unix Socket、WebSockets等。
  • 零拷贝:Netty支持“零拷贝”,这可以减少不必要的系统调用,显著提高数据处理性能。

Netty 目前最新版本是 4.1.95Final

很久之前 Netty就发布了 5 的测试版本,市场上都有很多介绍 Netty5 的书在卖了,但可惜问题太多,最终废弃了,目前依然只维护 4 的版本。

1.1. 组件

1.1.1. EventLoopGroup

EventLoopGroup 是一个线程池,用于管理和调度 EventLoop 对象。在 Netty 中,每个 EventLoopGroup 有一个或多个 EventLoop,用于处理连接请求和 I/O 操作,而每个EventLoop是单线程的。

所以Netty可以通过EventLoopGroup的构造调参,来实现不同的Reactor模型:

(1)既可也是单Reactor单线程模型:

EventLoopGroup group = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)

(2)也可以是 主从Reactor多线程模型:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(n);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)

 主从分工

  • BossEventLoopGroup:负责接收客户端的连接请求。并将连接请求分配给workerEventLoopGroup中的某个EventLoop进行处理。BossGroup中通常只需一个EventLoop;
  • WorkerEventLoopGroup:负责处理服务器端的连接和数据读写操作。每个EventLoop都绑定在一个具体的线程上,在运行过程中只处理该线程所监听的IO事件。 workerGroup通常需要多个EventLoop。

1.1.2. EventLoop

EventLoop 则是事件循环的核心,负责监听和处理 Channel 中的事件和 I/O 操作。在 EventLoopGroup 中,每个 EventLoop 都是独立的,可以并发地处理多个 Channel 上的事件和 I/O 操作。

1.1.3. Channel 和 ByteBuf

定义

  • Channel:代表了一个网络通道,可以用于进行网络通信。通过使用 Channel,我们可以连接到远程服务器、发送和接收数据等。
  • ByteBuf:则是用于管理和操作数据的缓冲区,通过使用 ByteBuf,我们可以进行数据的读、写、复制、切片、合并等操作。

搭配使用

在 Netty 中,Channel 和 ByteBuf 是紧密结合的,通常一次数据传输会涉及到两个 Channel 和两个 ByteBuf 对象,分别代表了发送端和接收端的数据缓冲区。以下是 Channel 和 ByteBuf 的搭配使用流程:

  • 创建 Channel:首先,我们需要创建一个 Channel 对象,用于表示一个网络通道。可以通过 Bootstrap 或 ServerBootstrap 类来创建 Channel 对象,并配置其参数和属性。
  • 写入数据:当需要向远程服务器发送数据时,我们需要先将数据写入到 ByteBuf 对象中,然后将 ByteBuf 对象写入到 Channel 对象中。在写入数据时,可以通过 write() 或 writeAndFlush() 方法来实现。
  • 读取数据:当远程服务器发送数据时,我们需要通过 Channel 对象来读取数据。读取数据时,Channel 对象会将数据存储到 ByteBuf 对象中,我们可以通过 read() 方法来获取数据或数据大小。在读取数据之后,我们需要及时释放 ByteBuf 对象,以便避免内存泄漏和内存溢出等问题。
  • 释放资源:当数据传输完成后,我们需要释放 Channel 和 ByteBuf 对象的资源。在 Netty 中,Channel 和 ByteBuf 对象都需要显式地释放资源,以避免内存泄漏和内存溢出等问题。可以通过 release() 方法来释放 ByteBuf 对象,通过 close() 方法来释放 Channel 对象。

1.1.4. ChannelPipeline 和 Channel

定义

  • Channel:对象表示一个通信通道,可以进行数据的读写和事件的触发等操作。
  • ChannelPipeline:则是一个事件处理器的链表,用于处理 Channel 中的事件和数据。

每个 Channel 都有一个关联的 ChannelPipeline 对象,当有事件发生时,Netty 会将事件从 Channel 中传递到 ChannelPipeline 中,然后按照顺序依次触发各个事件处理器 ChannelHandler 的逻辑。当事件处理完毕后,Netty 会将处理结果返回到 Channel 中,以便进行数据的读写等操作。

在 ChannelPipeline 中,可以添加多个事件处理器,用于处理不同类型的事件和数据。例如,可以添加一个消息解码器、一个消息编码器、一个业务逻辑处理器等。每个事件处理器都可以进行特定的逻辑处理,并将处理结果传递给下一个事件处理器。

1.2. 网络协议

Netty是一个非常强大和灵活的网络编程框架,它支持多种通信协议。以下是一些Netty支持的通信协议:

  • TCP/IP 和 UDP/IP:Netty 提供了底层的网络通信支持,可以构建基于TCP/IP或UDP/IP的应用。
  • HTTP/HTTPS and HTTP/2:Netty 提供了HTTP、HTTPS以及HTTP/2的高级支持。
  • WebSocket:Netty 支持 WebSocket,允许 Web 浏览器和服务器之间进行全双工通信。
  • Google Protobuf:Netty 为 Google 的 Protobuf 序列化库提供了支持。
  • SSL/TLS:通过JDK的Secure Socket Extension (JSSE),Netty 支持 SSL/TLS 实现安全通信。
  • Unix Domain Socket:从 Netty 4.1版本开始,Netty也开始支持 Unix Domain Socket。

因为 Netty 支持的网络协议丰富,所以当有非Http协议网络通信的需求时,大家第一时间会想到 Netty。

2. 代码示例

2.1. 基于tcp协议

2.1.1.服务端

pom

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.63.Final</version>
        </dependency>

服务端

@Component
public class NettyServer {
    // 创建两个线程组,分别用于接收客户端连接和处理网络IO操作
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                // 指定使用 NioServerSocketChannel 作为通道实现
                .channel(NioServerSocketChannel.class)
                // 定义 ChannelPipeline(多个ChannelHandler组合)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });
        // 绑定端口,开始接收进来的连接
        ChannelFuture f = b.bind(8080).sync();
        if (f.isSuccess()) {
            System.out.println("启动Netty服务成功,端口号:" + 8080);
        }
    }
    @PreDestroy
    public void shutdown() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

ChannelHandler 消息处理

public class ServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("Received message from client: " + msg);
        // 回复消息给客户端
        //ctx.writeAndFlush("Received your message: " + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

2.1.2.客户端

客户端

@DependsOn({"nettyServer"})
@Component
public class NettyClient {
    private EventLoopGroup group;
    private Channel channel;
    @PostConstruct
    public void start() throws InterruptedException {
        group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });
        ChannelFuture future = bootstrap.connect("127.0.0.1", 8080).sync();
        if (future.isSuccess()) {
            System.out.println("连接服务器成功");
        }
        channel = future.channel();
    }
    @PreDestroy
    public void destroy() {
        if (group != null) {
            group.shutdownGracefully();
        }
    }

ChannelHandler 消息处理

public class ClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        System.out.println("Server response:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

2.2. 基于Unix Socket协议

其他的不变,这里只关注客户服务端代码。

2.2.1. 代码

服务端

private final EventLoopGroup bossGroup = new KQueueEventLoopGroup();
    private final EventLoopGroup workerGroup = new KQueueEventLoopGroup();

    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(KQueueServerDomainSocketChannel.class)
                .childHandler(new ChannelInitializer<KQueueDomainSocketChannel>() {
                    @Override
                    public void initChannel(KQueueDomainSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });
        ChannelFuture f = b.bind(new DomainSocketAddress("/tmp/test.sock")).sync();
        if (f.isSuccess()) {
            System.out.println("启动Netty服务成功,文件:" + "/tmp/test.sock");
        }
    }

客户端

public void start() throws InterruptedException {
        group = new KQueueEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(KQueueDomainSocketChannel.class)
                .handler(new ChannelInitializer<KQueueDomainSocketChannel>() {
                    @Override
                    protected void initChannel(KQueueDomainSocketChannel socketChannel) {
                        socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });

        ChannelFuture future = bootstrap.connect(new DomainSocketAddress("/tmp/test.sock")).sync();

        if (future.isSuccess()) {
            System.out.println("连接服务器成功");
        }

        channel = future.channel();
    }

2.2.2. 分析

Unix Socket协议

Unix Domain Socket(简称UDS)是一个用于实现本地进程间通信的协议。与使用网络套接字(socket)进行通信不同,UDS仅用于同一台机器上的相邻进程之间的通信。

在Unix/Linux系统中,UDS通常被用于代替TCP/IP套接字来提高性能和安全性。不过它们可以通过文件系统路径来建立连接,不能跨机器通信。

Netty中协议切换

通过对比上述代码,可以看出netty中切换协议是比较简单的,换成对应的 Channel 实现类,以及连接方式就可以了。

因为是mac中运行,示例代码中用KQueueDomainSocketChannel替代DomainSocketChannel

2.3. 测试

Controller发消息

@RestController
public class MsgController {
    @Autowired
    private NettyClient nettyClient;

    @PostMapping("/send")
    public ResponseEntity<Void> sendMsg(@RequestBody String msg) {
        System.out.println(msg.getBytes(StandardCharsets.UTF_8).length);
        try {
            for (int i = 0; i < 1000; i++) {
                nettyClient.send(msg);
            }
            return new ResponseEntity<>(HttpStatus.OK);
        } catch (Exception e) {
            return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }
}

测试结果

前面已经基于 TCP协议写好了netty的客户端、服务端,
现在写接口,可以通过客户端给服务端发消息,不过单次调用会一次性发1000遍。

接口调用传入:hello

预期结果:

Received message from client: hello
Received message from client: hello
... ... // 同样输出1000遍

实际结果:

Received message from client: hello
Received message from client: hellohello
Received message from client: hellohe
Received message from client: llohellohellohellohello
Received message from client: hellohellohello
... ... // 无规则

出现问题的原因就是下一章要将的粘包、拆包问题。

3. 粘包、拆包问题

3.1. 问题分析

3.1.1. tcp协议出现问题的原因

粘包/拆包问题是由TCP协议本身造成的,和Netty本身无关,任何基于TCP协议实现数据传输的技术都会面临这个问题,原因如下:

  • 应用程序写入数据的字节大小大于套接字发送缓冲区的大小:这种情况下,会发生拆包现象,发送方的TCP协议栈会把一次应用程序的发送操作分成多个数据段进行发送。
  • 进行了多次写入操作,但数据没有被及时发送出去:这可能是由于TCP的Nagle算法造成的。
  • 应用程序读取操作不及时:如果接收方面的应用层没有及时读取接收缓冲区的数据,造成堆积,从而形成一个大的数据块。如果此时应用层进行数据读取,就容易读取到多个TCP数据段的数据,形成了粘包现象。
  • 网络环境等硬件问题:如网络延迟、抖动等,也可能导致多个小的数据包合并为一个大包进行传送,从而导致粘包。

解决粘包和拆包问题的基本策略就是在应用层引入数据边界。常见的方法有:固定长度、分隔符、在包头中加入长度字段等。

3.1.2. 其他协议为什么没问题

HTTP 协议

HTTP 协议 基于 TCP 协议构建,而 TCP 是一种面向流的协议,所以理论上可能会有粘包问题。但是在实际应用中,HTTP 协议已经做了明确的分包处理,因此通常不需要开发者去处理粘包问题,HTTP 使用了一些特定的方式来定义数据包的边界:

对于 HTTP/1.0 和 HTTP/1.1,一次完整的 HTTP 交互由一个请求和一个响应组成,它们都是相对独立的。请求和响应都有明确的开始行(请求行或状态行)和结束标志(如 Content-Length 头或 chunked 编码表示的消息体长度)。这样可以很清楚地知道报文的开始和结束,避免了粘包问题。
对于 HTTP/2,它引入了二进制帧的概念。每个帧有明确的长度和类型,这也使得在接收端可以准确地解析出各个帧,避免粘包问题。

UDP 协议

UDP 协议 是一种无连接的、不可靠的协议,它并没有像TCP协议那样提供流量控制和拥塞控制等功能,因此在传输过程中可能会出现丢包或乱序等问题。由于UDP协议采用数据报方式进行传输,每个UDP数据报都有独立的头部标识,因此不会出现粘包问题。

WebSocket 协议

WebSocket 协议 建立连接后,客户端和服务器之间会保持长时间的连接状态,可以随时发送和接收数据。当服务器发送数据时,会将数据封装到一个完整的WebSocket帧中,并通过TCP协议进行传输。而客户端收到数据后,会从WebSocket帧中解析出数据,并进行相应处理。这样就避免了TCP协议中的“粘包”和“拆包”问题。

3.1.3. Unix Socket 为什么也有问题

Unix Socket(也被称为 Unix Domain Socket,UDS)主要支持以下两种类型的通信协议:

  • 流式协议 (SOCK_STREAM): 类似于 TCP,在发送和接收数据时提供了字节流服务。数据在两个方向上都是有序的,并且不会重复或者丢失。这种模式下,一端发送的数据顺序和另一端接收的数据顺序是相同的。
  • 数据报协议 (SOCK_DGRAM): 这种类型的 socket 提供了一种无需连接的、固定大小的消息服务,类似于 UDP。每次读操作都返回最多一条完整的消息;如果消息超出缓冲区的大小,那么该消息可能会被截断。

Unix Socket 的这两种模式在行为上与 TCP 和 UDP 很相似。因此在基于 SOCK_STREAM 协议使用 Netty 开发服务端和客户端时,可能会出现类似粘包的问题。

前面有现成的基于Unix Stream协议实现的代码,我们同样调用接口试一下,发现 Unix Socket 同样会产生粘包问题

解决思路

结合HTTP、UDP、WebSocket 解决粘包/拆包问题的思路,同样也可以推导解决TCP问题的思路:在发送数据时,应该设计一种协议来确定消息的边界,比如:添加特殊的分隔符,或者在每个消息的头部包含消息的长度等。

基于这个思路,Netty 框架提供了 LineBasedFrameDecoder、DelimiterBasedFrameDecoder和 LengthFieldBasedFrameDecoder等解决方案,下面一一介绍。

3.2. 解决方案

3.2.1. LineBasedFrameDecoder

使用行结束符作为数据包的分隔符。每条消息后面都有一个行结束符(例如 \n 或 \r\n),它会一直读取字节直到遇到这个结束符,然后把之前读取到的字节组装成一条消息。

如果没有找到行结束符,那么就认为当前还没有读取到完整的数据包,需要将已经读取到的字节保存起来,等待下次读取。

代码-客户端修改

发送消息的方法中,每条消息结尾都加上行结束符后缀:

public void send(String msg) {
        if (channel != null) {
            channel.writeAndFlush(msg + "\\n");
        } else {
            System.out.println("message sending failed, connection not established");
        }
    }

 代码-服务端修改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

 局限性

  • 固定的分隔符:主要是通过 \n 或 \r\n 来标识一个完整的消息。这意味着如果你的协议中没有使用这两个字符作为结束标记,或者这两个字符在消息体中有特殊含义,则不能正确工作。
  • 只支持文本数据: 主要设计为处理文本协议。对于二进制数据,尤其是包含 \n 或 \r\n 的二进制数据,可能会出现误切割的情况。
  • 无法处理大数据包: 如果一个非常大的数据块在没有任何分隔符的情况下被发送,会消耗大量内存来存储这些数据,直到找到一个分隔符。这可能会导致内存溢出问题。所以构造方法中要设置 maxLength 参数(如示例中的 1024)。

3.2.2. DelimiterBasedFrameDecoder

解决方式

和LineBasedFrameDecoder类似,当接收到数据时,会检查是否存在分隔符。如果存在,它就认为已经读取到了一个完整的消息,并将这个消息传递给下一个ChannelHandler进行处理。如果不存在,它将继续等待,直到读取到分隔符。

区别在于,前者的分隔符固定,而它的分隔符可以自定义。

代码-客户端修改

发送消息的方法中,每条消息结尾都加上行结束符后缀:

public void send(String msg) {
        if (channel != null) {
            channel.writeAndFlush(msg + "$_");
        } else {
            System.out.println("message sending failed, connection not established");
        }
    }

 代码-服务端修改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));

局限性

  • 依赖于特定的分隔符:需要依赖特定的分隔符来判定一个消息的结束,但是在某些情况下,这样的分隔符可能并不存在,或者不能很好地被应用在该协议上。同样可能出现误判。
  • 不适合二进制协议: 由于DelimiterBasedFrameDecoder主要是针对文本协议设计的,所以在处理一些二进制协议时可能会遇到困难。
  • 内存问题: 如果一个非常大的数据块在没有任何分隔符的情况下被发送,DelimiterBasedFrameDecoder可能会消耗过多的内存来存储这些数据,直到找到一个分隔符。这可能会导致内存溢出问题。所以也需要设置 maxFrameLength(如示例中的 1024)。

3.2.3. FixedLengthFrameDecoder

解决方式

工作原理主要是每次从 ByteBuf 中读取固定长度的字节,然后构造成一个独立的 frame 对象,传递给下一个 handler 处理。

这样可以确保不会因为 TCP 粘包导致多个消息被当作一个消息处理,也不会因为 TCP 拆包导致一个消息被当作多个消息处理。

代码-服务端修改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ch.pipeline().addLast(new FixedLengthFrameDecoder(5));

因为传输的参数“hello”是5个字节,这类就固定为5.

局限性

  • 固定长度限制: FixedLengthFrameDecoder 只能处理固定长度的消息,如果实际应用中的消息长度不固定,那么就无法使用 FixedLengthFrameDecoder 进行解码。相应地,如果消息长度小于固定长度,那么必须填充到固定长度,这就可能会浪费带宽。
  • 无内置校验: FixedLengthFrameDecoder 仅仅是按照固定长度切分消息,它并不关心消息的完整性和正确性。如果你想对消息进行校验,需要自己实现。

3.2.4. LengthFieldBasedFrameDecoder

解决方式

  • 长度字段标识: LengthFieldBasedFrameDecoder 解决粘包问题的方式主要是通过在数据包中添加一个表示后续数据长度的字段,这个字段的位置和长度可以由开发者自定义,解码器会根据这个长度字段得知具体的消息体长度,然后进行正确的截取。
  • 校验读取: 当接收到新的数据包时,解码器首先找到长度字段,读取出消息体的长度,然后等待足够长度的数据到达后,再从 ByteBuf 中读取,形成一个完整的消息帧。
  • 消除半包读取: 通过以上方式,LengthFieldBasedFrameDecoder 可以确保每次都能从 ByteBuf 中读取到完整的消息帧,不会出现只读取到半个消息帧的情况。

在网络通信中,发送和接收数据需要遵循同一种协议。LengthFieldBasedFrameDecoder 是一个基于长度字段的解码器,而 LengthFieldPrepender 则是一个对应的编码器,它会在消息体前面加上一个长度字段。

它们一般会配套使用,这样发送端发送的数据和接收端接收的数据结构就会保持一致,从而能够正确地进行解码。

代码-客户端修改

添加ChannelHandler实现,通过LengthFieldPrepender这个编码器,在发送的消息前添加长度字段(这里的 4 是指长度字段本身占用的字节数量):

socketChannel.pipeline().addLast(new LengthFieldPrepender(4));

代码-服务端修改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));

4. Netty特性优化

4.1. 内存池 PooledByteBufAllocator

内存池是一种用于管理和复用内存块的技术。可以避免频繁地分配和释放内存,从而减少系统开销和内存碎片问题,提高系统的效率和性能。

PooledByteBufAllocator(分配器 [ˈæləˌkeɪtər]) 是 Netty 提供的一个基于内存池的 ByteBuf 分配器。与直接创建新的 ByteBuf 实例相比,PooledByteBufAllocator 提供了重用内存的能力,这可以显著减少内存分配和垃圾收集的开销,提高性能:

  • 内存分区:PooledByteBufAllocator 将内存划分为多个 Arena,每个 Arena 进一步划分为多个 Chunk 和 Page。通过这种方式,PooledByteBufAllocator 能够满足不同大小的内存需求,并且能够快速找到合适的内存块进行分配。
  • 对象复用:当 ByteBuf 的引用计数为 0 时,它的内存会被返回到原来的 Arena 并可以被重用。这避免了频繁创建和销毁对象,降低了系统开销。
  • 线程本地缓存:PooledByteBufAllocator 使用了线程本地缓存技术(Thread Local Cache),每个线程都有自己的一份缓存池,这可以减少线程间的竞争,进一步提高性能。
  • 内存分配策略:对于小于 Page 大小的内存分配请求,PooledByteBufAllocator 使用 jemalloc 策略进行内存分配。这是一种高效的内存分配策略,能够减少内存碎片,提高内存使用率。

通过这些方式,PooledByteBufAllocator 可以有效地复用内存,提高了内存使用的效率和性能。

PooledByteBufAllocator 创建 ByteBuf 过程

PooledByteBufAllocator allocator = new PooledByteBufAllocator();

// 分别分配堆内存、堆外内存,内存大小也可以指定,如: allocator.heapBuffer(1024);
ByteBuf heapBuffer = allocator.heapBuffer();
ByteBuf directBuffer = allocator.directBuffer(); 

// 正常将写入数据或读取
heapBuffer.writeBytes(data);
byte b = heapBuffer.readByte();

// 记得不用时释放内存,堆外内存不受垃圾回收,不释放会有内存泄露
heapBuffer.release();
directBuffer.release();

不过实际项目中,很少有见过通过创建 PooledByteBufAllocator,再创建 ByteBuf 的。

基本都是由 Unpooled 工具类 创建 ByteBuf。

创建:堆内内存 OR 堆外内存?

(1)堆内内存:

如果你需要处理的数据比较小(比如几 KB 或几百 KB),而且需要进行频繁的读写操作,那么建议使用堆内内存。

(2)堆外内存:

如果你需要处理的数据比较大(比如几 MB 或几十 MB),而且需要进行频繁的 IO 操作,那么建议使用堆外内存。堆外内存是由操作系统管理的,数据存储在操作系统的内存中,可以直接进行 IO 操作。此外,在使用堆外内存时,可以避免 Java 堆和操作系统之间的数据拷贝,减少了系统的开销和延迟。

需要注意的是,堆外内存的申请和释放需要调用 JNI 接口,因此申请和释放堆外内存的开销会比较高。因此一般来说:

对于小规模的数据处理应用,建议使用堆内内存;对于大规模的数据处理应用,建议使用堆外内存

4.2. 内存池 Unpooled

Unpooled 是 Netty 中一个工具类,用于创建不同类型的 ByteBuf 对象,而且同样是使用PooledByteBufAllocator 类来分配和管理内存。

只不过它提供了一些静态方法,可以很方便地创建 HeapBuf、DirectBuf、CompositeBuf 等类型的 ByteBuf 对象。常见方法:

  • buffer():创建一个 HeapBuf 对象,使用 JVM 堆内存来存储数据。
  • directBuffer():创建一个 DirectBuf 对象,使用直接内存来存储数据。
  • wrappedBuffer():创建一个 CompositeBuf 对象,可以将多个 ByteBuf 对象合并成一个虚拟的 ByteBuf 对象。
  • copiedBuffer():创建一个 HeapBuf 对象,并将字节数组的内容复制到 HeapBuf 中。
  • unsafeBuffer():创建一个不安全的 ByteBuf 对象,用于一些特殊的场景,例如 JNI 调用等。

不过同样要记得在使用完毕后,应该及时调用 release() 方法来释放 ByteBuf 对象的资源哦。

回顾一下:考虑到Netty中 ByteBuf 等常用类,为避免频繁地分配和释放内存,通过内存池实现内存复用。但 ByteBuf 也是类,频繁地创建、销毁对象同样有大量的性能开销,怎么优化?

那么接下来我们看一下 对象池。

4.3. 对象池 Recycler

Recycler (回收器,[ˌriːˈsaɪkl] )是 Netty是一个对象池,主要用于重用对象,避免频繁创建和销毁带来的性能开销。被广泛地应用于各种场景中,例如 ByteBuf 对象池、EventExecutor 对象池、ChannelHandlerContext 对象池等等。我们还是来看看 ByteBuf。

ByteBuf 中包含一个 Recycler.Handle 对象,用于管理 ByteBuf 对象池的创建和销毁。当需要创建一个新的 ByteBuf 对象时,无论通过前面介绍的PooledByteBufAllocator、Unpooled,都是通过 ByteBufAllocator 接口提供的 directBuffer() 或 heapBuffer() 等方法来创建。

这些方法就是基于Recycler,会自动从线程本地的对象池中获取一个 ByteBuf 对象,如果对象池为空,则会创建一个新对象,并将其加入对象池中。当不再需要这个对象时,可以通过调用 release() 方法将其回收到对象池中,等待下次使用。

ChannelHandlerContext 对象池也类似,在 Netty 中,可以通过 ChannelHandlerContext 的 newContext() 方法来获取一个新的 ChannelHandlerContext 对象,这个方法会从 Recycler 对象池中获取一个 ChannelHandlerContext 对象并进行初始化,如果没有可用的对象,则会创建一个新对象。在使用完后,通过调用 ChannelHandlerContext 的 recycle() 方法将其回收到对象池中,等待下次使用。

当然 Recycler 是 Netty 中实现对象池的机制,并不局限于只有 Netty 的这些组件类可以用,任何我们自定义的类都可以。下面看一个例子。

示例(任何对象)

public class UserCache {
    private static final Recycler<User> userRecycler = new Recycler<User>() {
        @Override
        protected User newObject(Handle<User> handle) {
            return new User(handle);
        }
    };
    static final class User {
        private String name;
        private Recycler.Handle<User> handle;
        public void setName(String name) {
            this.name = name;
        }
        public String getName() {
            return name;
        }
        public User(Recycler.Handle<User> handle) {
            this.handle = handle;
        }
        public void recycle() {
            handle.recycle(this);
        }
    }
    public static void main(String[] args) {
        User user1 = userRecycler.get();
        user1.setName("hello");
        user1.recycle();
        User user2 = userRecycler.get();
        System.out.println(user1 == user2);
    }
}

左边的例子中,我们定义了一个User类,main方法中,user1.recycle(),user1回收了之后,然后 user2 再获取。

  • (1)user2获取的依然是同一个对象,所以打印出的结果是:hello 和 true。
  • (2)如果我们注释掉 user1.cecycle(),user2 会获取不到对象,打印的结果就是:null 和 false。

线程安全

另外,Recycler 使用线程本地变量(FastThreadLocal)来存储对象,每个线程都有一个独立的对象池。这个机制可以保证对象的安全性和线程互相独立,避免了线程安全问题和竞争条件的出现。

那么这个 FastThreadLocal 是啥?和常见的 ThreadLocal 有啥关系呢?

4.4. 本地线程优化 FastThreadLocal

FastThreadLocal(更快的ThreadLocal) 是 Netty 自己研发的一个工具类,用于替换 Java 原生的 ThreadLocal。主要有以下几个原因:

  • 性能:与 ThreadLocal 相比,FastThreadLocal 在存取线程局部变量时有更快的速度。在 ThreadLocal 中,每次获取变量都需要通过哈希映射进行查找,当线程局部变量很多时,这会成为一个性能瓶颈。而 FastThreadLocal 则将所有线程的局部变量存储在一个数组中,通过索引快速定位,提高了存取速度。
  • 避免内存泄漏:ThreadLocal 在使用不当时,很容易造成内存泄漏,需要我们在使用后再手动调用reomve()方法。而 FastThreadLocal 能有效避免这个问题。它会在每个线程结束时自动清理线程局部变量,而不是依赖于 JVM 的垃圾回收。
  • 更好的整合:Netty 中很多地方使用了线程局部变量,例如 ByteBuf 的内存池、Recycler 对象池等。有了自己的 FastThreadLocal,Netty 可以更好地控制和优化这些功能,提高整体性能。

 代码示例

public class FastThreadLocalDemo {
    private static final FastThreadLocal<Integer> THREAD_LOCAL = new FastThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() throws Exception {
            return 1;
        }
    };
    public static void main(String[] args) {
        new FastThreadLocalThread(() -> {
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + " --> " + THREAD_LOCAL.get());
                THREAD_LOCAL.set(THREAD_LOCAL.get() + 1);
            }
        }, "FastThreadLocalThread-1").start();
    }
}

注意事项

FastThreadLocal 的使用方式和 ThreadLocal差别不大,但是有几点需要注意:

  • 使用 FastThreadLocal 的线程最好是 FastThreadLocalThread 类型或者其子类。FastThreadLocal 会在这些线程中有更好的性能。如果使用的是Thread或其他实现的话,FastThreadLocal 仍然可以工作,但性能会降级。
  • 相比于 ThreadLocal,FastThreadLocal 的优势在于当一个线程有多个线程本地变量时,它可以通过减少哈希冲突和查找来提高性能。但是如果一个线程只有一个或者很少的线程本地变量,那么 ThreadLocal 可能会有更好的性能。
  • 当你不再需要使用 FastThreadLocal 中的对象时,还是应该调用 remove() 来避免内存泄漏。

虽说在使用了 FastThreadLocalThread 实例的情况下,在线程结束时,FastThreadLocal 会自动清理所有线程局部变量。但显式地调用 remove() 方法仍然是一个好的实践。特别是在长生命周期的线程或者使用了线程池的情况下,显式地清理线程局部变量可以帮助避免潜在的内存泄漏问题。

以上就是Netty开发及粘包实战解决分析的详细内容,更多关于Netty开发粘包解决的资料请关注脚本之家其它相关文章!

以上就是Netty开发及粘包实战解决分析的详细内容,更多关于Netty开发粘包解决的资料请关注脚本之家其它相关文章!

相关文章

  • SpringCloud轮询拉取注册表与服务发现流程详解

    SpringCloud轮询拉取注册表与服务发现流程详解

    这篇文章主要介绍了SpringCloud轮询拉取注册表与服务发现,现在很多创业公司都开始往springcloud靠了,可能是由于文档和组件比较丰富的原因吧,毕竟是一款目前来说比较完善的微服务架构
    2022-11-11
  • SpringBoot中使用拦截器的配置详解

    SpringBoot中使用拦截器的配置详解

    这篇文章主要介绍了SpringBoot中使用拦截器的配置详解,拦截器是 AOP 的一种实现,专门拦截对动态资源的后台请求,即拦截对控制层的请 求,使用场景比较多的是判断用户是否有权限请求后台,需要的朋友可以参考下
    2024-01-01
  • Java中策略设计模式的实现及应用场景

    Java中策略设计模式的实现及应用场景

    策略设计模式是Java中一种常用的设计模式,它通过定义一系列算法并将其封装成独立的策略类,从而使得算法可以在不影响客户端的情况下随时切换。策略设计模式主要应用于系统中存在多种相似的算法、需要灵活调整算法逻辑或者需要扩展新的算法等场景
    2023-04-04
  • 微服务eureka和nacos案例详解

    微服务eureka和nacos案例详解

    这篇文章主要介绍了微服务eureka和nacos,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-06-06
  • Java方法递归的形式和常见递归算法(方法递归结合File类查找文件)

    Java方法递归的形式和常见递归算法(方法递归结合File类查找文件)

    方法递归方法直接调用自己或者间接调用自己的形式称为方法递归( recursion),递归做为一种算法在程序设计语言中广泛应用,这篇文章主要介绍了Java方法递归的形式和常见递归算法-方法递归结合File类查找文件,需要的朋友可以参考下
    2023-02-02
  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ消息队列的完整步骤

    这篇文章主要给大家介绍了关于SpringBoot整合RabbitMQ消息队列的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-05-05
  • 解决MyEclipse出现the user operation is waiting的问题

    解决MyEclipse出现the user operation is waiting的问题

    今天做项目的时候每次修改代码保存后都会跳出一个框框,然后就有两个进度条,上面写the user operation is wating...小编去网上查了查解决了这个问题,下面跟大家分享一下。
    2018-04-04
  • Java实现图片比对算法

    Java实现图片比对算法

    这篇文章主要为大家详细介绍了Java实现图片比对算法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-04-04
  • String中intern方法的使用场景详解

    String中intern方法的使用场景详解

    这篇文章主要给大家介绍了关于String中intern方法的使用场景,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • java获取注册ip实例

    java获取注册ip实例

    本文分享了java获取注册ip实例代码,代码简洁,具有很好的参考价值,需要的朋友一起来看下吧
    2016-12-12

最新评论