Netty之使用DelimiterBasedFrameDecoder进行消息分隔详解

 更新时间:2023年12月14日 09:59:00   作者:Terisadeng  
这篇文章主要介绍了Netty之使用DelimiterBasedFrameDecoder进行消息分隔详解,在使用Netty进行TCP消息传输时,为了上层协议能够对消息正确区分,避免粘包和拆包导致的问题,一般可以通过消息定长、将回车换行符作为消息结束符,需要的朋友可以参考下

DelimiterBasedFrameDecoder消息分隔

在使用Netty进行TCP消息传输时,为了上层协议能够对消息正确区分,避免粘包和拆包导致的问题。

一般可以通过消息定长、将回车换行符作为消息结束符、将特殊的分隔符作为消息的结束标志或者在消息头中定义长度字段来标识消息的总长度。

其中常用的通过分隔符作为消息的结束标志就涉及到Netty的DelimiterBasedFrameDecoder类,服务端如下:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class EchoServer
{
    public void bind(int port)throws Exception{
        //配置服务端的NIO线程组
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workerGroup=new NioEventLoopGroup();
        try
        {
            ServerBootstrap b=new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            //控制台输出服务端运行日志
            .handler(new LoggingHandler(LogLevel.INFO))
            //编写服务端接收和发送消息的具体逻辑
            .childHandler(new ChildChannleHandler());
            //绑定启动端口,同步等待成功
            ChannelFuture f=b.bind(port).sync();
            //等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally{
            //释放线程资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    //服务端接收到客户端的消息时会先执行该类的initChannel()方法进行channel的初始化操作
    private class ChildChannleHandler extends ChannelInitializer<SocketChannel>{
        @Override
        protected void initChannel(SocketChannel arg0)
            throws Exception
        {
            //创建分隔符缓冲对象,使用"$_"作为分隔符
            ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());
            //创建DelimiterBasedFrameDecoder对象,将其加入到ChannelPipeline
            //参数1024表示单条消息的最大长度,当达到该长度仍然没有找到分隔符就抛出TooLongFrame异常,第二个参数就是分隔符
            //由于DelimiterBasedFrameDecoder自动对请求消息进行了解码,下面的ChannelHandler接收到的msg对象就是完整的消息包
            arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
            //StringDecoder解码器将ByteBuf解码成字符串对象,这样在ChannelHandlerAdapter中读取消息时就不需要通过ByteBuf获取了
            arg0.pipeline().addLast(new StringDecoder());
            //对网络事件进行读写操作的类
            arg0.pipeline().addLast(new EchoServerHandler());
        }
    }
    public static void main(String[] args)throws Exception
    {
        int port =8888;
        if (args!=null&&args.length>0)
        {
            port=Integer.valueOf(args[0]);
        }
        new EchoServer().bind(port);
    }
}

服务端消息读写操作:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
//网络I/O事件读写操作
public class EchoServerHandler extends ChannelHandlerAdapter
{
    int counter=0;
    //接收客户端发送的消息并返回响应
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        //获取String类型的请求消息(StringDecoder已经对消息进行解码)
        String body=(String)msg;
        System.out.println("This is "+ ++counter+"times receive client : ["+body+"]");
        //由于设置了DelimiterBasedFrameDecoder过滤掉了分隔符"$_",   因此需要将返回消息尾部拼接上分隔符
        body+="$_";
        //将接收到的消息再放到ByteBuf中重新发送给客户端
        ByteBuf buf=Unpooled.copiedBuffer(body.getBytes());
        //把待发送的消息放到发送缓冲数组中,并把缓冲区中的消息全部写入SockChannel发送给客户端
        ctx.writeAndFlush(buf);
    }
    //发生异常时关闭ChannelHandlerContext,释放和ChannelHandlerContext相关联的句柄等资源
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
        cause.printStackTrace();
        ctx.close();
    }
}

客户端:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
//客户端
public class EchoClient
{
    public void connect(int port,String host)throws Exception{
        //创建客户端进行I/O读写的线程组
        EventLoopGroup g=new NioEventLoopGroup();
        try
        {
            //创建客户端启动辅助类Bootstrap
            Bootstrap b=new Bootstrap();
            b.group(g)
            //设置Channel
            .channel(NioSocketChannel.class)
            //配置Channel
            .option(ChannelOption.TCP_NODELAY, true)
            //添加处理类,这里为了方便直接使用了匿名内部类
            .handler(new ChannelInitializer<SocketChannel>()
            {
                //当创建NioSocketChannel成功后,将ChannelHandler设置到ChannelPipeline中处理网络I/O事件
                @Override
                protected void initChannel(SocketChannel arg0)
                    throws Exception
                {
                    //与服务端相同,需要配置一系列的ChannelHandler
                    ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());
                    arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                    arg0.pipeline().addLast(new StringDecoder());
                    //客户端的处理类加入ChannelPipeline
                    arg0.pipeline().addLast(new EchoClientHandler());
                }
            });
            //调用connect方法发起异步连接,并调用同步方法等待连接成功
            ChannelFuture f=b.connect(host, port).sync();
            //f.channel().writeAndFlush(Unpooled.wrappedBuffer("111$_".getBytes()));
            //等待客户端连接关闭
            f.channel().closeFuture().sync();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally{
            //释放线程组
            g.shutdownGracefully();
        }
    }
    public static void main(String[] args)throws Exception
    {
        int port=8888;
        if (args!=null&&args.length>0)
        {
            port=Integer.valueOf(args[0]);
        }
        new EchoClient().connect(port, "127.0.0.1");
    }
}

客户端网络I/O事件处理:

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
//客户端读写网络I/O事件类
public class EchoClientHandler extends ChannelHandlerAdapter
{
    int counter;
    //发送到服务端的消息,注意结尾的分隔符一定要和服务端配置的分隔符一致,否则服务端ChannelInitializer.initChannel()方法虽然能够调用,但是DelimiterBasedFrameDecoder无法找到分隔符,不会调用读取消息的channelRead方法
    static final String ECHO_REQ="Hi,Welcome to Netty.$_";
    public EchoClientHandler(){
    }
    //客户端发送消息的方法
    @Override
    public void channelActive(ChannelHandlerContext ctx)throws Exception{
        for (int i = 0; i < 10; i++ )
        {
            //Unpooled.copiedBuffer()方法是深克隆,也可以使用Unpooled.buffer()写入消息发送
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
        }
    }
    //读取服务端发送的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        String body=(String)msg;
        System.out.println("This is "+ ++counter+" times receive server:["+body+"]");
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{
        //将消息发送队列中的消息写入到SocketChannel中发送给对方,channelActive使用了writeAndFlush这里可以不重写
        ctx.flush();
    }
    //异常处理,关闭ChannelHandlerContext
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
        cause.printStackTrace();
        ctx.close();
    }
}

启动服务端:

启动客户端发送消息:

到此这篇关于Netty之使用DelimiterBasedFrameDecoder进行消息分隔详解的文章就介绍到这了,更多相关DelimiterBasedFrameDecoder进行消息分隔内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 零基础写Java知乎爬虫之进阶篇

    零基础写Java知乎爬虫之进阶篇

    前面几篇文章,我们都是简单的实现了java爬虫抓取内容的问题,那么如果遇到复杂情况,我们还能继续那么做吗?答案当然是否定的,之前的仅仅是入门篇,都是些基础知识,给大家练手用的,本文我们就来点高大上的东西
    2014-11-11
  • 浅谈Java文件被执行的历程

    浅谈Java文件被执行的历程

    学习java以来,都是以语法,类库入手,最基本的也是最基础的java编译过程往往被我遗忘,先解释一下学习java第一课时,都听到过的一句话,“java是半解释语言”。什么是半解释语言。本文将介绍Java文件被执行的历程。
    2021-06-06
  • Spring DI依赖注入过程解析

    Spring DI依赖注入过程解析

    依赖注入是由“依赖”和“注入”两个词汇组合而成,那么我们再一次顺藤摸瓜,分别分析这两个词语,这篇文章主要介绍了Spring DI依赖注入详解,需要的朋友可以参考下
    2022-11-11
  • Spring Batch批处理框架操作指南

    Spring Batch批处理框架操作指南

    Spring Batch 是 Spring 提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作,这篇文章主要介绍了Spring Batch批处理框架操作指南,需要的朋友可以参考下
    2022-07-07
  • Java通过 Socket 实现 TCP服务端

    Java通过 Socket 实现 TCP服务端

    这篇文章主要介绍了Java通过 Socket 实现 TCP服务端的相关资料,需要的朋友可以参考下
    2017-05-05
  • Jenkins Pipeline 部署 SpringBoot 应用的教程详解

    Jenkins Pipeline 部署 SpringBoot 应用的教程详解

    这篇文章主要介绍了Jenkins Pipeline 部署 SpringBoot 应用的详细教程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-07-07
  • spring aop execution表达式的用法

    spring aop execution表达式的用法

    这篇文章主要介绍了spring aop execution表达式的用法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • SpringMVC中事务是否可以加在Controller层的问题

    SpringMVC中事务是否可以加在Controller层的问题

    这篇文章主要介绍了SpringMVC中事务是否可以加在Controller层的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • Spring MVC请求参数接收的全面总结教程

    Spring MVC请求参数接收的全面总结教程

    这篇文章主要给大家总结介绍了关于Spring MVC请求参数接收的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-08-08
  • 一文掌握maven  filtering标签

    一文掌握maven  filtering标签

    这篇文章主要介绍了maven  filtering标签,本文通过三种方法给大家讲解maven filtering标签,结合示例代码给大家介绍的非常详细,需要的朋友可以参考下
    2023-02-02

最新评论